package edu.cmu.lti.oaqa.framework.collection;

import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import edu.cmu.lti.oaqa.ecd.BaseExperimentBuilder;
import edu.cmu.lti.oaqa.ecd.ExperimentPersistenceProvider;
import edu.cmu.lti.oaqa.framework.DataElement;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQQueueProducer;
import java.io.IOException;
import java.util.Set;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.uima.cas.CAS;
import org.apache.uima.collection.CollectionException;
import org.apache.uima.collection.CollectionReader_ImplBase;
import org.apache.uima.resource.ResourceInitializationException;
import org.apache.uima.util.Progress;
import org.apache.uima.util.ProgressImpl;

/* loaded from: input_file:edu/cmu/lti/oaqa/framework/collection/AbstractCollectionReaderProducer.class */
public abstract class AbstractCollectionReaderProducer extends CollectionReader_ImplBase {
    public static final String COLLECTION_READER_QUEUE_SUFFIX = "-producer";
    private int count = 0;
    private Set<String> topics = Sets.newHashSet();
    private ActiveMQQueueProducer producer;
    private ExperimentPersistenceProvider persistence;

    public void initialize() throws ResourceInitializationException {
        super.initialize();
        String str = (String) getConfigParameterValue("broker-url");
        String str2 = (String) getConfigParameterValue("persistence-provider");
        if (str2 == null) {
            throw new ResourceInitializationException(new IllegalArgumentException(String.format("%s must be provided with a parameter of type <persistence-provider>", getClass().getSimpleName())));
        }
        this.persistence = BaseExperimentBuilder.loadProvider(str2, ExperimentPersistenceProvider.class);
        try {
            this.producer = new ActiveMQQueueProducer(str, getUUID() + COLLECTION_READER_QUEUE_SUFFIX);
        } catch (JMSException e) {
            throw new ResourceInitializationException(e);
        }
    }

    public final void getNext(CAS cas) throws IOException, CollectionException {
        try {
            DataElement nextFromSource = getNextFromSource();
            Message createMapMessage = this.producer.createMapMessage();
            createMapMessage.setString("dataset", getDataset());
            createMapMessage.setString("sequenceId", nextFromSource.getSequenceId());
            createMapMessage.setInt("stageId", getStageId());
            this.producer.send(createMapMessage);
            this.topics.add(nextFromSource.getSequenceId());
            this.count++;
            this.persistence.updateExperimentMeta(getUUID(), this.count);
        } catch (Exception e) {
            throw new CollectionException(e);
        }
    }

    protected abstract String getDataset();

    protected abstract int getStageId();

    protected abstract DataElement getNextFromSource() throws Exception;

    protected abstract String getUUID();

    public Progress[] getProgress() {
        return new Progress[]{new ProgressImpl(this.count, -1, "entities")};
    }

    public void close() throws IOException {
        this.persistence.updateExperimentMeta(getUUID(), this.count, this.topics);
        Closeables.closeQuietly(this.producer);
    }
}
