package com.socklabs.elasticservices.activemq;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.BaseEncoding;
import com.socklabs.elasticservices.core.ServiceProto;
import com.socklabs.elasticservices.core.message.MessageUtils;
import com.socklabs.elasticservices.core.misc.Ref;
import com.socklabs.elasticservices.core.service.DefaultMessageController;
import com.socklabs.elasticservices.core.service.MessageController;
import com.socklabs.elasticservices.core.transport.Transport;
import com.socklabs.elasticservices.core.transport.TransportConsumer;
import java.util.Iterator;
import java.util.List;
import javax.annotation.PostConstruct;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;

/* loaded from: input_file:com/socklabs/elasticservices/activemq/ActiveMqTransport.class */
public class ActiveMqTransport implements Transport, DisposableBean {
    private static final BaseEncoding B16 = BaseEncoding.base16();
    private final List<TransportConsumer> consumers = Lists.newArrayList();
    private final ActiveMqTransportRef activeMqTransportRef;
    private final Connection connection;
    private final Session session;

    /* loaded from: input_file:com/socklabs/elasticservices/activemq/ActiveMqTransport$ActiveMqTransportListener.class */
    private static class ActiveMqTransportListener implements MessageListener {
        private static final Logger LOGGER = LoggerFactory.getLogger(ActiveMqTransportListener.class);
        private final List<TransportConsumer> transportConsumers;

        private ActiveMqTransportListener(List<TransportConsumer> list) {
            this.transportConsumers = list;
        }

        public void onMessage(Message message) {
            Optional<MessageController> buildMessageController = buildMessageController(message);
            if (!buildMessageController.isPresent()) {
                LOGGER.error("Could not parse message controller from message.");
                return;
            }
            Optional<byte[]> buildMessageBody = buildMessageBody(message);
            if (!buildMessageBody.isPresent()) {
                LOGGER.error("Could not parse body from message.");
                return;
            }
            MessageController messageController = (MessageController) buildMessageController.get();
            byte[] bArr = (byte[]) buildMessageBody.get();
            Iterator<TransportConsumer> it = this.transportConsumers.iterator();
            while (it.hasNext()) {
                try {
                    it.next().handleMessage(messageController, bArr);
                } catch (Exception e) {
                    LOGGER.error("Error giving message to transport consumer:", e);
                }
            }
        }

        private Optional<byte[]> buildMessageBody(Message message) {
            if (message instanceof BytesMessage) {
                try {
                    long bodyLength = ((BytesMessage) message).getBodyLength();
                    Preconditions.checkArgument(bodyLength <= 2147483647L);
                    byte[] bArr = new byte[(int) bodyLength];
                    ((BytesMessage) message).readBytes(bArr);
                    return Optional.of(bArr);
                } catch (JMSException e) {
                    LOGGER.error("JMSException raised reading body from message.", e);
                }
            }
            return Optional.absent();
        }

        private Optional<MessageController> buildMessageController(Message message) {
            try {
                Optional fromJson = MessageUtils.fromJson(ServiceProto.ContentType.getDefaultInstance(), message.getStringProperty("content-type"));
                Optional fromJson2 = MessageUtils.fromJson(ServiceProto.ServiceRef.getDefaultInstance(), message.getStringProperty("app-id"));
                Optional fromJson3 = MessageUtils.fromJson(ServiceProto.ServiceRef.getDefaultInstance(), message.getStringProperty("reply-to"));
                Optional absent = Optional.absent();
                Optional absent2 = Optional.absent();
                String stringProperty = message.getStringProperty("message-id");
                if (stringProperty != null && !stringProperty.isEmpty()) {
                    absent = Optional.of(ActiveMqTransport.B16.decode(stringProperty));
                }
                String stringProperty2 = message.getStringProperty("correlation-id");
                if (stringProperty2 != null && !stringProperty2.isEmpty()) {
                    absent2 = Optional.of(ActiveMqTransport.B16.decode(stringProperty2));
                }
                Optional absent3 = Optional.absent();
                if (message.propertyExists("expires")) {
                    absent3 = Optional.of(new DateTime(message.getLongProperty("expires")));
                }
                return Optional.of(new DefaultMessageController((ServiceProto.ServiceRef) fromJson3.get(), (ServiceProto.ServiceRef) fromJson2.get(), (ServiceProto.ContentType) fromJson.get(), absent, absent2, absent3));
            } catch (JMSException e) {
                LOGGER.error("JMSException raised processing message headers.", e);
                return Optional.absent();
            }
        }
    }

    public ActiveMqTransport(ConnectionFactory connectionFactory, ActiveMqTransportRef activeMqTransportRef) throws JMSException {
        this.activeMqTransportRef = activeMqTransportRef;
        this.connection = connectionFactory.createConnection();
        this.session = this.connection.createSession(false, 1);
        this.session.createConsumer(this.session.createQueue(activeMqTransportRef.getQueue())).setMessageListener(new ActiveMqTransportListener(this.consumers));
    }

    public void addConsumer(TransportConsumer transportConsumer) {
        this.consumers.add(transportConsumer);
    }

    public Ref getRef() {
        return this.activeMqTransportRef.getRef();
    }

    @PostConstruct
    public void init() throws JMSException {
        this.connection.start();
    }

    public void destroy() throws Exception {
        this.connection.stop();
        this.session.close();
        this.connection.close();
    }
}
