package gov.nasa.pds.crawler.mq.amq;

import com.google.gson.Gson;
import com.google.gson.JsonParseException;
import gov.nasa.pds.crawler.mq.MQPublisher;
import gov.nasa.pds.crawler.proc.DirectoryProcessor;
import gov.nasa.pds.registry.common.mq.msg.CollectionInventoryMessage;
import gov.nasa.pds.registry.common.mq.msg.DirectoryMessage;
import gov.nasa.pds.registry.common.mq.msg.MQConstants;
import gov.nasa.pds.registry.common.mq.msg.ProductMessage;
import gov.nasa.pds.registry.common.util.ExceptionUtils;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:BOOT-INF/classes/gov/nasa/pds/crawler/mq/amq/DirectoryConsumerActiveMQ.class */
/**
 * Consumes directory messages from the ActiveMQ directory queue on a dedicated thread and
 * passes each one to a {@link DirectoryProcessor}.
 *
 * <p>The instance also acts as the {@link MQPublisher} handed to the processor: it publishes
 * directory, product and collection inventory messages as JSON text messages using the same
 * JMS session that is used for consuming.
 *
 * <p>The session is created in client-acknowledge mode. A received message is acknowledged
 * once {@link #processMessage(Message)} returns normally, which includes the cases where the
 * payload could not be parsed or processed (those failures are logged, not rethrown).
 *
 * <p>NOTE(review): a JMS session is meant to be used by one thread at a time. The
 * {@code publish} methods share the consumer's session, so confirm that they are only invoked
 * from the consumer thread (i.e. from within the processor callback).
 */
public class DirectoryConsumerActiveMQ implements Runnable, MQPublisher {
    /** Longest time, in milliseconds, one receive call blocks before the stop flag is re-checked. */
    private static final long RECEIVE_TIMEOUT_MS = 3000L;

    /** Pause, in milliseconds, after a failed receive so a broken session does not cause a busy loop. */
    private static final long RECEIVE_ERROR_BACKOFF_MS = 1000L;

    /** Name of the queue that collection inventory messages are published to. */
    private static final String COLLECTIONS_QUEUE = "harvest.collections";

    private Thread thread;
    private final Session session;
    private final Destination dirQueue;
    private final Destination prodQueue;
    private final Destination colQueue;
    private final MessageConsumer dirConsumer;
    private final MessageProducer dirProducer;
    private final MessageProducer prodProducer;
    private final MessageProducer colProducer;
    private volatile boolean stopRequested = false;
    private final Logger log = LogManager.getLogger(getClass());
    private final Gson gson = new Gson();
    private final DirectoryProcessor proc = new DirectoryProcessor(this);

    /**
     * Creates the JMS session, the three queues, the directory consumer and one persistent
     * producer per queue.
     *
     * @param connection JMS connection used to create the session
     * @throws Exception if any JMS object cannot be created or configured
     */
    public DirectoryConsumerActiveMQ(Connection connection) throws Exception {
        // Non-transacted session; messages are acknowledged explicitly in run().
        this.session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        this.dirQueue = this.session.createQueue(MQConstants.MQ_DIRS);
        this.prodQueue = this.session.createQueue(MQConstants.MQ_PRODUCTS);
        this.colQueue = this.session.createQueue(COLLECTIONS_QUEUE);

        this.dirConsumer = this.session.createConsumer(this.dirQueue);

        this.dirProducer = this.session.createProducer(this.dirQueue);
        this.dirProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        this.prodProducer = this.session.createProducer(this.prodQueue);
        this.prodProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
        this.colProducer = this.session.createProducer(this.colQueue);
        this.colProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    /** Starts a new thread that executes {@link #run()}. */
    public void start() {
        this.thread = new Thread(this);
        this.thread.start();
    }

    /**
     * Asks the consumer loop to finish. The loop notices the request after the current
     * receive call or message has completed.
     */
    public void stop() {
        this.stopRequested = true;
    }

    /**
     * Waits for the consumer thread to terminate. Returns immediately if {@link #start()}
     * has not been called.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void join() throws InterruptedException {
        Thread worker = this.thread;
        if (worker != null) {
            worker.join();
        }
    }

    /**
     * Consumer loop: receives directory messages until a stop is requested, processes and
     * acknowledges each one, and closes the session on exit.
     *
     * <p>Receive and processing errors are logged and the loop keeps going. After a failed
     * receive the loop pauses briefly; if the thread is interrupted during that pause the
     * loop ends.
     */
    @Override
    public void run() {
        try {
            while (!this.stopRequested) {
                Message message;
                try {
                    message = this.dirConsumer.receive(RECEIVE_TIMEOUT_MS);
                } catch (Exception e) {
                    this.log.error(ExceptionUtils.getMessage(e));
                    // A closed session or connection fails immediately on every call;
                    // back off instead of spinning and flooding the log.
                    if (!pauseAfterReceiveError()) {
                        break;
                    }
                    continue;
                }

                // A null result means the receive timed out; just re-check the stop flag.
                if (message == null) {
                    continue;
                }

                try {
                    processMessage(message);
                    message.acknowledge();
                } catch (Exception e) {
                    this.log.error(ExceptionUtils.getMessage(e));
                }
            }
        } finally {
            // Release the session even if something unexpected escapes the loop.
            close(this.session);
        }
    }

    /**
     * Sleeps for the receive-error back-off period.
     *
     * @return {@code true} if the pause completed, {@code false} if the thread was interrupted
     *         (the interrupt flag is restored in that case)
     */
    private boolean pauseAfterReceiveError() {
        try {
            Thread.sleep(RECEIVE_ERROR_BACKOFF_MS);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Parses the JSON payload of a text message into a {@link DirectoryMessage} and passes it
     * to the directory processor.
     *
     * <p>Non-text messages are logged and skipped. Parse failures and processing failures are
     * logged separately, with the causing exception, and are not rethrown.
     *
     * @param message the received JMS message
     * @throws Exception if the message ID or text cannot be read from the JMS message
     */
    private void processMessage(Message message) throws Exception {
        if (!(message instanceof TextMessage)) {
            this.log.warn("Invalid message. ID = " + message.getJMSMessageID());
            return;
        }

        String text = ((TextMessage) message).getText();

        DirectoryMessage dirMessage;
        try {
            dirMessage = this.gson.fromJson(text, DirectoryMessage.class);
        } catch (JsonParseException e) {
            this.log.error("Could not parse message: " + text, e);
            return;
        }

        // Gson returns null for a null, empty or literal "null" payload.
        if (dirMessage == null) {
            this.log.error("Could not parse message: " + text);
            return;
        }

        try {
            this.proc.processMessage(dirMessage);
        } catch (Exception e) {
            // Keep processing failures distinguishable from malformed payloads.
            this.log.error("Could not process message: " + text, e);
        }
    }

    /**
     * Publishes a directory message to the directory queue as a JSON text message.
     *
     * @param directoryMessage message to serialize and send
     * @throws Exception if the message cannot be created or sent
     */
    @Override
    public void publish(DirectoryMessage directoryMessage) throws Exception {
        String json = this.gson.toJson(directoryMessage);
        this.dirProducer.send(this.session.createTextMessage(json));
    }

    /**
     * Publishes a product message to the product queue as a JSON text message.
     *
     * @param productMessage message to serialize and send
     * @throws Exception if the message cannot be created or sent
     */
    @Override
    public void publish(ProductMessage productMessage) throws Exception {
        String json = this.gson.toJson(productMessage);
        this.prodProducer.send(this.session.createTextMessage(json));
    }

    /**
     * Publishes a collection inventory message to the collections queue as a JSON text message.
     *
     * @param collectionInventoryMessage message to serialize and send
     * @throws Exception if the message cannot be created or sent
     */
    @Override
    public void publish(CollectionInventoryMessage collectionInventoryMessage) throws Exception {
        String json = this.gson.toJson(collectionInventoryMessage);
        this.colProducer.send(this.session.createTextMessage(json));
    }

    /**
     * Closes the given session on a best-effort basis; a failure is logged and otherwise ignored.
     *
     * @param session session to close, may be {@code null}
     */
    private void close(Session session) {
        if (session == null) {
            return;
        }
        try {
            session.close();
        } catch (Exception e) {
            this.log.warn("Could not close JMS session: " + ExceptionUtils.getMessage(e));
        }
    }
}
