package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/queue/DurableQueueConsumer.class */
public interface DurableQueueConsumer extends Lifecycle {

    /* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/queue/DurableQueueConsumer$DefaultDurableQueueConsumer.class */
    /**
     * Default {@link DurableQueueConsumer} implementation that polls a single queue on a fixed schedule
     * and hands each message that is ready for delivery to a {@link QueuedMessageHandler}.
     * <p>
     * {@link #start()} schedules {@code numberOfParallelMessageConsumers} polling tasks on an internal
     * {@link ScheduledExecutorService}; the polling interval is the {@link RedeliveryPolicy}'s
     * {@code initialRedeliveryDelay}. Each poll fetches at most one message. When the handler completes
     * normally the message is acknowledged; when it throws, the message is either rescheduled using the
     * delay calculated by the {@link RedeliveryPolicy} or, once its total delivery attempts exceed
     * {@code maximumNumberOfRedeliveries}, marked as a dead letter message.
     * <p>
     * When the {@link DurableQueues} instance reports {@link TransactionalMode#FullyTransactional}, each poll
     * runs inside a unit of work obtained from the supplied {@link UnitOfWorkFactory}.
     * <p>
     * NOTE: {@link #stop()} shuts the internal scheduler down and notifies the removal callback, so a stopped
     * instance is not expected to be started again (scheduling on a shut down executor is rejected).
     * {@link #start()} and {@link #stop()} perform no locking of their own.
     *
     * @param <DURABLE_QUEUES> the concrete {@link DurableQueues} type being polled
     * @param <UOW>            the {@link UnitOfWork} type produced by the unit of work factory
     * @param <UOW_FACTORY>    the {@link UnitOfWorkFactory} type used in fully transactional mode
     */
    public static class DefaultDurableQueueConsumer<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> implements DurableQueueConsumer {
        private static final Logger log = LoggerFactory.getLogger(DefaultDurableQueueConsumer.class);

        /** Pause (in milliseconds) between scheduling consecutive polling tasks, so they don't all fire at the same instant. */
        private static final long POLLER_START_STAGGER_MILLIS = 10L;

        private final RedeliveryPolicy redeliveryPolicy;
        private final QueueName queueName;
        private final QueuedMessageHandler queuedMessageHandler;
        private final ScheduledExecutorService scheduler;
        private final int numberOfParallelMessageConsumers;
        private final DURABLE_QUEUES durableQueues;
        /** Callback invoked with this consumer from {@link #stop()}. */
        private final Consumer<DurableQueueConsumer> removeDurableQueueConsumer;
        /** Only dereferenced when the durable queues are {@link TransactionalMode#FullyTransactional}; may be {@code null} otherwise. */
        private final UOW_FACTORY unitOfWorkFactory;
        private volatile boolean started;

        /**
         * Create a new consumer. Nothing is polled until {@link #start()} is called.
         *
         * @param queueName            the queue to poll (required)
         * @param queuedMessageHandler the handler that receives each delivered message (required)
         * @param redeliveryPolicy     supplies the polling interval ({@code initialRedeliveryDelay}), the redelivery
         *                             delays and the maximum number of redeliveries (required)
         * @param i                    the number of parallel polling tasks/threads; must be {@code >= 1}
         * @param uow_factory          the unit of work factory; only required when {@code durable_queues} is
         *                             {@link TransactionalMode#FullyTransactional}
         * @param durable_queues       the durable queues instance to poll messages from (required)
         * @param consumer             callback invoked with this consumer when it is stopped (required)
         */
        public DefaultDurableQueueConsumer(QueueName queueName, QueuedMessageHandler queuedMessageHandler, RedeliveryPolicy redeliveryPolicy, int i, UOW_FACTORY uow_factory, DURABLE_QUEUES durable_queues, Consumer<DurableQueueConsumer> consumer) {
            // Validate first, assign afterwards: avoids the raw/unchecked casts on the validator's return value
            FailFast.requireNonNull(queueName, "queueName is missing");
            FailFast.requireNonNull(queuedMessageHandler, "You must specify a queuedMessageHandler");
            FailFast.requireNonNull(redeliveryPolicy, "You must specify a redelivery policy");
            FailFast.requireNonNull(durable_queues, "durableQueues is missing");
            if (durable_queues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                // The factory is only mandatory when polling has to run inside a unit of work
                FailFast.requireNonNull(uow_factory, "You must specify a unitOfWorkFactory");
            }
            FailFast.requireNonNull(consumer, "removeDurableQueueConsumer is missing");
            FailFast.requireTrue(i >= 1, "You must specify a number of parallelMessageConsumers >= 1");

            this.queueName = queueName;
            this.queuedMessageHandler = queuedMessageHandler;
            this.redeliveryPolicy = redeliveryPolicy;
            this.durableQueues = durable_queues;
            this.unitOfWorkFactory = uow_factory;
            this.removeDurableQueueConsumer = consumer;
            this.numberOfParallelMessageConsumers = i;
            this.scheduler = Executors.newScheduledThreadPool(this.numberOfParallelMessageConsumers, new ThreadFactoryBuilder().nameFormat("Queue-" + queueName + "-Polling-%d").daemon(true).build());
        }

        /**
         * Schedule the polling tasks. Does nothing if the consumer is already started.
         * <p>
         * One fixed-rate task is scheduled per parallel message consumer, with both the initial delay and
         * the period set to the redelivery policy's {@code initialRedeliveryDelay}. A short pause is inserted
         * between the scheduling of consecutive tasks so they are slightly offset from each other.
         */
        @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
        public void start() {
            if (this.started) {
                return;
            }
            log.info("[{}] Starting {} DurableQueueConsumer threads with polling interval {} (based on initialRedeliveryDelay)", this.queueName, this.numberOfParallelMessageConsumers, this.redeliveryPolicy.initialRedeliveryDelay);
            long pollingIntervalMillis = this.redeliveryPolicy.initialRedeliveryDelay.toMillis();
            for (int pollerIndex = 0; pollerIndex < this.numberOfParallelMessageConsumers; pollerIndex++) {
                if (pollerIndex > 0) {
                    try {
                        Thread.sleep(POLLER_START_STAGGER_MILLIS);
                    } catch (InterruptedException e) {
                        // Staggering is best-effort: keep starting the pollers, but preserve the interrupt
                        // status so the calling thread can still observe that it was interrupted
                        Thread.currentThread().interrupt();
                    }
                }
                this.scheduler.scheduleAtFixedRate(this::pollQueue, pollingIntervalMillis, pollingIntervalMillis, TimeUnit.MILLISECONDS);
            }
            this.started = true;
        }

        /**
         * Stop polling. Does nothing if the consumer isn't started.
         * <p>
         * Shuts the scheduler down immediately (via {@code shutdownNow()}), marks the consumer as stopped
         * and then invokes the removal callback supplied to the constructor with this instance.
         */
        @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
        public void stop() {
            if (!this.started) {
                return;
            }
            log.info("[{}] Stopping DurableQueueConsumer", this.queueName);
            this.scheduler.shutdownNow();
            this.started = false;
            this.removeDurableQueueConsumer.accept(this);
            log.info("[{}] DurableQueueConsumer stopped", this.queueName);
        }

        /**
         * @return {@code true} if {@link #start()} has completed and {@link #stop()} hasn't been called since
         */
        @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
        public boolean isStarted() {
            return this.started;
        }

        /**
         * @return the name of the queue this consumer polls
         */
        @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer
        public QueueName queueName() {
            return this.queueName;
        }

        /**
         * Cancel this consumer; equivalent to calling {@link #stop()}.
         */
        @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer
        public void cancel() {
            stop();
        }

        /**
         * Body of each scheduled polling task: processes at most one message, inside a unit of work when the
         * durable queues are {@link TransactionalMode#FullyTransactional}.
         * <p>
         * No exception is allowed to escape this method: a task scheduled with {@code scheduleAtFixedRate}
         * is never run again once an execution throws, which would silently stop this poller. Failures are
         * logged instead and polling is attempted again at the next interval.
         */
        private void pollQueue() {
            log.trace("[{}] Polling Queue for the next message ready for delivery", this.queueName);
            try {
                if (this.durableQueues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
                    processNextMessageReadyForDelivery();
                    return;
                }
                // A unit of work still bound at this point means an earlier one wasn't completed/removed
                if (this.unitOfWorkFactory.getCurrentUnitOfWork().isPresent()) {
                    throw new DurableQueueException(MessageFormatter.msg("Previous UnitOfWork isn't completed/removed: {}", new Object[]{this.unitOfWorkFactory.getCurrentUnitOfWork().get()}), this.queueName);
                }
                this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
                    processNextMessageReadyForDelivery();
                });
            } catch (Exception e) {
                // SLF4J treats a trailing Throwable argument as the exception to log
                log.error("[{}] Polling Queue failed - will try again at the next polling interval", this.queueName, e);
            }
        }

        /**
         * Fetch the next message that is ready for delivery (if any) and pass it to the message handler.
         * <ul>
         *     <li>Handler completes normally: the message is acknowledged as handled.</li>
         *     <li>Handler throws and the message's total delivery attempts exceed
         *     {@code maximumNumberOfRedeliveries}: the message is marked as a dead letter message.</li>
         *     <li>Handler throws otherwise: the message is scheduled for retry after the delay calculated by
         *     the redelivery policy from the message's redelivery attempts.</li>
         * </ul>
         * Any exception raised while fetching or updating the message is logged and not rethrown.
         */
        private void processNextMessageReadyForDelivery() {
            try {
                this.durableQueues.getNextMessageReadyForDelivery(this.queueName).map(queuedMessage -> {
                    log.debug("[{}:{}] Delivering message. Total attempts: {}, Redelivery Attempts: {}", this.queueName, queuedMessage.getId(), queuedMessage.getTotalDeliveryAttempts(), queuedMessage.getRedeliveryAttempts());
                    try {
                        this.queuedMessageHandler.handle(queuedMessage);
                        log.debug("[{}:{}] Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", this.queueName, queuedMessage.getId(), queuedMessage.getTotalDeliveryAttempts(), queuedMessage.getRedeliveryAttempts());
                        return this.durableQueues.acknowledgeMessageAsHandled(queuedMessage.getId());
                    } catch (Exception e) {
                        log.debug(MessageFormatter.msg("[{}:{}] QueueMessageHandler for failed to handle: {}", new Object[]{this.queueName, queuedMessage.getId(), queuedMessage}), e);
                        // The first delivery plus all permitted redeliveries have failed.
                        // Written as '>' rather than '>= max + 1' so a maximum of Integer.MAX_VALUE cannot overflow
                        if (queuedMessage.getTotalDeliveryAttempts() > this.redeliveryPolicy.maximumNumberOfRedeliveries) {
                            log.debug("[{}:{}] Marking Message as Dead Letter: {}", this.queueName, queuedMessage.getId(), queuedMessage);
                            return this.durableQueues.markAsDeadLetterMessage(queuedMessage.getId(), e);
                        }
                        Duration redeliveryDelay = this.redeliveryPolicy.calculateNextRedeliveryDelay(queuedMessage.getRedeliveryAttempts());
                        log.debug(MessageFormatter.msg("[{}:{}] Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}", new Object[]{this.queueName, queuedMessage.getId(), redeliveryDelay, queuedMessage.getId(), e.getMessage()}));
                        return this.durableQueues.retryMessage(queuedMessage.getId(), e, redeliveryDelay);
                    }
                });
            } catch (Exception e) {
                log.error(MessageFormatter.msg("[{}] Error Polling Queue", new Object[]{this.queueName}), e);
            }
        }
    }

    /**
     * The name of the queue this consumer consumes messages from.
     *
     * @return the queue name (the {@code DefaultDurableQueueConsumer} implementation returns the
     * queue name it was constructed with)
     */
    QueueName queueName();

    /**
     * Cancel this consumer.
     * <p>
     * The {@code DefaultDurableQueueConsumer} implementation delegates to {@code stop()}, which shuts down
     * its polling scheduler and invokes the removal callback it was constructed with.
     */
    void cancel();
}
