package org.springframework.amqp.rabbit.listener;

import ch.qos.logback.classic.turbo.ReconfigureOnChangeFilter;
import com.rabbitmq.client.Channel;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.AmqpConnectException;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.AmqpIllegalStateException;
import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils;
import org.springframework.amqp.rabbit.connection.ConsumerChannelRegistry;
import org.springframework.amqp.rabbit.connection.RabbitResourceHolder;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.aop.Pointcut;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.aop.support.DefaultPointcutAdvisor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.jmx.export.annotation.ManagedMetric;
import org.springframework.jmx.support.MetricType;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.interceptor.DefaultTransactionAttribute;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.transaction.support.TransactionCallback;
import org.springframework.transaction.support.TransactionTemplate;
import org.springframework.util.Assert;

/* loaded from: input_file:lib/spring-rabbit-1.2.0.RELEASE.jar:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.class */
public class SimpleMessageListenerContainer extends AbstractMessageListenerContainer {
    /** Default receive timeout; the constructors initialise {@code receiveTimeout} to the same value. Presumably milliseconds - confirm against BlockingQueueConsumer.nextMessage. */
    public static final long DEFAULT_RECEIVE_TIMEOUT = 1000;
    /** Default prefetch count; the constructors initialise {@code prefetchCount} to the same value. */
    public static final int DEFAULT_PREFETCH_COUNT = 1;
    /** Default shutdown timeout in milliseconds; the constructors initialise {@code shutdownTimeout} to the same value. */
    public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
    /** Default recovery interval in milliseconds; the constructors initialise {@code recoveryInterval} to the same value. */
    public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
    /** Prefetch setting; the larger of this and {@code txSize} is handed to each new BlockingQueueConsumer. */
    private volatile int prefetchCount;
    /** Upper bound on the number of messages handled per doReceiveAndExecute call before commitIfNecessary is invoked. */
    private volatile int txSize;
    /** Executor that runs the AsyncMessageProcessingConsumer tasks. */
    private volatile Executor taskExecutor;
    /** Number of BlockingQueueConsumers created by initializeConsumers. */
    private volatile int concurrentConsumers;
    /** Timeout passed to BlockingQueueConsumer.nextMessage when polling for a message. */
    private long receiveTimeout;
    /** Time in milliseconds that doShutdown waits on the cancellation lock. */
    private long shutdownTimeout;
    /** Time in milliseconds that handleStartupFailure waits (while the container is active) before returning. */
    private long recoveryInterval;
    /** Current consumers; null until initializeConsumers runs and again after doShutdown. Accessed under consumersMonitor. */
    private Set<BlockingQueueConsumer> consumers;
    /** Monitor guarding {@code consumers}. */
    private final Object consumersMonitor;
    /** Optional external transaction manager; when non-null, receiveAndExecute runs inside a TransactionTemplate. */
    private PlatformTransactionManager transactionManager;
    /** Transaction attribute supplied to the TransactionTemplate in receiveAndExecute. */
    private TransactionAttribute transactionAttribute;
    /** Advice applied around the listener invocation; an empty array means the delegate is called directly. */
    private volatile Advice[] adviceChain;
    /** Counter handed to every consumer; awaited in doShutdown and reported by getActiveConsumerCount. */
    private final ActiveObjectCounter<BlockingQueueConsumer> cancellationLock;
    /** Converter handed to every new BlockingQueueConsumer. */
    private volatile MessagePropertiesConverter messagePropertiesConverter;
    /** Flag handed to every new BlockingQueueConsumer. */
    private volatile boolean defaultRequeueRejected;
    /** Plain delegate that forwards to the superclass invokeListener. */
    private final ContainerDelegate delegate;
    /** Object actually called by invokeListener: the delegate itself, or an AOP proxy around it built by initializeProxy. */
    private ContainerDelegate proxy;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-rabbit-1.2.0.RELEASE.jar:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.class */
    public class AsyncMessageProcessingConsumer implements Runnable {
        private final BlockingQueueConsumer consumer;
        private final CountDownLatch start = new CountDownLatch(1);
        private volatile FatalListenerStartupException startupException;

        public AsyncMessageProcessingConsumer(BlockingQueueConsumer blockingQueueConsumer) {
            this.consumer = blockingQueueConsumer;
        }

        public FatalListenerStartupException getStartupException() throws TimeoutException, InterruptedException {
            this.start.await(ReconfigureOnChangeFilter.DEFAULT_REFRESH_PERIOD, TimeUnit.MILLISECONDS);
            return this.startupException;
        }

        @Override // java.lang.Runnable
        public void run() {
            boolean z = false;
            try {
            } catch (FatalListenerExecutionException e) {
                SimpleMessageListenerContainer.this.logger.error("Consumer received fatal exception during processing", e);
                z = true;
                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                }
            }
            try {
                try {
                    try {
                        try {
                            try {
                                this.consumer.start();
                                this.start.countDown();
                                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                                    ConsumerChannelRegistry.registerConsumerChannel(this.consumer.getChannel(), SimpleMessageListenerContainer.this.getConnectionFactory());
                                }
                                boolean z2 = false;
                                while (true) {
                                    if (!SimpleMessageListenerContainer.this.isActive() && !z2) {
                                        break;
                                    } else {
                                        try {
                                            z2 = SimpleMessageListenerContainer.this.receiveAndExecute(this.consumer) && !SimpleMessageListenerContainer.this.isChannelTransacted();
                                        } catch (ListenerExecutionFailedException e2) {
                                        }
                                    }
                                }
                                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                                }
                            } catch (FatalListenerStartupException e3) {
                                SimpleMessageListenerContainer.this.logger.error("Consumer received fatal exception on startup", e3);
                                this.startupException = e3;
                                z = true;
                                if (SimpleMessageListenerContainer.this.transactionManager != null) {
                                    ConsumerChannelRegistry.unRegisterConsumerChannel();
                                }
                            }
                        } catch (InterruptedException e4) {
                            SimpleMessageListenerContainer.this.logger.debug("Consumer thread interrupted, processing stopped.");
                            Thread.currentThread().interrupt();
                            z = true;
                            if (SimpleMessageListenerContainer.this.transactionManager != null) {
                                ConsumerChannelRegistry.unRegisterConsumerChannel();
                            }
                        }
                    } catch (Throwable th) {
                        if (SimpleMessageListenerContainer.this.logger.isDebugEnabled() || !(th instanceof AmqpConnectException)) {
                            SimpleMessageListenerContainer.this.logger.warn("Consumer raised exception, processing can restart if the connection factory supports it", th);
                        } else {
                            SimpleMessageListenerContainer.this.logger.warn("Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: " + th);
                        }
                        if (SimpleMessageListenerContainer.this.transactionManager != null) {
                            ConsumerChannelRegistry.unRegisterConsumerChannel();
                        }
                    }
                    this.start.countDown();
                    if (SimpleMessageListenerContainer.this.isActive() && !z) {
                        SimpleMessageListenerContainer.this.logger.info("Restarting " + this.consumer);
                        SimpleMessageListenerContainer.this.restart(this.consumer);
                        return;
                    }
                    SimpleMessageListenerContainer.this.logger.debug("Cancelling " + this.consumer);
                    try {
                        this.consumer.stop();
                    } catch (AmqpException e5) {
                        SimpleMessageListenerContainer.this.logger.info("Could not cancel message consumer", e5);
                    }
                    if (z) {
                        SimpleMessageListenerContainer.this.logger.info("Stopping container from aborted consumer");
                        SimpleMessageListenerContainer.this.stop();
                    }
                } catch (Throwable th2) {
                    if (SimpleMessageListenerContainer.this.transactionManager != null) {
                        ConsumerChannelRegistry.unRegisterConsumerChannel();
                    }
                    throw th2;
                }
            } catch (FatalListenerStartupException e6) {
                throw e6;
            } catch (Throwable th3) {
                this.start.countDown();
                SimpleMessageListenerContainer.this.handleStartupFailure(th3);
                throw th3;
            }
        }
    }

    /* loaded from: input_file:lib/spring-rabbit-1.2.0.RELEASE.jar:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer$ContainerDelegate.class */
    /**
     * Indirection through which the container invokes the listener. Because it is an
     * interface, initializeProxy can wrap the default implementation in an interface-based
     * proxy that applies the configured advice chain.
     */
    public interface ContainerDelegate {
        /**
         * Delivers one message to the listener.
         *
         * @param channel the channel handed to the listener invocation
         * @param message the message to deliver
         * @throws Exception any failure raised by the invocation
         */
        void invokeListener(Channel channel, Message message) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/spring-rabbit-1.2.0.RELEASE.jar:org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer$WrappedTransactionException.class */
    /**
     * Unchecked carrier used to move a non-runtime {@link Throwable} out of a transaction
     * callback; receiveAndExecute catches it and rethrows the wrapped cause.
     */
    public static class WrappedTransactionException extends RuntimeException {

        /**
         * @param th the throwable to carry; retrievable through {@link #getCause()}
         */
        public WrappedTransactionException(Throwable th) {
            super(th);
        }
    }

    /**
     * Creates a container with default settings and no connection factory.
     * Timeouts, recovery interval and prefetch count start at the public
     * {@code DEFAULT_*} constants; transaction size and consumer count start at 1.
     */
    public SimpleMessageListenerContainer() {
        this.prefetchCount = DEFAULT_PREFETCH_COUNT;
        this.txSize = 1;
        this.taskExecutor = new SimpleAsyncTaskExecutor();
        this.concurrentConsumers = 1;
        this.receiveTimeout = DEFAULT_RECEIVE_TIMEOUT;
        this.shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
        this.recoveryInterval = DEFAULT_RECOVERY_INTERVAL;
        this.consumersMonitor = new Object();
        this.transactionAttribute = new DefaultTransactionAttribute();
        this.adviceChain = new Advice[0];
        this.cancellationLock = new ActiveObjectCounter<>();
        this.messagePropertiesConverter = new DefaultMessagePropertiesConverter();
        this.defaultRequeueRejected = true;
        // The delegate reaches the superclass implementation directly, bypassing this
        // class's invokeListener override (which routes through the proxy).
        this.delegate = new ContainerDelegate() {
            @Override
            public void invokeListener(Channel channel, Message message) throws Exception {
                SimpleMessageListenerContainer.super.invokeListener(channel, message);
            }
        };
        // Until initializeProxy builds an advised proxy, calls go straight to the delegate.
        this.proxy = this.delegate;
    }

    /**
     * Creates a container with the same defaults as the no-argument constructor and then
     * assigns the given connection factory.
     *
     * @param connectionFactory the connection factory passed to {@code setConnectionFactory}
     */
    public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        // Reuse the default initialisation instead of repeating it field by field.
        this();
        setConnectionFactory(connectionFactory);
    }

    /**
     * Sets the advice applied around listener invocations when the proxy is initialised.
     * The array is copied, so later changes to the caller's array have no effect.
     *
     * @param adviceChain the advice to apply, possibly empty; must not be {@code null}
     * @throws IllegalArgumentException if {@code adviceChain} is {@code null}
     */
    public void setAdviceChain(Advice[] adviceChain) {
        // Fail here rather than with a NullPointerException later in initializeProxy.
        Assert.notNull(adviceChain, "adviceChain must not be null");
        this.adviceChain = adviceChain.clone();
    }

    /**
     * Sets how long, in milliseconds, handleStartupFailure waits before returning.
     *
     * @param j the recovery interval in milliseconds
     */
    public void setRecoveryInterval(long j) {
        recoveryInterval = j;
    }

    /**
     * Sets how many consumers initializeConsumers creates.
     *
     * @param i the number of consumers; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is not positive
     */
    public void setConcurrentConsumers(int i) {
        boolean atLeastOne = i > 0;
        Assert.isTrue(atLeastOne, "'concurrentConsumers' value must be at least 1 (one)");
        concurrentConsumers = i;
    }

    /**
     * Sets the timeout handed to BlockingQueueConsumer.nextMessage when polling for a message.
     *
     * @param j the receive timeout (presumably milliseconds - confirm against nextMessage)
     */
    public void setReceiveTimeout(long j) {
        receiveTimeout = j;
    }

    /**
     * Sets how long doShutdown waits for the consumers to finish.
     *
     * @param j the shutdown timeout in milliseconds
     */
    public void setShutdownTimeout(long j) {
        shutdownTimeout = j;
    }

    /**
     * Sets the executor that runs the consumer tasks.
     *
     * @param executor the executor to use; must not be {@code null}
     * @throws IllegalArgumentException if {@code executor} is {@code null}
     */
    public void setTaskExecutor(Executor executor) {
        Assert.notNull(executor, "taskExecutor must not be null");
        taskExecutor = executor;
    }

    /**
     * Sets the prefetch count. New consumers are created with the larger of this value
     * and the transaction size.
     *
     * @param i the prefetch count
     */
    public void setPrefetchCount(int i) {
        prefetchCount = i;
    }

    /**
     * Sets the maximum number of messages handled in one receive cycle before
     * commitIfNecessary is invoked on the consumer.
     *
     * @param txSize the batch size; must be greater than zero
     * @throws IllegalArgumentException if {@code txSize} is not positive
     */
    public void setTxSize(int txSize) {
        // With a non-positive size the receive loop would never poll for a message and
        // the consumer thread would spin without doing any work.
        Assert.isTrue(txSize > 0, "'txSize' must be > 0");
        this.txSize = txSize;
    }

    /**
     * Sets an external transaction manager. When one is present, message reception runs
     * inside a TransactionTemplate and the channel is not treated as locally transacted.
     *
     * @param platformTransactionManager the transaction manager, or {@code null} for none
     */
    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        transactionManager = platformTransactionManager;
    }

    /**
     * Sets the transaction attribute given to the TransactionTemplate that wraps
     * message reception when a transaction manager is configured.
     *
     * @param transactionAttribute the attribute to use
     */
    public void setTransactionAttribute(TransactionAttribute transactionAttribute) {
        this.transactionAttribute = transactionAttribute;
    }

    /**
     * Sets the converter handed to every newly created consumer.
     *
     * @param messagePropertiesConverter the converter; must not be {@code null}
     * @throws IllegalArgumentException if the converter is {@code null}
     */
    public void setMessagePropertiesConverter(MessagePropertiesConverter messagePropertiesConverter) {
        Assert.notNull(messagePropertiesConverter, "messagePropertiesConverter must not be null");
        this.messagePropertiesConverter = messagePropertiesConverter;
    }

    /**
     * Sets the requeue-rejected flag handed to every newly created consumer.
     *
     * @param z the flag value passed on to the consumers
     */
    public void setDefaultRequeueRejected(boolean z) {
        defaultRequeueRejected = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Runs the superclass validation, rejects the combination of auto-ack with an external
     * transaction manager, and enlarges a CachingConnectionFactory's channel cache when it
     * is smaller than the number of concurrent consumers.
     *
     * @throws IllegalStateException if auto-ack is combined with a transaction manager
     */
    @Override
    public void validateConfiguration() {
        super.validateConfiguration();
        Assert.state(!(getAcknowledgeMode().isAutoAck() && this.transactionManager != null), "The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having an external transaction manager. Either use a different AcknowledgeMode or make sure the transactionManager is null.");
        if (!(getConnectionFactory() instanceof CachingConnectionFactory)) {
            return;
        }
        CachingConnectionFactory cachingFactory = (CachingConnectionFactory) getConnectionFactory();
        if (cachingFactory.getChannelCacheSize() >= this.concurrentConsumers) {
            return;
        }
        // Each consumer needs its own channel, so the cache is grown to match.
        cachingFactory.setChannelCacheSize(this.concurrentConsumers);
        this.logger.warn("CachingConnectionFactory's channelCacheSize can not be less than the number of concurrentConsumers so it was reset to match: " + this.concurrentConsumers);
    }

    /**
     * Replaces the proxy with an interface-based AOP proxy around the delegate when at
     * least one advice is configured; otherwise leaves the proxy untouched.
     */
    private void initializeProxy() {
        if (this.adviceChain.length != 0) {
            ProxyFactory factory = new ProxyFactory();
            Advice[] chain = getAdviceChain();
            for (int idx = 0; idx < chain.length; idx++) {
                // Every advice applies unconditionally to the delegate's single method.
                factory.addAdvisor(new DefaultPointcutAdvisor(Pointcut.TRUE, chain[idx]));
            }
            factory.setProxyTargetClass(false);
            factory.addInterface(ContainerDelegate.class);
            factory.setTarget(this.delegate);
            this.proxy = (ContainerDelegate) factory.getProxy();
        }
    }

    /**
     * Reports whether a shared connection is enabled.
     *
     * @return always {@code true}
     */
    protected final boolean sharedConnectionEnabled() {
        final boolean enabled = true;
        return enabled;
    }

    /**
     * Warns when the listener channel is not exposed although a transaction manager is
     * configured, then builds the advised proxy.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void doInitialize() throws Exception {
        boolean channelHidden = !isExposeListenerChannel();
        if (channelHidden && this.transactionManager != null) {
            this.logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
        }
        initializeProxy();
    }

    /**
     * Reports the current count held by the cancellation lock; exposed as a gauge metric.
     *
     * @return the cancellation lock's count
     */
    @ManagedMetric(metricType = MetricType.GAUGE)
    public int getActiveConsumerCount() {
        final int activeCount = cancellationLock.getCount();
        return activeCount;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the container: creates the consumers if none exist, submits one processing
     * task per consumer to the task executor, and then waits for each task to report its
     * start-up result.
     *
     * @throws AmqpIllegalStateException if any consumer recorded a fatal start-up exception
     * @throws Exception if the superclass start fails or the wait for start-up is interrupted
     */
    @Override
    public void doStart() throws Exception {
        super.doStart();
        synchronized (this.consumersMonitor) {
            int created = initializeConsumers();
            if (this.consumers == null) {
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("Consumers were initialized and then cleared (presumably the container was stopped concurrently)");
                }
                return;
            }
            if (created <= 0) {
                if (this.logger.isInfoEnabled()) {
                    this.logger.info("Consumers are already running");
                }
                return;
            }
            Set<AsyncMessageProcessingConsumer> processors = new HashSet<>();
            for (BlockingQueueConsumer consumer : this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                this.taskExecutor.execute(processor);
            }
            // All tasks are submitted before waiting, so the consumers start in parallel.
            for (AsyncMessageProcessingConsumer processor : processors) {
                FatalListenerStartupException startupException = processor.getStartupException();
                if (startupException != null) {
                    throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the container by invoking {@code shutdown()} and then the superclass stop logic.
     */
    @Override
    public void doStop() {
        // Order matters: shut down first, then let the superclass finish stopping.
        shutdown();
        super.doStop();
    }

    /**
     * When the container is running, waits up to the shutdown timeout for the cancellation
     * lock and then discards the consumer set. Does nothing when the container is not running.
     */
    @Override
    protected void doShutdown() {
        if (!isRunning()) {
            return;
        }
        try {
            this.logger.info("Waiting for workers to finish.");
            boolean finished = this.cancellationLock.await(Long.valueOf(this.shutdownTimeout), TimeUnit.MILLISECONDS);
            if (!finished) {
                this.logger.info("Workers not finished.  Forcing connections to close.");
            } else {
                this.logger.info("Successfully waited for workers to finish.");
            }
        } catch (InterruptedException interrupted) {
            // Keep the interrupt flag set for callers and carry on with the shutdown.
            Thread.currentThread().interrupt();
            this.logger.warn("Interrupted waiting for workers.  Continuing with shutdown.");
        }
        synchronized (this.consumersMonitor) {
            this.consumers = null;
        }
    }

    /**
     * Creates the consumer set when none exists: resets the cancellation lock and adds one
     * new consumer per configured concurrent consumer.
     *
     * @return the number of consumers created; 0 when a consumer set already existed
     */
    protected int initializeConsumers() {
        int created = 0;
        synchronized (this.consumersMonitor) {
            if (this.consumers == null) {
                this.cancellationLock.reset();
                // Read the volatile setting once so the set's capacity and the loop bound agree
                // even if the setter is called while the consumers are being created.
                int target = this.concurrentConsumers;
                this.consumers = new HashSet<BlockingQueueConsumer>(target);
                for (int n = 0; n < target; n++) {
                    this.consumers.add(createBlockingQueueConsumer());
                    created++;
                }
            }
        }
        return created;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reports whether the channel is locally transacted: the superclass must say so and
     * no external transaction manager may be configured.
     *
     * @param channel the channel to check
     * @return {@code true} only if the superclass check passes and there is no transaction manager
     */
    @Override
    public boolean isChannelLocallyTransacted(Channel channel) {
        if (!super.isChannelLocallyTransacted(channel)) {
            return false;
        }
        return this.transactionManager == null;
    }

    /**
     * Builds a new consumer for the required queues from the container's current settings.
     * The prefetch value handed over is the larger of the prefetch count and the
     * transaction size.
     *
     * @return a new, not yet started consumer
     */
    protected BlockingQueueConsumer createBlockingQueueConsumer() {
        String[] queueNames = getRequiredQueueNames();
        return new BlockingQueueConsumer(
                getConnectionFactory(),
                this.messagePropertiesConverter,
                this.cancellationLock,
                getAcknowledgeMode(),
                isChannelTransacted(),
                Math.max(this.prefetchCount, this.txSize),
                this.defaultRequeueRejected,
                queueNames);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Replaces the given consumer with a freshly created one and submits a new processing
     * task for it. Does nothing when the consumer set has already been cleared.
     *
     * @param blockingQueueConsumer the consumer to stop and replace
     * @throws RuntimeException rethrown, after a warning is logged, if any step fails
     */
    public void restart(BlockingQueueConsumer blockingQueueConsumer) {
        synchronized (this.consumersMonitor) {
            if (this.consumers == null) {
                return;
            }
            try {
                blockingQueueConsumer.stop();
                this.cancellationLock.release(blockingQueueConsumer);
                this.consumers.remove(blockingQueueConsumer);
                BlockingQueueConsumer replacement = createBlockingQueueConsumer();
                this.consumers.add(replacement);
                AsyncMessageProcessingConsumer task = new AsyncMessageProcessingConsumer(replacement);
                this.taskExecutor.execute(task);
            } catch (RuntimeException failure) {
                this.logger.warn("Consumer failed irretrievably on restart. " + failure.getClass() + ": " + failure.getMessage());
                throw failure;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Runs one receive cycle for the consumer. Without a transaction manager this calls
     * doReceiveAndExecute directly; with one, the cycle runs inside a TransactionTemplate
     * with the consumer's channel bound to the transaction.
     *
     * @param blockingQueueConsumer the consumer to receive from
     * @return the result of doReceiveAndExecute
     * @throws Throwable whatever the receive cycle throws; non-runtime throwables raised
     *         inside the transaction are unwrapped before being rethrown
     */
    public boolean receiveAndExecute(final BlockingQueueConsumer blockingQueueConsumer) throws Throwable {
        if (this.transactionManager != null) {
            try {
                TransactionTemplate template = new TransactionTemplate(this.transactionManager, this.transactionAttribute);
                Boolean outcome = template.execute(new TransactionCallback<Boolean>() {
                    @Override
                    public Boolean doInTransaction(TransactionStatus transactionStatus) {
                        RabbitResourceHolder holder = new RabbitResourceHolder(blockingQueueConsumer.getChannel(), false);
                        ConnectionFactoryUtils.bindResourceToTransaction(holder, SimpleMessageListenerContainer.this.getConnectionFactory(), true);
                        try {
                            boolean received = SimpleMessageListenerContainer.this.doReceiveAndExecute(blockingQueueConsumer);
                            return Boolean.valueOf(received);
                        } catch (RuntimeException unchecked) {
                            throw unchecked;
                        } catch (Throwable other) {
                            // The callback cannot declare checked throwables, so tunnel it out.
                            throw new WrappedTransactionException(other);
                        }
                    }
                });
                return outcome.booleanValue();
            } catch (WrappedTransactionException wrapped) {
                throw wrapped.getCause();
            }
        }
        return doReceiveAndExecute(blockingQueueConsumer);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Receives and dispatches up to {@code txSize} messages from the consumer, stopping
     * early when no message arrives, and then asks the consumer to commit if necessary.
     *
     * @param blockingQueueConsumer the consumer to receive from
     * @return the result of the consumer's commitIfNecessary call
     * @throws Throwable any listener failure other than ImmediateAcknowledgeAmqpException,
     *         rethrown after rollbackOnExceptionIfNecessary has been invoked
     */
    public boolean doReceiveAndExecute(BlockingQueueConsumer blockingQueueConsumer) throws Throwable {
        final Channel channel = blockingQueueConsumer.getChannel();
        int handled = 0;
        while (handled < this.txSize) {
            this.logger.trace("Waiting for message from consumer.");
            Message message = blockingQueueConsumer.nextMessage(this.receiveTimeout);
            if (message == null) {
                break;
            }
            try {
                executeListener(channel, message);
            } catch (ImmediateAcknowledgeAmqpException ignored) {
                // Deliberately swallowed: no rollback, processing carries on.
            } catch (Throwable failure) {
                blockingQueueConsumer.rollbackOnExceptionIfNecessary(failure);
                throw failure;
            }
            handled++;
        }
        boolean locallyTransacted = isChannelLocallyTransacted(channel);
        return blockingQueueConsumer.commitIfNecessary(locallyTransacted);
    }

    /**
     * @return the configured advice array itself (not a copy)
     */
    private Advice[] getAdviceChain() {
        return adviceChain;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Routes the listener invocation through the proxy, which is either the plain delegate
     * or the advised proxy built by initializeProxy.
     *
     * @param channel the channel handed to the listener invocation
     * @param message the message to deliver
     * @throws Exception any failure raised by the invocation
     */
    @Override
    public void invokeListener(Channel channel, Message message) throws Exception {
        proxy.invokeListener(channel, message);
    }

    /**
     * Backs off after a consumer failed to start: sleeps in 200 ms steps until the
     * recovery interval has elapsed or the container is no longer active.
     *
     * @param th the start-up failure; not used by this implementation
     * @throws IllegalStateException if the thread is interrupted while waiting; the
     *         interrupt flag is restored and the interruption is attached as the cause
     * @throws Exception declared so that overriding implementations may throw
     */
    protected void handleStartupFailure(Throwable th) throws Exception {
        try {
            long deadline = System.currentTimeMillis() + this.recoveryInterval;
            // Short sleeps keep the wait responsive to the container being stopped.
            while (isActive() && System.currentTimeMillis() < deadline) {
                Thread.sleep(200L);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Keep the interruption as the cause instead of discarding it.
            throw new IllegalStateException("Unrecoverable interruption on consumer restart", e);
        }
    }
}
