package com.ning.metrics.meteo.subscribers;

import com.espertech.esper.client.EPServiceProvider;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/ning/metrics/meteo/subscribers/AMQSubscriber.class */
class AMQSubscriber implements Subscriber {
    private static final Logger log = Logger.getLogger(AMQSubscriber.class);
    private Connection connection;
    private Session session;
    private MessageConsumer consumer;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final AMQSubscriberConfig amqConfig;
    private final MessageListener listener;

    public AMQSubscriber(AMQSubscriberConfig aMQSubscriberConfig, EPServiceProvider ePServiceProvider) {
        this.amqConfig = aMQSubscriberConfig;
        this.listener = new TopicListener(aMQSubscriberConfig.getEventOutputName(), ePServiceProvider);
    }

    @Override // com.ning.metrics.meteo.subscribers.Subscriber
    public void subscribe() {
        failSafeConnect(this.amqConfig.getInitialBackoffTime(), this.amqConfig.getMaxBackoffTime());
    }

    private void failSafeConnect(int i, int i2) {
        unsubscribe();
        try {
            log.info("Attempting to connect to ActiveMQ");
            connect();
        } catch (JMSException e) {
            log.warn("Unable to connect to ActiveMQ. Will retry in " + i + " ms", e);
            unsubscribe();
            try {
                Thread.sleep(i);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
            int i3 = i * 2;
            if (i3 > i2) {
                i3 = i2;
            }
            failSafeConnect(i3, i2);
        }
    }

    private void connect() throws JMSException {
        String format = String.format("%s://%s:%d", this.amqConfig.getProtocol(), this.amqConfig.getHost(), Integer.valueOf(this.amqConfig.getPort()));
        log.info("Connecting to: " + format);
        this.connection = new ActiveMQConnectionFactory(this.amqConfig.getUsername(), this.amqConfig.getPassword(), format).createConnection();
        this.connection.start();
        this.session = this.connection.createSession(false, 1);
        this.consumer = this.session.createConsumer(this.session.createTopic(this.amqConfig.getTopic()));
        this.consumer.setMessageListener(this.listener);
        log.info("Connected!");
    }

    @Override // com.ning.metrics.meteo.subscribers.Subscriber
    public void unsubscribe() {
        try {
            this.closed.set(true);
            if (this.consumer != null) {
                this.consumer.close();
                this.consumer = null;
            }
            if (this.session != null) {
                this.session.close();
                this.session = null;
            }
            if (this.connection != null) {
                this.connection.close();
                this.connection = null;
            }
        } catch (JMSException e) {
        }
    }
}
