package org.apache.qpid.ra.inflow;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.resource.ResourceException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.XAConnectionImpl;
import org.apache.qpid.ra.QpidResourceAdapter;
import org.apache.qpid.ra.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/ra/inflow/QpidMessageHandler.class */
public class QpidMessageHandler extends QpidExceptionHandler implements MessageListener {
    private static final Logger _log = LoggerFactory.getLogger(QpidMessageHandler.class);
    private MessageConsumer _consumer;
    private MessageEndpoint _endpoint;
    private Session _session;
    private final TransactionManager _tm;

    public QpidMessageHandler(QpidResourceAdapter qpidResourceAdapter, QpidActivationSpec qpidActivationSpec, MessageEndpointFactory messageEndpointFactory, TransactionManager transactionManager, Connection connection) throws ResourceException {
        super(qpidResourceAdapter, qpidActivationSpec, messageEndpointFactory);
        this._tm = transactionManager;
        this._connection = connection;
    }

    public QpidMessageHandler(QpidResourceAdapter qpidResourceAdapter, QpidActivationSpec qpidActivationSpec, MessageEndpointFactory messageEndpointFactory, TransactionManager transactionManager) throws ResourceException {
        super(qpidResourceAdapter, qpidActivationSpec, messageEndpointFactory);
        this._tm = transactionManager;
    }

    @Override // org.apache.qpid.ra.inflow.QpidExceptionHandler
    public void setup() throws Exception {
        if (_log.isTraceEnabled()) {
            _log.trace("setup()");
        }
        setupCF();
        setupDestination();
        String messageSelector = this._spec.getMessageSelector();
        if (this._spec.isUseConnectionPerHandler().booleanValue()) {
            setupConnection();
            this._connection.setExceptionListener(this);
        }
        if (isXA()) {
            this._session = this._ra.createXASession((XAConnectionImpl) this._connection);
        } else {
            this._session = this._ra.createSession((AMQConnection) this._connection, this._spec.getAcknowledgeModeInt(), this._spec.isUseLocalTx().booleanValue(), this._spec.getPrefetchLow(), this._spec.getPrefetchHigh());
        }
        if (this._isTopic) {
            Topic topic = this._destination;
            String subscriptionName = this._spec.getSubscriptionName();
            if (this._spec.isSubscriptionDurable()) {
                this._consumer = this._session.createDurableSubscriber(topic, subscriptionName, messageSelector, false);
            } else {
                this._consumer = this._session.createConsumer(topic, messageSelector);
            }
        } else {
            this._consumer = this._session.createConsumer(this._destination, messageSelector);
        }
        if (isXA()) {
            this._endpoint = this._endpointFactory.createEndpoint(this._session.getXAResource());
        } else {
            this._endpoint = this._endpointFactory.createEndpoint((XAResource) null);
        }
        this._consumer.setMessageListener(this);
        this._connection.start();
        this._activated.set(true);
    }

    @Override // org.apache.qpid.ra.inflow.QpidExceptionHandler
    public void teardown() {
        if (_log.isTraceEnabled()) {
            _log.trace("teardown()");
        }
        super.teardown();
        try {
            if (this._endpoint != null) {
                this._endpoint.release();
                this._endpoint = null;
            }
        } catch (Throwable th) {
            _log.debug("Error releasing endpoint " + this._endpoint, th);
        }
    }

    public void onMessage(Message message) {
        if (_log.isTraceEnabled()) {
            _log.trace("onMessage(" + Util.asString(message) + ")");
        }
        boolean z = false;
        try {
            if (this._spec.getTransactionTimeout().intValue() > 0 && this._tm != null) {
                this._tm.setTransactionTimeout(this._spec.getTransactionTimeout().intValue());
            }
            this._endpoint.beforeDelivery(QpidActivation.ONMESSAGE);
            z = true;
            if (isXA()) {
                message.acknowledge();
            }
            this._endpoint.onMessage(message);
            if (!isXA() || this._tm.getTransaction() == null) {
                message.acknowledge();
            } else {
                int status = this._tm.getStatus();
                if (status == 1 || status == 9 || status == 4) {
                    this._session.recover();
                }
            }
            try {
                this._endpoint.afterDelivery();
                if (!isXA() && this._spec.isUseLocalTx().booleanValue()) {
                    this._session.commit();
                }
            } catch (ResourceException e) {
                _log.warn("Unable to call after delivery", e);
            }
        } catch (Throwable th) {
            _log.error("Failed to deliver message", th);
            if (z) {
                try {
                    this._endpoint.afterDelivery();
                } catch (ResourceException e2) {
                    _log.warn("Unable to call after delivery", th);
                }
            }
            if (isXA() || !this._spec.isUseLocalTx().booleanValue()) {
                try {
                    this._session.recover();
                    return;
                } catch (JMSException e3) {
                    _log.warn("Unable to recover XA transaction", e3);
                    return;
                }
            }
            try {
                this._session.rollback();
            } catch (JMSException e4) {
                _log.warn("Unable to roll local transaction back", e4);
            }
        }
    }

    @Override // org.apache.qpid.ra.inflow.QpidExceptionHandler
    public void start() throws Exception {
        this._deliveryActive.set(true);
        setup();
    }

    @Override // org.apache.qpid.ra.inflow.QpidExceptionHandler
    public void stop() {
        this._deliveryActive.set(false);
        teardown();
    }
}
