package com.solace.messaging.receiver;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.config.SolaceProperties;
import com.solace.messaging.receiver.AsyncReceiverSubscriptions;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.MessageReceiver;
import com.solace.messaging.receiver.ReceiverBuffers;
import com.solace.messaging.resources.CachedTopicSubscription;
import com.solace.messaging.resources.Queue;
import com.solace.messaging.resources.TopicSubscription;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageableReceiver;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExecutableMessageHandler;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessageReceiptFailureNotificationDispatcher;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.ServiceEventImpl;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.common.util.DestinationUtil;
import com.solacesystems.jcsmp.AccessDeniedException;
import com.solacesystems.jcsmp.CapabilityType;
import com.solacesystems.jcsmp.ConsumerFlowProperties;
import com.solacesystems.jcsmp.EndpointProperties;
import com.solacesystems.jcsmp.FlowEvent;
import com.solacesystems.jcsmp.FlowEventArgs;
import com.solacesystems.jcsmp.FlowEventHandler;
import com.solacesystems.jcsmp.FlowReceiver;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPErrorResponseException;
import com.solacesystems.jcsmp.JCSMPErrorResponseSubcodeEx;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPInterruptedException;
import com.solacesystems.jcsmp.PropertyMismatchException;
import com.solacesystems.jcsmp.Subscription;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageListener;
import com.solacesystems.jcsmp.impl.QueueImpl;
import com.solacesystems.jcsmp.impl.ReplayStartLocationBeginningImpl;
import com.solacesystems.jcsmp.impl.ReplayStartLocationDateImpl;
import com.solacesystems.jcsmp.impl.ReplicationGroupMessageIdImpl;
import com.solacesystems.jcsmp.statistics.StatType;
import java.time.Instant;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
/* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl.class */
public class PersistentMessageReceiverImpl implements PersistentMessageReceiver {
    private final SolaceQueueHolder queue;
    private final MessagingServiceInternalView serviceInternalView;
    private final TypedProperties receiverConfiguration;
    private final ConsumerFlowProperties consumerProps;
    private final ReceiverBuffers.ReceiverBuffer buffer;
    private final AsyncConsumerMessageDispatchTask asyncConsumerMessageDispatchTask;
    private final MessageReceiptFailureNotificationDispatcher messageReceiptFailureNotificationDispatcher;
    private final boolean autoAckEnabled;
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0);
    private static final Log logger = LogFactory.getLog(PersistentMessageReceiverImpl.class);
    private static final Task<PersistentMessageReceiverImpl> NO_OP_TASK = persistentMessageReceiverImpl -> {
    };
    private final StopSolaceConsumerTask stopSolaceConsumerTask;
    static final int STATE_NOT_STARTED = 0;
    static final int STATE_STARTING = 1;
    static final int STATE_STARTED = 2;
    static final int STATE_TERMINATING = 3;
    static final int STATE_TERMINATED = 4;
    private final List<Task<PersistentMessageReceiverImpl>> preStartTasks = new CopyOnWriteArrayList();
    private final List<Task<PersistentMessageReceiverImpl>> postStartAsyncTasks = new CopyOnWriteArrayList();
    private final List<TopicSubscription> appliedTopicSubscriptions = new CopyOnWriteArrayList();
    private final AtomicReference<ExecutableMessageHandler> messageHandlerRef = new AtomicReference<>();
    private final ReentrantLock messageHandlersLock = new ReentrantLock();
    private final ReentrantLock subscriptionsLock = new ReentrantLock();
    private final Condition notEmpty = this.messageHandlersLock.newCondition();
    private volatile FlowReceiver solaceReceiver = null;
    final AtomicStampedReference<CompletableFuture> stateHolder = new AtomicStampedReference<>(null, STATE_NOT_STARTED);
    private final long id = instanceIdGenerator.incrementAndGet();
    private final String instanceName = "PersistentMessageReceiver@" + this.id;
    private final TerminationNotificationDispatcher terminationNotificationDispatcher = new TerminationNotificationDispatcher();
    private final FlowControlHandler flowControlHandler = new FlowControlHandler();
    private final ExecutorService defaultReceiverExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
    private final ExecutorService asyncSubscriptionsExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-subscription-dispatcher"));
    private final ManageableReceiver.PersistentReceiverInfo receiverInfo = new PersistentReceiverInfoImpl();
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener = new MessagingService.ServiceInterruptionListener() { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.1
        @Override // com.solace.messaging.MessagingService.ServiceInterruptionListener
        public void onServiceInterrupted(MessagingService.ServiceEvent serviceEvent) {
            if (PersistentMessageReceiverImpl.logger.isWarnEnabled()) {
                PersistentMessageReceiverImpl.logger.warn(PersistentMessageReceiverImpl.this.instanceName + " is shutting down due to service interruption");
            }
            PersistentMessageReceiverImpl.this.stateHolder.set(null, PersistentMessageReceiverImpl.STATE_TERMINATED);
            PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(serviceEvent.getTimestamp(), serviceEvent.getMessage(), serviceEvent.getCause()));
            PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
        }
    };
    private final ClientSession.ClientSessionStateListener closedSessionListener = new ClientSession.ClientSessionStateListener() { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.2
        @Override // com.solace.messaging.util.internal.ClientSession.ClientSessionStateListener
        public void onClientSessionState(ClientSession.ClientSessionStateChangeEvent clientSessionStateChangeEvent) {
            if (PersistentMessageReceiverImpl.logger.isWarnEnabled()) {
                PersistentMessageReceiverImpl.logger.warn("Shutting down receiver due to service closure");
            }
            PersistentMessageReceiverImpl.this.stateHolder.set(null, PersistentMessageReceiverImpl.STATE_TERMINATED);
            PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(clientSessionStateChangeEvent.getTimestamp(), clientSessionStateChangeEvent.getMessage(), clientSessionStateChangeEvent.getCause()));
            PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
        }
    };
    private final MessagingService.ReconnectionListener reconnectionListener = new MessagingService.ReconnectionListener() { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.3
        @Override // com.solace.messaging.MessagingService.ReconnectionListener
        public void onReconnected(MessagingService.ServiceEvent serviceEvent) {
            if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " is reconnected.");
            }
        }
    };
    private final MessagingService.ReconnectionAttemptListener reconnectionAttemptListener = new MessagingService.ReconnectionAttemptListener() { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.4
        @Override // com.solace.messaging.MessagingService.ReconnectionAttemptListener
        public void onReconnecting(MessagingService.ServiceEvent serviceEvent) {
            if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " is reconnecting.");
            }
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$AsyncConsumerMessageDispatchTask.class */
    public static class AsyncConsumerMessageDispatchTask implements Callable<Void> {
        private final ReceiverBuffers.ReceiverBuffer buffer;
        private final PersistentMessageReceiverImpl receiver;
        private final ReentrantLock messageHandlersLock;
        private final Condition notEmpty;

        private AsyncConsumerMessageDispatchTask(ReceiverBuffers.ReceiverBuffer receiverBuffer, PersistentMessageReceiverImpl persistentMessageReceiverImpl, ReentrantLock reentrantLock, Condition condition) {
            this.buffer = receiverBuffer;
            this.receiver = persistentMessageReceiverImpl;
            this.messageHandlersLock = reentrantLock;
            this.notEmpty = condition;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            MessageReceiver.MessageHandler messageHandler;
            while (true) {
                if (!this.receiver.isRunning() && !this.receiver.isTerminating()) {
                    return null;
                }
                try {
                    messageHandler = (MessageReceiver.MessageHandler) this.receiver.messageHandlerRef.get();
                } catch (Exception e) {
                    if (Thread.interrupted()) {
                        if (PersistentMessageReceiverImpl.logger.isInfoEnabled()) {
                            PersistentMessageReceiverImpl.logger.info(this.receiver.instanceName + " waiting for a message was interrupted", e);
                        }
                    } else if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                        PersistentMessageReceiverImpl.logger.error(this.receiver.instanceName + " Problem during a message reception", e);
                    }
                }
                if (messageHandler != null) {
                    ReceiverBuffers.Receivable consume = this.buffer.consume();
                    if (consume != null) {
                        InboundMessage message = consume.getMessage();
                        if (message != null) {
                            boolean z = PersistentMessageReceiverImpl.STATE_NOT_STARTED;
                            try {
                                messageHandler.onMessage(message);
                            } catch (Exception e2) {
                                z = PersistentMessageReceiverImpl.STATE_STARTING;
                                if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                                    PersistentMessageReceiverImpl.logger.error("Application code throw an unhandled exception by message processing: " + message, e2);
                                }
                            }
                            if (this.receiver.autoAckEnabled && !z) {
                                ((MessageReceiver.InboundMessageImpl) message).doAck();
                            }
                        } else if (PersistentMessageReceiverImpl.logger.isInfoEnabled()) {
                            PersistentMessageReceiverImpl.logger.info(this.receiver.instanceName + " buffer returned null message");
                        }
                    } else if (PersistentMessageReceiverImpl.logger.isInfoEnabled()) {
                        PersistentMessageReceiverImpl.logger.info(this.receiver.instanceName + " buffer returned no message");
                    }
                } else {
                    ReentrantLock reentrantLock = this.messageHandlersLock;
                    try {
                        reentrantLock.lockInterruptibly();
                    } catch (InterruptedException e3) {
                        reentrantLock.unlock();
                    } catch (Throwable th) {
                        reentrantLock.unlock();
                        throw th;
                        break;
                    }
                    if (this.receiver.messageHandlerRef.get() != null) {
                        reentrantLock.unlock();
                    } else {
                        this.notEmpty.await();
                        reentrantLock.unlock();
                    }
                }
            }
        }
    }

    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$FlowControlHandler.class */
    class FlowControlHandler implements ReceiverBuffers.CapacityChangeListener {
        private final ExecutorService defaultFlowControlExecutorService;
        private final FlowControlResumeTask resumeTask;
        private final FlowControlPauseTask pauseTask;

        @ProviderType
        /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$FlowControlHandler$FlowControlPauseTask.class */
        private final class FlowControlPauseTask implements Callable<Void> {
            private final PersistentMessageReceiverImpl messageReceiverParent;

            FlowControlPauseTask(PersistentMessageReceiverImpl persistentMessageReceiverImpl) {
                this.messageReceiverParent = persistentMessageReceiverImpl;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                FlowReceiver flowReceiver = this.messageReceiverParent.solaceReceiver;
                if (!this.messageReceiverParent.isRunning() || flowReceiver == null || flowReceiver.isClosed()) {
                    return null;
                }
                try {
                    flowReceiver.stop();
                    return null;
                } catch (Exception e) {
                    this.messageReceiverParent.messageReceiptFailureNotificationDispatcher.onException(e);
                    return null;
                }
            }
        }

        @ProviderType
        /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$FlowControlHandler$FlowControlResumeTask.class */
        private final class FlowControlResumeTask implements Callable<Void> {
            private final PersistentMessageReceiverImpl messageReceiverParent;

            FlowControlResumeTask(PersistentMessageReceiverImpl persistentMessageReceiverImpl) {
                this.messageReceiverParent = persistentMessageReceiverImpl;
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                FlowReceiver flowReceiver = this.messageReceiverParent.solaceReceiver;
                if (!this.messageReceiverParent.isRunning() || flowReceiver == null || flowReceiver.isClosed()) {
                    return null;
                }
                try {
                    flowReceiver.start();
                    return null;
                } catch (JCSMPException e) {
                    this.messageReceiverParent.messageReceiptFailureNotificationDispatcher.onException(e);
                    return null;
                }
            }
        }

        private FlowControlHandler() {
            this.defaultFlowControlExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(PersistentMessageReceiverImpl.this.instanceName + "-flow-control-dispatcher"));
            this.resumeTask = new FlowControlResumeTask(PersistentMessageReceiverImpl.this);
            this.pauseTask = new FlowControlPauseTask(PersistentMessageReceiverImpl.this);
        }

        @Override // com.solace.messaging.receiver.ReceiverBuffers.CapacityChangeListener
        public void low() {
            FlowReceiver flowReceiver = PersistentMessageReceiverImpl.this.solaceReceiver;
            if (!PersistentMessageReceiverImpl.this.isRunning() || flowReceiver == null) {
                return;
            }
            try {
                this.defaultFlowControlExecutorService.submit(this.pauseTask);
            } catch (RejectedExecutionException e) {
                if (PersistentMessageReceiverImpl.logger.isWarnEnabled()) {
                    PersistentMessageReceiverImpl.logger.warn(PersistentMessageReceiverImpl.this.instanceName + " could not schedule pause on flow control, executing on same thread");
                    try {
                        this.pauseTask.call();
                    } catch (Exception e2) {
                        if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                            PersistentMessageReceiverImpl.logger.error(PersistentMessageReceiverImpl.this.instanceName + " flow control 'pause' operation failed", e2);
                        }
                    }
                }
            }
        }

        @Override // com.solace.messaging.receiver.ReceiverBuffers.CapacityChangeListener
        public void normal() {
            FlowReceiver flowReceiver = PersistentMessageReceiverImpl.this.solaceReceiver;
            if (!PersistentMessageReceiverImpl.this.isRunning() || flowReceiver == null) {
                return;
            }
            try {
                this.defaultFlowControlExecutorService.submit(this.resumeTask);
            } catch (RejectedExecutionException e) {
                if (PersistentMessageReceiverImpl.logger.isWarnEnabled()) {
                    PersistentMessageReceiverImpl.logger.warn(PersistentMessageReceiverImpl.this.instanceName + " could not schedule pause on flow control, executing on same thread");
                    try {
                        this.resumeTask.call();
                    } catch (Exception e2) {
                        if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                            PersistentMessageReceiverImpl.logger.error(PersistentMessageReceiverImpl.this.instanceName + " flow control 'resume' operation failed", e2);
                        }
                    }
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$PersistentReceiverInfoImpl.class */
    public class PersistentReceiverInfoImpl implements ManageableReceiver.PersistentReceiverInfo {
        private final ManageableReceiver.PersistentReceiverInfo.ResourceInfo resourceInfo = new QueueResourceInfoImpl();

        @ProviderType
        /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$PersistentReceiverInfoImpl$QueueResourceInfoImpl.class */
        private class QueueResourceInfoImpl implements ManageableReceiver.PersistentReceiverInfo.ResourceInfo {
            private QueueResourceInfoImpl() {
            }

            @Override // com.solace.messaging.util.ManageableReceiver.PersistentReceiverInfo.ResourceInfo
            public boolean isDurable() {
                return PersistentMessageReceiverImpl.this.queue.isDurable();
            }

            @Override // com.solace.messaging.util.ManageableReceiver.PersistentReceiverInfo.ResourceInfo
            public String getName() {
                return PersistentMessageReceiverImpl.this.queue.getSolaceQueue().getName();
            }
        }

        public PersistentReceiverInfoImpl() {
        }

        @Override // com.solace.messaging.util.Identifiable
        public long getId() {
            return PersistentMessageReceiverImpl.this.id;
        }

        @Override // com.solace.messaging.resources.ReceiverInfo
        public String getInstanceName() {
            return PersistentMessageReceiverImpl.this.instanceName;
        }

        @Override // com.solace.messaging.util.ManageableReceiver.PersistentReceiverInfo
        public ManageableReceiver.PersistentReceiverInfo.ResourceInfo getResourceInfo() throws IllegalStateException {
            return this.resourceInfo;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$SolaceQueueHolder.class */
    public static class SolaceQueueHolder implements Queue {
        private final boolean exclusivelyAccessible;
        private final com.solacesystems.jcsmp.Queue solaceQueue;

        private SolaceQueueHolder(com.solacesystems.jcsmp.Queue queue, boolean z) {
            this.solaceQueue = queue;
            this.exclusivelyAccessible = z;
        }

        @Override // com.solace.messaging.resources.Destination
        public String getName() {
            return this.solaceQueue.getName();
        }

        @Override // com.solace.messaging.resources.ResourceAccessibility
        public boolean isExclusivelyAccessible() {
            return this.exclusivelyAccessible;
        }

        @Override // com.solace.messaging.resources.ResourceDurability
        public boolean isDurable() {
            return this.solaceQueue.isDurable();
        }

        public com.solacesystems.jcsmp.Queue getSolaceQueue() {
            return this.solaceQueue;
        }

        static SolaceQueueHolder create(Queue queue, ClientSession clientSession) {
            return !queue.isDurable() ? queue.getName() == null ? new SolaceQueueHolder(QueueImpl.createWithInit(DestinationUtil.createNonDurQueueTrbTopic(clientSession.getVirtualRouterName(), (String) null), false, clientSession.getVirtualRouterName()), queue.isExclusivelyAccessible()) : new SolaceQueueHolder(QueueImpl.createWithInit(DestinationUtil.createNonDurQueueTrbTopic(clientSession.getVirtualRouterName(), queue.getName()), false, clientSession.getVirtualRouterName()), queue.isExclusivelyAccessible()) : new SolaceQueueHolder(QueueImpl.userCreateWithInit(queue.getName(), true, clientSession.getVirtualRouterName()), queue.isExclusivelyAccessible());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/receiver/PersistentMessageReceiverImpl$StopSolaceConsumerTask.class */
    public static class StopSolaceConsumerTask implements Task<PersistentMessageReceiverImpl> {
        private StopSolaceConsumerTask() {
        }

        @Override // com.solace.messaging.util.internal.Task
        public void run(PersistentMessageReceiverImpl persistentMessageReceiverImpl) {
            try {
                if (persistentMessageReceiverImpl.solaceReceiver != null) {
                    try {
                        persistentMessageReceiverImpl.solaceReceiver.closeSync();
                        persistentMessageReceiverImpl.solaceReceiver.setMessageListener((XMLMessageListener) null);
                    } catch (Throwable th) {
                        persistentMessageReceiverImpl.solaceReceiver.setMessageListener((XMLMessageListener) null);
                        throw th;
                    }
                }
            } catch (JCSMPException e) {
                if (PersistentMessageReceiverImpl.logger.isWarnEnabled()) {
                    PersistentMessageReceiverImpl.logger.warn(persistentMessageReceiverImpl.instanceName + " internal Solace receiver failed to stop.", e);
                }
            }
        }
    }

    public PersistentMessageReceiverImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties, List<TopicSubscription> list, Queue queue) {
        this.receiverConfiguration = typedProperties;
        this.autoAckEnabled = isAutoAckConfigured(this.receiverConfiguration);
        this.serviceInternalView = messagingServiceInternalView;
        this.queue = SolaceQueueHolder.create(queue, this.serviceInternalView.getClientSession());
        this.buffer = ReceiverBuffers.createCapacityAwareBuffer(this.receiverConfiguration, this.flowControlHandler);
        this.consumerProps = createFlowConfiguration(this.receiverConfiguration, this.queue);
        this.asyncConsumerMessageDispatchTask = new AsyncConsumerMessageDispatchTask(this.buffer, this, this.messageHandlersLock, this.notEmpty);
        this.preStartTasks.add(addSubscriptions(new ArrayList(list), this.queue));
        this.messageReceiptFailureNotificationDispatcher = new MessageReceiptFailureNotificationDispatcher(this.receiverInfo);
        this.stopSolaceConsumerTask = new StopSolaceConsumerTask();
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver, com.solace.messaging.util.ManageableReceiver
    public ManageableReceiver.PersistentReceiverInfo receiverInfo() {
        return this.receiverInfo;
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver, com.solace.messaging.receiver.MessageReceiver, com.solace.messaging.util.LifecycleControl
    public PersistentMessageReceiver start() throws PubSubPlusClientException {
        try {
            startAsync().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e);
        } catch (CancellationException e2) {
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver start was canceled", e2);
        } catch (ExecutionException e3) {
            Throwable cause = e3.getCause();
            if (cause != null) {
                if (cause instanceof PubSubPlusClientException) {
                    throw ((PubSubPlusClientException) cause);
                }
                if (cause instanceof IllegalStateException) {
                    throw ((IllegalStateException) cause);
                }
                throw new PubSubPlusClientException(cause);
            }
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " failed to start", e3);
            }
        }
        return this;
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public void terminate(long j) throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        if (j == 0) {
            terminateNow();
            return;
        }
        try {
            terminateAsync(j).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Message receiver termination was interrupted", e);
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            if (cause == null) {
                throw new PubSubPlusClientException(e2);
            }
            if (!(cause instanceof PubSubPlusClientException)) {
                throw new PubSubPlusClientException(cause);
            }
            throw ((PubSubPlusClientException) cause);
        } catch (Exception e3) {
            if (logger.isWarnEnabled()) {
                logger.warn(this.instanceName + " encountered problem during termination.", e3);
            }
        }
    }

    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        this.stateHolder.set(null, STATE_TERMINATED);
        AtomicInteger atomicInteger = new AtomicInteger(STATE_NOT_STARTED);
        Task<PersistentMessageReceiverImpl> task = persistentMessageReceiverImpl -> {
            try {
                int size = persistentMessageReceiverImpl.buffer.size();
                boolean z = size < STATE_STARTING;
                persistentMessageReceiverImpl.buffer.clear();
                if (!z) {
                    atomicInteger.set(size);
                    if (logger.isWarnEnabled()) {
                        logger.warn(persistentMessageReceiverImpl.instanceName + " non-gracefully terminated before all buffered messages were processed.");
                    }
                }
            } catch (PubSubPlusClientException.RequestInterruptedException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Non-graceful termination of " + persistentMessageReceiverImpl.instanceName + " was interrupted");
                }
            }
        };
        try {
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is being non gracefully terminated");
            }
            onTerminate(this.stopSolaceConsumerTask, task);
            if (logger.isDebugEnabled()) {
                logger.debug(this.instanceName + " is terminated");
            }
            int i = atomicInteger.get();
            if (i > 0) {
                this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, i);
                throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", Integer.valueOf(i)), i);
            }
        } catch (Throwable th) {
            int i2 = atomicInteger.get();
            if (i2 <= 0) {
                throw th;
            }
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, i2);
            throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to non graceful termination", Integer.valueOf(i2)), i2);
        }
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isRunning() {
        return STATE_STARTED == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminated() {
        return STATE_TERMINATED == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminating() {
        return STATE_TERMINATING == this.stateHolder.getStamp();
    }

    @Override // com.solace.messaging.util.LifecycleControl
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener terminationNotificationListener) {
        this.terminationNotificationDispatcher.setTerminationNotificationListener(terminationNotificationListener);
    }

    @Override // com.solace.messaging.receiver.MessageReceiver
    public void setReceiveFailureListener(MessageReceiver.ReceiveFailureListener receiveFailureListener) {
        this.messageReceiptFailureNotificationDispatcher.setReceiveFailureListener(receiveFailureListener);
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver, com.solace.messaging.receiver.MessageReceiver, com.solace.messaging.util.AsyncLifecycleControl
    public <PersistentMessageReceiver> CompletableFuture<PersistentMessageReceiver> startAsync() throws PubSubPlusClientException, IllegalStateException {
        int stamp = this.stateHolder.getStamp();
        if (STATE_TERMINATING == stamp || stamp == STATE_TERMINATED) {
            throw new IllegalStateException("Message receiver is already terminated");
        }
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Message receiver can't be started when service is not connected");
        }
        while (this.serviceInternalView.isConnected()) {
            int[] iArr = new int[STATE_STARTING];
            CompletableFuture<PersistentMessageReceiver> completableFuture = this.stateHolder.get(iArr);
            switch (iArr[STATE_NOT_STARTED]) {
                case STATE_NOT_STARTED /* 0 */:
                    ExtendedCompletableFuture extendedCompletableFuture = new ExtendedCompletableFuture();
                    if (this.stateHolder.compareAndSet(null, extendedCompletableFuture, STATE_NOT_STARTED, STATE_STARTING)) {
                        if (logger.isDebugEnabled()) {
                            logger.debug(this.instanceName + " is being started");
                        }
                        try {
                            if (this.stateHolder.compareAndSet(extendedCompletableFuture, extendedCompletableFuture, STATE_STARTING, STATE_STARTED)) {
                                onStart();
                            } else if (this.stateHolder.getStamp() >= STATE_TERMINATING) {
                                onTerminate(null, null);
                                extendedCompletableFuture.completeExceptionally(new CancellationException("Starting of message receiver was interrupted"));
                                return extendedCompletableFuture;
                            }
                            extendedCompletableFuture.complete(this);
                            if (logger.isDebugEnabled()) {
                                logger.debug(this.instanceName + " is started");
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                            this.stateHolder.set(null, STATE_TERMINATED);
                            onTerminate(null, null);
                            extendedCompletableFuture.completeExceptionally(PubSubPlusClientException.of(e));
                            if (logger.isErrorEnabled()) {
                                logger.error(this.instanceName + " failed to start and is terminating", e);
                            }
                        }
                        return ExtendedCompletableFuture.onCancellation(extendedCompletableFuture, (obj, th) -> {
                            this.stateHolder.set(null, STATE_TERMINATED);
                            onTerminate(null, null);
                            if (logger.isDebugEnabled()) {
                                logger.debug(this.instanceName + " async start was canceled");
                            }
                        });
                    }
                case STATE_STARTING /* 1 */:
                case STATE_STARTED /* 2 */:
                    return completableFuture;
                case STATE_TERMINATING /* 3 */:
                case STATE_TERMINATED /* 4 */:
                default:
                    return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver is already terminated"));
            }
        }
        return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Message receiver can't be started when service is not connected"));
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver
    public InboundMessage receiveMessage() throws PubSubPlusClientException {
        if (!isRunning() && !isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        InboundMessage message = this.buffer.consume().getMessage();
        if (this.autoAckEnabled && message != null) {
            ((MessageReceiver.InboundMessageImpl) message).doAck();
        }
        return message;
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver
    public InboundMessage receiveMessage(long j) throws PubSubPlusClientException {
        if (!isRunning() && !isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        ReceiverBuffers.Receivable consume = this.buffer.consume(j, TimeUnit.MILLISECONDS);
        InboundMessage message = consume != null ? consume.getMessage() : null;
        if (this.autoAckEnabled && message != null) {
            ((MessageReceiver.InboundMessageImpl) message).doAck();
        }
        return message;
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver
    public InboundMessage receiveOrElse(MessageReceiver.InboundMessageSupplier inboundMessageSupplier) {
        if (!isRunning() && !isTerminating()) {
            throw new IllegalStateException("Message receiver is not started");
        }
        Validation.nullIllegal(inboundMessageSupplier, "Response supplier can't be null");
        ReceiverBuffers.Receivable consumeOrNull = this.buffer.consumeOrNull();
        InboundMessage message = consumeOrNull != null ? consumeOrNull.getMessage() : null;
        if (this.autoAckEnabled && message != null) {
            ((MessageReceiver.InboundMessageImpl) message).doAck();
        }
        return consumeOrNull != null ? message : inboundMessageSupplier.get();
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        ReentrantLock reentrantLock = this.messageHandlersLock;
        reentrantLock.lock();
        try {
            if (!this.messageHandlerRef.compareAndSet(null, new ExecutableMessageHandler(messageHandler, null, logger, this.instanceName))) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            this.notEmpty.signalAll();
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver
    public void receiveAsync(MessageReceiver.MessageHandler messageHandler, ExecutorService executorService) throws PubSubPlusClientException {
        Validation.nullIllegal(messageHandler, "Message handler can't be null");
        Validation.nullIllegal(executorService, "Executor service can't be null");
        ReentrantLock reentrantLock = this.messageHandlersLock;
        reentrantLock.lock();
        try {
            if (!this.messageHandlerRef.compareAndSet(null, new ExecutableMessageHandler(messageHandler, executorService, logger, this.instanceName))) {
                throw new IllegalStateException("receiveAsync can be called once only for the given receiver instance");
            }
            this.notEmpty.signalAll();
            reentrantLock.unlock();
        } catch (Throwable th) {
            reentrantLock.unlock();
            throw th;
        }
    }

    @Override // com.solace.messaging.receiver.PersistentMessageReceiver, com.solace.messaging.util.AsyncLifecycleControl
    public <PersistentMessageReceiver> void startAsync(CompletionListener<PersistentMessageReceiver> completionListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Start listener can't be null");
        startAsync().whenComplete((obj, th) -> {
            Throwable cause;
            if (th == null) {
                cause = null;
            } else {
                try {
                    cause = th.getCause();
                } catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Application code throw an unhandled exception by processing async start completion notification", e);
                        return;
                    }
                    return;
                }
            }
            completionListener.onCompletion(obj, cause);
        });
    }

    @Override // com.solace.messaging.receiver.AcknowledgementSupport
    public void ack(InboundMessage inboundMessage) throws PubSubPlusClientException {
        Validation.nullIllegal(inboundMessage, "Can't ack null message");
        if (this.autoAckEnabled && logger.isWarnEnabled()) {
            logger.warn("ack is ignored because the message acknowledgement mode is auto-ack");
        }
        ((MessageReceiver.InboundMessageImpl) inboundMessage).doAck();
    }

    @Override // com.solace.messaging.receiver.AsyncReceiverSubscriptions
    public void addSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener subscriptionChangeListener) throws PubSubPlusClientException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(subscriptionChangeListener, "Listener can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        addSubscriptionAsync0(topicSubscription, this.queue, subscriptionChangeListener, this);
    }

    @Override // com.solace.messaging.receiver.AsyncReceiverSubscriptions
    public void removeSubscriptionAsync(TopicSubscription topicSubscription, AsyncReceiverSubscriptions.SubscriptionChangeListener subscriptionChangeListener) throws PubSubPlusClientException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullIllegal(subscriptionChangeListener, "Listener can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        removeSubscriptionAsync0(topicSubscription, this.queue, subscriptionChangeListener, this);
    }

    @Override // com.solace.messaging.receiver.ReceiverFlowControl
    public void pause() throws PubSubPlusClientException {
        if (this.serviceInternalView.isConnected() && isRunning()) {
            try {
                this.solaceReceiver.stopSync();
            } catch (JCSMPInterruptedException e) {
                throw new PubSubPlusClientException("Failed to pause message receiver", e);
            }
        } else {
            if (!this.serviceInternalView.isConnected()) {
                throw new IllegalStateException("Messaging service not connected");
            }
            if (!isRunning()) {
                throw new IllegalStateException("Message receiver is not running");
            }
        }
    }

    @Override // com.solace.messaging.receiver.ReceiverFlowControl
    public void resume() throws PubSubPlusClientException {
        if (!this.serviceInternalView.isConnected() || !isRunning()) {
            if (!this.serviceInternalView.isConnected()) {
                throw new IllegalStateException("Messaging service not connected");
            }
            if (!isRunning()) {
                throw new IllegalStateException("Message receiver is not running");
            }
            return;
        }
        try {
            this.solaceReceiver.startSync();
        } catch (JCSMPException e) {
            throw new PubSubPlusClientException("Failed to restart message receiver", e);
        } catch (JCSMPInterruptedException e2) {
            throw new PubSubPlusClientException("Failed to restart message receiver", e2);
        }
    }

    @Override // com.solace.messaging.receiver.ReceiverSubscriptions
    public void addSubscription(TopicSubscription topicSubscription) throws PubSubPlusClientException, InterruptedException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        addSubscription0(topicSubscription, this.queue, this);
    }

    @Override // com.solace.messaging.receiver.ReceiverSubscriptions
    public void removeSubscription(TopicSubscription topicSubscription) throws PubSubPlusClientException, InterruptedException {
        if (topicSubscription instanceof CachedTopicSubscription) {
            throw new IllegalArgumentException("Cached topic subscriptions are not supported on persistent messaging");
        }
        Validation.nullIllegal(topicSubscription, "Topic subscription can't be null");
        Validation.nullOrEmptyIllegal(topicSubscription.getName(), "Topic subscription name can't be null or empty");
        removeSubscription0(topicSubscription, this.queue, this);
    }

    /* JADX WARN: Finally extract failed */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public CompletableFuture<Void> terminateAsync(long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, j, "Grace period < 1");
        AtomicInteger atomicInteger = new AtomicInteger(STATE_NOT_STARTED);
        while (true) {
            int[] iArr = new int[STATE_STARTING];
            CompletableFuture completableFuture = this.stateHolder.get(iArr);
            switch (iArr[STATE_NOT_STARTED]) {
                case STATE_NOT_STARTED /* 0 */:
                    if (!this.stateHolder.compareAndSet(null, null, STATE_NOT_STARTED, STATE_TERMINATED)) {
                        break;
                    } else {
                        onTerminate(null, null);
                        return CompletableFuture.completedFuture(null);
                    }
                case STATE_STARTING /* 1 */:
                    this.stateHolder.set(null, STATE_TERMINATED);
                    completableFuture.cancel(true);
                    return CompletableFuture.completedFuture(null);
                case STATE_STARTED /* 2 */:
                    ExtendedCompletableFuture extendedCompletableFuture = new ExtendedCompletableFuture();
                    if (!this.stateHolder.compareAndSet(completableFuture, extendedCompletableFuture, STATE_STARTED, STATE_TERMINATING)) {
                        break;
                    } else {
                        try {
                            onTerminate(this.stopSolaceConsumerTask, persistentMessageReceiverImpl -> {
                                if (this.buffer.awaitEmpty(j, TimeUnit.MILLISECONDS)) {
                                    return;
                                }
                                if (logger.isWarnEnabled()) {
                                    logger.warn(persistentMessageReceiverImpl.instanceName + " shutdown gracefully before all buffered messages were processed, grace period was not sufficient: " + j);
                                }
                                atomicInteger.set(persistentMessageReceiverImpl.buffer.size());
                                onGracefulTerminateEmptyRemainingBuffer(persistentMessageReceiverImpl.buffer);
                            });
                            this.stateHolder.set(null, STATE_TERMINATED);
                            int i = atomicInteger.get();
                            if (i > 0) {
                                this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, i);
                                extendedCompletableFuture.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Processing of %d messages could not be completed due to expiration of a grace period", Integer.valueOf(i)), i));
                            } else {
                                extendedCompletableFuture.complete(null);
                            }
                            return extendedCompletableFuture;
                        } catch (Throwable th) {
                            this.stateHolder.set(null, STATE_TERMINATED);
                            throw th;
                        }
                    }
                case STATE_TERMINATING /* 3 */:
                case STATE_TERMINATED /* 4 */:
                default:
                    return CompletableFuture.completedFuture(null);
            }
        }
    }

    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public void terminateAsync(CompletionListener<Void> completionListener, long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Termination listener can't be null");
        terminateAsync(j).whenComplete((r5, th) -> {
            Throwable cause;
            if (th == null) {
                cause = null;
            } else {
                try {
                    cause = th.getCause();
                } catch (Exception e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Application code throw an unhandled exception by processing termination completion notification", e);
                        return;
                    }
                    return;
                }
            }
            completionListener.onCompletion(null, cause);
        });
    }

    ConsumerFlowProperties createFlowConfiguration(TypedProperties typedProperties, SolaceQueueHolder solaceQueueHolder) {
        ConsumerFlowProperties consumerFlowProperties = new ConsumerFlowProperties();
        consumerFlowProperties.setActiveFlowIndication(true);
        String property = typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_SELECTOR_QUERY);
        if (property != null && !property.isEmpty()) {
            consumerFlowProperties.setSelector(property);
        }
        consumerFlowProperties.setEndpoint(solaceQueueHolder.getSolaceQueue());
        String property2 = typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_REPLAY_STRATEGY);
        if (property2 != null) {
            if (SolaceConstants.ReceiverConstants.PERSISTENT_REPLAY_ALL.equals(property2)) {
                consumerFlowProperties.setReplayStartLocation(new ReplayStartLocationBeginningImpl());
            } else if (SolaceConstants.ReceiverConstants.PERSISTENT_REPLAY_TIME_BASED.equals(property2)) {
                consumerFlowProperties.setReplayStartLocation(new ReplayStartLocationDateImpl(Date.from(((ZonedDateTime) typedProperties.getObjectProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_REPLAY_STRATEGY_TIME_BASED_START_TIME)).toInstant())));
            } else if (SolaceConstants.ReceiverConstants.PERSISTENT_REPLAY_REPLICATION_GROUP_MESSAGE_ID_BASED.equals(property2)) {
                try {
                    consumerFlowProperties.setReplayStartLocation(ReplicationGroupMessageIdImpl.createReplicationGroupMessageId(typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_REPLAY_REPLICATION_GROUP_MESSAGE_ID)));
                } catch (InvalidPropertiesException e) {
                    logger.error("Malformed group message id bypassed input validation", e);
                }
            }
        }
        consumerFlowProperties.setAckMode(typedProperties.getProperty("message_ack_mode"));
        consumerFlowProperties.setAckThreshold(typedProperties.getIntegerProperty("sub_ack_window_threshold").intValue());
        consumerFlowProperties.setAckTimerInMsecs(typedProperties.getIntegerProperty("sub_ack_time").intValue());
        consumerFlowProperties.setWindowedAckMaxSize(typedProperties.getIntegerProperty("sub_ack_window_size").intValue());
        consumerFlowProperties.setAckTimerInMsecs(typedProperties.getIntegerProperty("sub_ack_time").intValue());
        consumerFlowProperties.setReconnectTries(typedProperties.getIntegerProperty(SolaceProperties.ReceiverProperties.PERSISTENT_RECONNECTION_ATTEMPTS).intValue());
        consumerFlowProperties.setReconnectRetryIntervalInMsecs(typedProperties.getIntegerProperty(SolaceProperties.ReceiverProperties.PERSISTENT_RECONNECTION_ATTEMPTS_WAIT_INTERVAL).intValue());
        return consumerFlowProperties;
    }

    Task<PersistentMessageReceiverImpl> addSubscriptions(List<TopicSubscription> list, SolaceQueueHolder solaceQueueHolder) {
        return list.isEmpty() ? NO_OP_TASK : persistentMessageReceiverImpl -> {
            if (persistentMessageReceiverImpl.isRunning()) {
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    TopicSubscription topicSubscription = (TopicSubscription) it.next();
                    if (!persistentMessageReceiverImpl.serviceInternalView.getClientSession().isConnected()) {
                        throw new IllegalStateException("Subscriptions can't be added while service is not connected");
                    }
                    if (Thread.currentThread().isInterrupted()) {
                        return;
                    }
                    if (topicSubscription != null && solaceQueueHolder != null) {
                        addSubscription0(topicSubscription, solaceQueueHolder, persistentMessageReceiverImpl);
                    }
                }
            }
        };
    }

    void onGracefulTerminateEmptyRemainingBuffer(ReceiverBuffers.ReceiverBuffer receiverBuffer) {
        receiverBuffer.clear();
    }

    void addInitialSubscriptions() {
        this.preStartTasks.removeIf(task -> {
            task.run(this);
            return true;
        });
    }

    void testBrokerCapabilities(List<CapabilityType> list) {
        if (list == null || list.isEmpty() || !this.serviceInternalView.getClientSession().isConnected()) {
            return;
        }
        for (CapabilityType capabilityType : list) {
            if (!this.serviceInternalView.getClientSession().isCapable(capabilityType)) {
                throw new PubSubPlusClientException.ServiceCapabilityException("Service does not support configured capability " + capabilityType, capabilityType);
            }
        }
    }

    void createMissingResources(SolaceQueueHolder solaceQueueHolder, TypedProperties typedProperties) {
        if (SolaceConstants.ReceiverConstants.PERSISTENT_RECEIVER_CREATE_ON_START_MISSING_RESOURCES.equals(typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MISSING_RESOURCE_CREATION_STRATEGY)) && solaceQueueHolder.isDurable()) {
            EndpointProperties endpointProperties = new EndpointProperties();
            endpointProperties.setPermission(Integer.valueOf(STATE_TERMINATED));
            if (solaceQueueHolder.isExclusivelyAccessible()) {
                endpointProperties.setAccessType(Integer.valueOf(STATE_STARTING));
            } else {
                endpointProperties.setAccessType(Integer.valueOf(STATE_NOT_STARTED));
            }
            try {
                this.serviceInternalView.getClientSession().provision(solaceQueueHolder.getSolaceQueue(), endpointProperties, 1L);
            } catch (JCSMPErrorResponseException e) {
                if (403 == e.getResponseCode()) {
                    throw new PubSubPlusClientException.AuthorizationException("Queue can't be provisioned, user not authorized", e);
                }
            } catch (PropertyMismatchException e2) {
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " could not provision queue: " + solaceQueueHolder.getSolaceQueue().getName(), e2);
                }
                throw new PubSubPlusClientException.ResourceProvisioningException(String.format("Queue '%s' already provisioned using different property set", solaceQueueHolder.getSolaceQueue().getName()), e2);
            } catch (JCSMPException e3) {
                throw new PubSubPlusClientException("Queue can't be provisioned, see cause for more details", e3);
            }
        }
    }

    void addSubscription0(TopicSubscription topicSubscription, SolaceQueueHolder solaceQueueHolder, PersistentMessageReceiverImpl persistentMessageReceiverImpl) throws PubSubPlusClientException {
        if (!persistentMessageReceiverImpl.serviceInternalView.getClientSession().isConnected()) {
            throw new IllegalStateException("Subscription can't be added when service is not connected");
        }
        ReentrantLock reentrantLock = this.subscriptionsLock;
        try {
            try {
                reentrantLock.lockInterruptibly();
                if (!this.appliedTopicSubscriptions.contains(topicSubscription)) {
                    Subscription createTopicInstance = createTopicInstance(topicSubscription);
                    try {
                        persistentMessageReceiverImpl.serviceInternalView.getClientSession().addSubscription(solaceQueueHolder.getSolaceQueue(), createTopicInstance, STATE_TERMINATED);
                        persistentMessageReceiverImpl.appliedTopicSubscriptions.add(topicSubscription);
                    } catch (JCSMPException e) {
                        if (logger.isErrorEnabled()) {
                            logger.error(this.instanceName + " failed to apply subscription: " + createTopicInstance, e);
                        }
                    }
                }
            } catch (InterruptedException e2) {
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " failed to apply subscription due to interruption", e2);
                }
                throw new PubSubPlusClientException.RequestInterruptedException("Failed to apply subscription due to interruption", e2);
            }
        } finally {
            reentrantLock.unlock();
        }
    }

    void addSubscriptionAsync0(TopicSubscription topicSubscription, SolaceQueueHolder solaceQueueHolder, AsyncReceiverSubscriptions.SubscriptionChangeListener subscriptionChangeListener, PersistentMessageReceiverImpl persistentMessageReceiverImpl) throws PubSubPlusClientException {
        if (!persistentMessageReceiverImpl.serviceInternalView.getClientSession().isConnected()) {
            throw new IllegalStateException("Subscription can't be added when service is not connected");
        }
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter addSubscriptionListenerAdapter = new AsyncReceiverSubscriptions.AddSubscriptionListenerAdapter(subscriptionChangeListener, topicSubscription);
                try {
                    addSubscription0(topicSubscription, solaceQueueHolder, persistentMessageReceiverImpl);
                    addSubscriptionListenerAdapter.handleSuccess(topicSubscription);
                    return null;
                } catch (Exception e) {
                    addSubscriptionListenerAdapter.handleError(topicSubscription, e);
                    return null;
                }
            });
        } catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " could not schedule adding of subscription: " + topicSubscription);
            }
            throw new PubSubPlusClientException("Adding subscription task was rejected by internal executor service", e);
        }
    }

    void removeSubscriptionAsync0(TopicSubscription topicSubscription, SolaceQueueHolder solaceQueueHolder, AsyncReceiverSubscriptions.SubscriptionChangeListener subscriptionChangeListener, PersistentMessageReceiverImpl persistentMessageReceiverImpl) throws PubSubPlusClientException {
        try {
            this.asyncSubscriptionsExecutorService.submit(() -> {
                this.appliedTopicSubscriptions.removeIf(topicSubscription2 -> {
                    if (!topicSubscription.equals(topicSubscription2)) {
                        return false;
                    }
                    AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter removeSubscriptionListenerAdapter = new AsyncReceiverSubscriptions.RemoveSubscriptionListenerAdapter(subscriptionChangeListener, topicSubscription);
                    try {
                        removeSubscription0(topicSubscription, solaceQueueHolder, persistentMessageReceiverImpl);
                        try {
                            removeSubscriptionListenerAdapter.handleSuccess(topicSubscription);
                        } catch (Exception e) {
                            if (logger.isWarnEnabled()) {
                                logger.warn("Application code throw an unhandled exception by processing adding of subscription: " + topicSubscription, e);
                            }
                        }
                        return true;
                    } catch (Exception e2) {
                        try {
                            removeSubscriptionListenerAdapter.handleError(topicSubscription, e2);
                            return true;
                        } catch (Exception e3) {
                            if (!logger.isWarnEnabled()) {
                                return true;
                            }
                            logger.warn("Application code throw an unhandled exception by processing removal of subscription: " + topicSubscription, e2);
                            return true;
                        }
                    }
                });
                return null;
            });
        } catch (RejectedExecutionException e) {
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " could not schedule removing of subscription: " + topicSubscription);
            }
            throw new PubSubPlusClientException("Subscription removal task was rejected by internal executor service", e);
        }
    }

    void removeSubscription0(TopicSubscription topicSubscription, SolaceQueueHolder solaceQueueHolder, PersistentMessageReceiverImpl persistentMessageReceiverImpl) throws PubSubPlusClientException {
        this.appliedTopicSubscriptions.removeIf(topicSubscription2 -> {
            if (!topicSubscription.equals(topicSubscription2)) {
                return false;
            }
            try {
                persistentMessageReceiverImpl.serviceInternalView.getClientSession().removeSubscription(solaceQueueHolder.getSolaceQueue(), createTopicInstance(topicSubscription), STATE_TERMINATED);
                return true;
            } catch (Exception e) {
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " failed to remove subscription: " + topicSubscription, e);
                }
                throw new PubSubPlusClientException("Un-subscription could not be performed", e);
            }
        });
    }

    boolean isMessageReplayEnabled(TypedProperties typedProperties) {
        String property = typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_REPLAY_STRATEGY);
        return (property == null || property.isEmpty()) ? false : true;
    }

    void onStart() throws PubSubPlusClientException.ResourceProvisioningException, PubSubPlusClientException.AuthorizationException, PubSubPlusClientException.ServiceCapabilityException, PubSubPlusClientException {
        this.serviceInternalView.addReconnectionListener(this.reconnectionListener);
        this.serviceInternalView.addReconnectionAttemptListener(this.reconnectionAttemptListener);
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        testBrokerCapabilities(getRequiredCapabilities(this.receiverConfiguration, this.queue));
        createMissingResources(this.queue, this.receiverConfiguration);
        boolean isMessageReplayEnabled = isMessageReplayEnabled(this.receiverConfiguration);
        if (isMessageReplayEnabled) {
            addInitialSubscriptions();
        }
        if (this.solaceReceiver != null) {
            this.solaceReceiver.close(true);
        }
        this.solaceReceiver = createSolaceConsumer(this.queue, this);
        if (!isMessageReplayEnabled) {
            addInitialSubscriptions();
        }
        try {
            this.defaultReceiverExecutorService.submit(this.asyncConsumerMessageDispatchTask);
            try {
                this.solaceReceiver.startSync();
            } catch (JCSMPException e) {
                e.printStackTrace();
            }
        } catch (RejectedExecutionException e2) {
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " could not schedule consumer dispatcher task");
            }
            throw new PubSubPlusClientException("Internal Executor service rejected consumer dispatcher task", e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LifecycleControl.TerminationEvent mapFlowDownException(Exception exc) {
        if (!(exc instanceof JCSMPErrorResponseException)) {
            return exc instanceof PubSubPlusClientException ? new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", (PubSubPlusClientException) exc) : new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", new PubSubPlusClientException(exc));
        }
        switch (((JCSMPErrorResponseException) exc).getSubcodeEx()) {
            case 78:
            case 79:
            case 80:
            case 81:
            case 82:
            case 83:
            case 84:
                return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is down due to replay problem", new PubSubPlusClientException.MessageReplayException(JCSMPErrorResponseSubcodeEx.getSubcodeAsString(((JCSMPErrorResponseException) exc).getSubcodeEx()), exc));
            default:
                return new TerminationEventImpl(Instant.now().toEpochMilli(), "Receiver flow is shut down", new PubSubPlusClientException(exc));
        }
    }

    FlowReceiver createSolaceConsumer(SolaceQueueHolder solaceQueueHolder, final PersistentMessageReceiverImpl persistentMessageReceiverImpl) {
        FlowEventHandler flowEventHandler = new FlowEventHandler() { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.5
            final String instanceReconnected;
            final String instanceReconnecting;

            {
                this.instanceReconnected = PersistentMessageReceiverImpl.this.instanceName + " reconnected";
                this.instanceReconnecting = PersistentMessageReceiverImpl.this.instanceName + " reconnecting";
            }

            public void handleEvent(Object obj, FlowEventArgs flowEventArgs) {
                ClientSession clientSession = PersistentMessageReceiverImpl.this.serviceInternalView.getClientSession();
                String str = "n/a";
                if (clientSession != null) {
                    if (clientSession.getClientChannel() != null && clientSession.getClientChannel().getSmfClient() != null) {
                        str = clientSession.getClientChannel().getSmfClient().getRemoteHost();
                    }
                    if (flowEventArgs.getEvent() == FlowEvent.FLOW_DOWN) {
                        if (clientSession.isReconnecting()) {
                            if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                                PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is temporarily down while reconnecting");
                                return;
                            }
                            return;
                        } else {
                            PersistentMessageReceiverImpl.this.stateHolder.set(null, PersistentMessageReceiverImpl.STATE_TERMINATED);
                            PersistentMessageReceiverImpl.this.terminationNotificationDispatcher.onTermination(PersistentMessageReceiverImpl.this.mapFlowDownException(flowEventArgs.getException()));
                            PersistentMessageReceiverImpl.this.terminateOnUnsolicitedInterruption();
                            return;
                        }
                    }
                    if (flowEventArgs.getEvent() == FlowEvent.FLOW_UP) {
                        if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                            PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is started");
                            return;
                        }
                        return;
                    }
                    if (flowEventArgs.getEvent() == FlowEvent.FLOW_ACTIVE) {
                        if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                            PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is active");
                        }
                    } else if (flowEventArgs.getEvent() == FlowEvent.FLOW_RECONNECTING) {
                        if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                            PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is reconnecting");
                        }
                        PersistentMessageReceiverImpl.this.reconnectionAttemptListener.onReconnecting(new ServiceEventImpl(str, flowEventArgs.getException(), this.instanceReconnecting));
                    } else if (flowEventArgs.getEvent() == FlowEvent.FLOW_RECONNECTED) {
                        if (PersistentMessageReceiverImpl.logger.isDebugEnabled()) {
                            PersistentMessageReceiverImpl.logger.debug(PersistentMessageReceiverImpl.this.instanceName + " flow receiver is reconnected");
                        }
                        PersistentMessageReceiverImpl.this.reconnectionListener.onReconnected(new ServiceEventImpl(str, flowEventArgs.getException(), this.instanceReconnected));
                    }
                }
            }
        };
        InboundMessage.SolaceMessageListener solaceMessageListener = new InboundMessage.SolaceMessageListener(solaceQueueHolder.getName()) { // from class: com.solace.messaging.receiver.PersistentMessageReceiverImpl.6
            @Override // com.solace.messaging.receiver.InboundMessage.SolaceMessageListener
            public void onException(PubSubPlusClientException pubSubPlusClientException) {
                persistentMessageReceiverImpl.messageReceiptFailureNotificationDispatcher.onException(pubSubPlusClientException);
                if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                    PersistentMessageReceiverImpl.logger.error(persistentMessageReceiverImpl.instanceName + " encountered problem during message reception.", pubSubPlusClientException);
                }
            }

            @Override // com.solace.messaging.receiver.InboundMessage.SolaceMessageListener
            public void onReceive(InboundMessage inboundMessage) {
                try {
                    persistentMessageReceiverImpl.buffer.insert(ReceiverBuffers.Receivable.of(inboundMessage));
                } catch (Exception e) {
                    PersistentMessageReceiverImpl.this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                    persistentMessageReceiverImpl.messageReceiptFailureNotificationDispatcher.onException(e);
                    if (PersistentMessageReceiverImpl.logger.isErrorEnabled()) {
                        PersistentMessageReceiverImpl.logger.error(persistentMessageReceiverImpl.instanceName + " encountered problem during message reception.", e);
                    }
                }
            }
        };
        try {
            if (solaceQueueHolder.isDurable()) {
                try {
                    return persistentMessageReceiverImpl.serviceInternalView.getClientSession().createFlow(solaceMessageListener, this.consumerProps, null, flowEventHandler);
                } catch (AccessDeniedException e) {
                    throw new PubSubPlusClientException.AuthorizationException("User not authorized to bind a flow to or to a queue", e);
                }
            }
            try {
                EndpointProperties endpointProperties = new EndpointProperties();
                endpointProperties.setAccessType(Integer.valueOf(STATE_STARTING));
                endpointProperties.setPermission(Integer.valueOf(STATE_TERMINATED));
                return persistentMessageReceiverImpl.serviceInternalView.getClientSession().createFlow(solaceMessageListener, this.consumerProps, endpointProperties, flowEventHandler);
            } catch (AccessDeniedException e2) {
                throw new PubSubPlusClientException.AuthorizationException("User not authorized to bind a flow to or to provision a temporary queue", e2);
            }
        } catch (JCSMPException e3) {
            throw new PubSubPlusClientException("Solace consumer flow could not be created", e3);
        } catch (JCSMPErrorResponseException e4) {
            int subcodeEx = e4.getSubcodeEx();
            if (73 <= subcodeEx && 84 >= subcodeEx) {
                throw new PubSubPlusClientException.MessageReplayException(e4.getMessage(), e4);
            }
            if (20 == subcodeEx) {
                throw new PubSubPlusClientException.MissingResourceException(e4.getMessage(), e4);
            }
            throw new PubSubPlusClientException("Solace consumer flow could not be created", e4);
        }
    }

    void onTerminate(Task<PersistentMessageReceiverImpl> task, Task<PersistentMessageReceiverImpl> task2) {
        if (task != null) {
            try {
                task.run(this);
            } finally {
                if (task2 != null) {
                    task2.run(this);
                }
            }
        }
        this.appliedTopicSubscriptions.clear();
        this.preStartTasks.clear();
        this.serviceInternalView.removeReconnectionListener(this.reconnectionListener);
        this.serviceInternalView.removeReconnectionAttemptListener(this.reconnectionAttemptListener);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " receiver is shutdown");
        }
        this.buffer.clearDiscardedHandler();
        if (!this.defaultReceiverExecutorService.isShutdown()) {
            this.defaultReceiverExecutorService.shutdown();
        }
    }

    void terminateOnUnsolicitedInterruption() throws PubSubPlusClientException {
        this.stateHolder.set(null, STATE_TERMINATED);
        Task<PersistentMessageReceiverImpl> task = persistentMessageReceiverImpl -> {
            try {
                if (persistentMessageReceiverImpl.solaceReceiver != null) {
                    persistentMessageReceiverImpl.solaceReceiver.stopSync();
                }
            } catch (JCSMPInterruptedException e) {
                if (logger.isWarnEnabled()) {
                    logger.warn(persistentMessageReceiverImpl.instanceName + " could not be stopped after service was interrupted.");
                }
            } finally {
                persistentMessageReceiverImpl.solaceReceiver.setMessageListener((XMLMessageListener) null);
            }
            try {
                int size = persistentMessageReceiverImpl.buffer.size();
                boolean z = size < STATE_STARTING;
                persistentMessageReceiverImpl.buffer.clear();
                if (!z) {
                    this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.RECEIVED_MESSAGES_TERMINATION_DISCARDED, size);
                    if (logger.isWarnEnabled()) {
                        logger.warn(persistentMessageReceiverImpl.instanceName + " non-gracefully terminated before all buffered messages were processed.");
                    }
                }
            } catch (PubSubPlusClientException.RequestInterruptedException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Non-graceful termination of " + persistentMessageReceiverImpl.instanceName + " was interrupted");
                }
            }
        };
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being non gracefully terminated due to service interruption");
        }
        onTerminate(null, task);
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
    }

    List<CapabilityType> getRequiredCapabilities(TypedProperties typedProperties, Queue queue) {
        LinkedList linkedList = new LinkedList();
        linkedList.add(CapabilityType.PUB_FLOW_GUARANTEED);
        linkedList.add(CapabilityType.QUEUE_SUBSCRIPTIONS);
        linkedList.add(CapabilityType.ACTIVE_FLOW_INDICATION);
        if (SolaceConstants.ReceiverConstants.PERSISTENT_RECEIVER_CREATE_ON_START_MISSING_RESOURCES.equals(typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MISSING_RESOURCE_CREATION_STRATEGY))) {
            linkedList.add(CapabilityType.ENDPOINT_MANAGEMENT);
            if (!queue.isDurable()) {
                linkedList.add(CapabilityType.TEMP_ENDPOINT);
            }
        }
        if (typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_REPLAY_STRATEGY) != null) {
            linkedList.add(CapabilityType.MESSAGE_REPLAY);
        }
        return linkedList;
    }

    boolean isAutoAckConfigured(TypedProperties typedProperties) {
        return SolaceConstants.ReceiverConstants.PERSISTENT_RECEIVER_AUTO_ACK.equals(typedProperties.getProperty(SolaceProperties.ReceiverProperties.PERSISTENT_MESSAGE_ACK_STRATEGY));
    }

    Topic createTopicInstance(TopicSubscription topicSubscription) {
        return JCSMPFactory.onlyInstance().createTopic(topicSubscription.getName());
    }
}
