package edu.cmu.lti.oaqa.framework.async;

import com.google.common.collect.Sets;
import edu.cmu.lti.oaqa.cse.driver.AsyncConfiguration;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQQueueConsumer;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQTopicPublisher;
import edu.cmu.lti.oaqa.framework.async.activemq.ActiveMQTopicSubscriber;
import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/* loaded from: input_file:edu/cmu/lti/oaqa/framework/async/ProducerManagerImpl.class */
/**
 * {@link ProducerManager} implementation backed by ActiveMQ helpers.
 *
 * <p>What this class does, as visible in this file:
 * <ul>
 *   <li>publishes the experiment UUID on {@link Topics} to signal consumers,</li>
 *   <li>receives acknowledgements from the queue named
 *       {@code experimentUuid + COMPLETION_QUEUE_SUFFIX}, and</li>
 *   <li>listens on {@code Topics.PIPELINE_COMPLETE} (through {@link #onMessage(Message)})
 *       to count down a latch that {@link #waitForProcessCompletion()} blocks on.</li>
 * </ul>
 *
 * <p>Threading: {@link #onMessage(Message)} is a {@link MessageListener} callback and may
 * run on a different thread than the one calling the {@code waitFor...} methods, so the
 * latch reference is {@code volatile}. The {@code consumers} set is only touched by the
 * {@code waitFor...} methods and is not safe for concurrent callers.
 */
public class ProducerManagerImpl implements ProducerManager, MessageListener {
    /** Identifier published on topics and matched against incoming completion messages. */
    private final String experimentUuid;
    /** Queue consumer used to receive reader-completion acknowledgements. */
    private final ActiveMQQueueConsumer consumer;
    /** Publisher created for all values of {@link Topics}. */
    private final ActiveMQTopicPublisher publisher;
    /** Subscription on {@code Topics.PIPELINE_COMPLETE}; delivers to {@link #onMessage(Message)}. */
    private final ActiveMQTopicSubscriber completionListener;
    /** Distinct {@code consumerUuid} values collected by the last {@link #waitForReaderCompletion(long)}. */
    private final Set<String> consumers = Sets.newHashSet();
    /**
     * Latch counted down by {@link #onMessage(Message)}; {@code null} when no completion
     * wait is armed. Volatile because it is written by the waiting thread and read by the
     * listener callback.
     */
    private volatile CountDownLatch latch;
    /** Not used anywhere in this class; kept because it is package-visible. */
    int count;

    /**
     * Creates the queue consumer, topic publisher and completion subscription.
     *
     * @param str experiment UUID; also used as the prefix of the completion queue name
     * @param asyncConfiguration source of the broker URL
     * @throws JMSException if one of the ActiveMQ helpers cannot be created
     */
    public ProducerManagerImpl(String str, AsyncConfiguration asyncConfiguration) throws JMSException {
        this.experimentUuid = str;
        String brokerUrl = asyncConfiguration.getBrokerUrl();
        this.consumer = new ActiveMQQueueConsumer(brokerUrl, str + ProducerManager.COMPLETION_QUEUE_SUFFIX);
        this.publisher = new ActiveMQTopicPublisher(brokerUrl, Topics.values());
        // Reuse the URL read above instead of querying the configuration a second time.
        this.completionListener = new ActiveMQTopicSubscriber(brokerUrl, this, Topics.PIPELINE_COMPLETE);
    }

    /**
     * Publishes the experiment UUID on {@code Topics.COLLECTION_READER_COMPLETE}.
     *
     * @throws Exception if publishing fails
     */
    @Override // edu.cmu.lti.oaqa.framework.async.ProducerManager
    public void notifyCloseCollectionReaders() throws Exception {
        this.publisher.publish(this.experimentUuid, Topics.COLLECTION_READER_COMPLETE);
    }

    /**
     * Receives {@code j} messages from the completion queue and records the
     * {@code consumerUuid} of each one, replacing any previously recorded set.
     * Once all messages are in, the completion latch is armed with the number of
     * distinct consumers so that completion messages arriving before
     * {@link #waitForProcessCompletion()} is called are counted rather than dropped.
     *
     * @param j number of messages to receive
     * @throws JMSException if receiving or reading a message fails
     */
    @Override // edu.cmu.lti.oaqa.framework.async.ProducerManager
    public void waitForReaderCompletion(long j) throws JMSException {
        this.consumers.clear();
        // long counter: an int counter would overflow and never reach a bound above Integer.MAX_VALUE.
        for (long received = 0; received < j; received++) {
            this.consumers.add(this.consumer.receive().getString("consumerUuid"));
        }
        // Arm only after the full set is known; a failure above leaves the latch untouched.
        this.latch = new CountDownLatch(this.consumers.size());
    }

    /**
     * Publishes the experiment UUID on {@code Topics.DB_CONFIG_READY}.
     *
     * @throws JMSException if publishing fails
     */
    @Override // edu.cmu.lti.oaqa.framework.async.ProducerManager
    public void notifyNextConfigurationIsReady() throws JMSException {
        this.publisher.publish(this.experimentUuid, Topics.DB_CONFIG_READY);
    }

    /**
     * Closes the queue consumer, the publisher and the completion subscription.
     * Every resource gets a close attempt even if an earlier one throws; when several
     * fail, the exception from the last failing close is the one that propagates.
     *
     * @throws IOException if closing a resource fails
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.consumer.close();
        } finally {
            try {
                this.publisher.close();
            } finally {
                this.completionListener.close();
            }
        }
    }

    /**
     * Blocks until one matching completion message has been counted for every consumer
     * recorded by {@link #waitForReaderCompletion(long)}.
     *
     * <p>Uses the latch armed by {@link #waitForReaderCompletion(long)} when there is one;
     * otherwise creates a latch sized to the current consumer set. The latch is disarmed
     * when this method exits, normally or by interruption.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // edu.cmu.lti.oaqa.framework.async.ProducerManager
    public void waitForProcessCompletion() throws InterruptedException {
        CountDownLatch pending = this.latch;
        if (pending == null) {
            // No wait was armed beforehand: fall back to creating the latch here.
            pending = new CountDownLatch(this.consumers.size());
            this.latch = pending;
        }
        try {
            pending.await();
        } finally {
            // Disarm so a later call starts from a fresh latch instead of an exhausted one.
            this.latch = null;
        }
    }

    /**
     * Counts down the armed latch when a text message whose body equals the experiment
     * UUID arrives. Messages are ignored when no latch is armed, when the body is
     * {@code null} or different, or when the message is not a {@link TextMessage}.
     *
     * @param message message delivered by the completion subscription
     */
    @Override // javax.jms.MessageListener
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            // Avoid throwing ClassCastException into the JMS provider's delivery code.
            System.err.println("Ignoring unexpected non-text message: " + message);
            return;
        }
        TextMessage textMessage = (TextMessage) message;
        try {
            String text = textMessage.getText();
            // Read the volatile field once so the null check and countDown use the same latch.
            CountDownLatch pending = this.latch;
            if (pending != null && text != null && text.equals(this.experimentUuid)) {
                pending.countDown();
            }
        } catch (JMSException e) {
            System.err.println("Unable to process message: " + textMessage + " (" + e + ")");
        }
    }
}
