package edu.cmu.lti.oaqa.framework.collection;

import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import edu.cmu.lti.oaqa.ecd.BaseExperimentBuilder;
import edu.cmu.lti.oaqa.framework.DataElement;
import edu.cmu.lti.oaqa.framework.async.ProducerManager;
import edu.cmu.lti.oaqa.framework.async.Topics;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQQueueConsumer;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQQueueProducer;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQTopicSubscriber;
import edu.cmu.lti.oaqa.framework.types.ExperimentUUID;
import edu.cmu.lti.oaqa.framework.types.InputElement;
import java.io.IOException;
import java.util.UUID;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.uima.analysis_engine.AnalysisEngine;
import org.apache.uima.analysis_engine.AnalysisEngineProcessException;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.collection.CollectionReader_ImplBase;
import org.apache.uima.jcas.JCas;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Progress;
import org.apache.uima.util.ProgressImpl;

/* loaded from: input_file:edu/cmu/lti/oaqa/framework/collection/AbstractCollectionReaderConsumer.class */
/**
 * Collection reader that pulls its input elements from a message queue instead of a local
 * source.
 *
 * <p>Each call to {@link #hasNext()} first reports the previously delivered element as processed
 * on the completion queue and then receives the next {@link MapMessage} from the collection
 * reader queue. {@link #getNext(CAS)} copies the received element into the CAS. The reader stops
 * when the queue consumer returns no message or when a "collection reader complete" topic
 * message carrying this experiment's UUID arrives (see {@link #onMessage(Message)}).
 *
 * <p>Subclasses define how a received {@link MapMessage} is turned into a {@link DataElement}.
 */
public abstract class AbstractCollectionReaderConsumer extends CollectionReader_ImplBase implements MessageListener {
    /** Number of elements handed out through {@link #getNext(CAS)}; reported as progress. */
    int nextInput;
    /** Random identifier of this consumer instance, attached to every completion notification. */
    private String consumerUuid;
    /** Experiment identifier; also used as the prefix of the queue names. */
    private String experimentUuid;
    /** Element fetched by the last successful {@link #hasNext()} and not yet written to a CAS. */
    private DataElement nextElement;
    /** Optional annotators applied to every CAS after it is populated; may be {@code null}. */
    private AnalysisEngine[] decorators;
    private ActiveMQQueueConsumer consumer;
    private ActiveMQQueueProducer producer;
    private ActiveMQTopicSubscriber closeListener;
    /**
     * Cleared by {@link #onMessage(Message)} once the experiment is reported complete. Volatile
     * because the flag is written from the JMS listener callback and read by the thread that
     * drives {@link #hasNext()}.
     */
    private volatile boolean processing = true;
    private int stageId;
    /** Sequence id of the last delivered element still awaiting its completion notification. */
    private String lastSequenceId;
    /** Dataset of the last delivered element. */
    private String dataset;

    /**
     * Reads the reader configuration, builds the optional decorators and opens the messaging
     * endpoints (completion topic subscriber, input queue consumer, completion queue producer).
     *
     * @throws ResourceInitializationException if the stage id parameter is missing or any
     *     decorator or messaging endpoint cannot be created
     */
    public void initialize() throws ResourceInitializationException {
        String brokerUrl = (String) getConfigParameterValue("broker-url");
        // The input queue consumer gets a prefetch of 0 so the broker does not push messages to
        // this consumer ahead of an explicit receive() (ActiveMQ prefetch policy option).
        String pullingUrl = brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0";
        this.consumerUuid = UUID.randomUUID().toString();
        this.experimentUuid = (String) getConfigParameterValue("cse.driver.experiment.uuid");
        // The key previously read was "cse.driver.experiment.stageIdd", which does not follow the
        // naming of the sibling uuid parameter and made the lookup return null.
        Integer configuredStage = (Integer) getConfigParameterValue("cse.driver.experiment.stageId");
        if (configuredStage == null) {
            throw new ResourceInitializationException(
                    new IllegalStateException("Missing configuration parameter: cse.driver.experiment.stageId"));
        }
        this.stageId = configuredStage.intValue();
        try {
            initDecorators();
            this.closeListener = new ActiveMQTopicSubscriber(brokerUrl, this, Topics.COLLECTION_READER_COMPLETE);
            this.consumer = new ActiveMQQueueConsumer(pullingUrl, this.experimentUuid + AbstractCollectionReaderProducer.COLLECTION_READER_QUEUE_SUFFIX);
            this.producer = new ActiveMQQueueProducer(brokerUrl, this.experimentUuid + ProducerManager.COMPLETION_QUEUE_SUFFIX);
        } catch (Exception e) {
            throw new ResourceInitializationException(e);
        }
    }

    /** Resets the progress counter and creates the annotators named by the "decorators" parameter, if any. */
    private void initDecorators() {
        this.nextInput = 0;
        String decoratorSpec = (String) getConfigParameterValue("decorators");
        if (decoratorSpec != null) {
            this.decorators = BaseExperimentBuilder.createAnnotators(decoratorSpec);
        }
    }

    /**
     * Acknowledges the previously delivered element on the completion queue and then waits for
     * the next input message.
     *
     * <p>Every call receives (and thereby consumes) a message from the input queue, so this
     * method is expected to be called exactly once before each {@link #getNext(CAS)}.
     *
     * @return {@code true} if a new element is available for {@link #getNext(CAS)}
     * @throws CollectionException if the next message cannot be received or decoded
     */
    public boolean hasNext() throws IOException, CollectionException {
        if (this.lastSequenceId != null) {
            try {
                notifyProcessed(this.dataset, this.lastSequenceId);
                // Forget the id once it has been reported so that a repeated hasNext() call does
                // not send a second completion notification for the same element.
                this.lastSequenceId = null;
            } catch (JMSException e) {
                // Best effort: a failed notification must not stop the reader.
                System.err.println("Unable to notify completion of " + this.dataset + "/" + this.lastSequenceId);
                e.printStackTrace();
            }
        }
        return waitForNext();
    }

    /**
     * Populates the CAS with the element fetched by the preceding {@link #hasNext()}: the
     * document text, an {@link ExperimentUUID} annotation and an {@link InputElement}
     * annotation, followed by the configured decorators.
     *
     * @param cas the CAS to fill
     * @throws CollectionException wrapping any failure while populating or decorating the CAS
     */
    public void getNext(CAS cas) throws IOException, CollectionException {
        try {
            this.nextInput++;
            JCas jCas = cas.getJCas();
            jCas.setDocumentText(this.nextElement.getText());

            ExperimentUUID experiment = new ExperimentUUID(jCas);
            experiment.setUuid(this.experimentUuid);
            experiment.setStageId(this.stageId);
            experiment.addToIndexes();

            String sequenceId = this.nextElement.getSequenceId();
            InputElement input = new InputElement(jCas);
            input.setDataset(this.nextElement.getDataset());
            input.setQuestion(this.nextElement.getText());
            input.setSequenceId(sequenceId);
            input.addToIndexes();

            decorate(jCas);

            // Remembered only after the CAS was built, so that the next hasNext() reports
            // exactly the elements that were actually handed out.
            this.dataset = this.nextElement.getDataset();
            this.lastSequenceId = sequenceId;
        } catch (Exception e) {
            throw new CollectionException(e);
        }
    }

    /**
     * Receives the next message from the input queue and converts it into {@link #nextElement}.
     *
     * <p>The message is acknowledged only after it has been decoded successfully; on any failure
     * the consumer is asked to recover before the error is rethrown.
     *
     * @return {@code false} if processing was stopped or no message was returned, {@code true}
     *     once a new element has been stored
     * @throws CollectionException if receiving, validating or decoding the message fails
     */
    private boolean waitForNext() throws CollectionException {
        if (!this.processing) {
            return false;
        }
        try {
            Message received = this.consumer.receive();
            try {
                MapMessage mapMessage = (MapMessage) received;
                if (mapMessage == null) {
                    return false;
                }
                int messageStage = mapMessage.getInt("stageId");
                if (this.stageId != messageStage) {
                    throw new IllegalStateException(String.format("Received stage id %s expected %s ", Integer.valueOf(messageStage), Integer.valueOf(this.stageId)));
                }
                this.nextElement = getDataElement(mapMessage);
                received.acknowledge();
                return true;
            } catch (Exception e) {
                this.consumer.recover();
                Throwables.propagateIfInstanceOf(e, CollectionException.class);
                throw new CollectionException(e);
            }
        } catch (JMSException e2) {
            throw new CollectionException(e2);
        }
    }

    /**
     * Converts a received queue message into the element to be processed.
     *
     * @param mapMessage the message received from the input queue (never {@code null})
     * @return the element described by the message
     * @throws Exception if the message cannot be decoded
     */
    protected abstract DataElement getDataElement(MapMessage mapMessage) throws Exception;

    /**
     * Runs every configured decorator over the CAS, in array order; does nothing when no
     * decorators were configured.
     *
     * @param jCas the populated CAS
     * @throws AnalysisEngineProcessException if a decorator fails
     */
    protected void decorate(JCas jCas) throws AnalysisEngineProcessException {
        if (this.decorators == null) {
            return;
        }
        for (AnalysisEngine decorator : this.decorators) {
            decorator.process(jCas);
        }
    }

    /**
     * Reports the number of elements handed out so far; the total is passed as {@code -1}.
     */
    public Progress[] getProgress() {
        return new Progress[]{new ProgressImpl(this.nextInput, -1, "entities")};
    }

    /**
     * Sends a completion notification for one element to the completion queue.
     *
     * @param str the dataset of the processed element
     * @param str2 the sequence id of the processed element
     * @throws JMSException if the message cannot be created or sent
     */
    private void notifyProcessed(String str, String str2) throws JMSException {
        Message notification = this.producer.createMapMessage();
        notification.setString("dataset", str);
        notification.setString("sequenceId", str2);
        notification.setString("consumerUuid", getConsumerUuid());
        this.producer.send(notification);
    }

    private String getConsumerUuid() {
        return this.consumerUuid;
    }

    /**
     * Handles a message from the completion topic: when its text equals this reader's experiment
     * UUID, processing is stopped and the input queue consumer is closed.
     *
     * <p>Messages that are not text messages, or that cannot be read, are reported on standard
     * error and otherwise ignored, so that no exception escapes the listener callback.
     *
     * @param message the message delivered by the topic subscriber
     */
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            System.err.println("Unable to process message: " + message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            String finishedExperiment = textMessage.getText();
            if (finishedExperiment != null && finishedExperiment.equals(this.experimentUuid)) {
                this.processing = false;
                Closeables.closeQuietly(this.consumer);
            }
        } catch (Exception e) {
            System.err.println("Unable to process message: " + textMessage + " (" + e + ")");
        }
    }

    /**
     * Closes the queue consumer, the completion producer and the topic subscriber, ignoring
     * errors raised while closing.
     */
    public void close() throws IOException {
        System.out.printf("(%s) Closing connections!!\n", Integer.valueOf(this.stageId));
        Closeables.closeQuietly(this.consumer);
        Closeables.closeQuietly(this.producer);
        Closeables.closeQuietly(this.closeListener);
    }
}
