package dk.cloudcreate.essentials.components.foundation.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/queue/DefaultDurableQueueConsumer.class */
public class DefaultDurableQueueConsumer<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> implements DurableQueueConsumer, DurableQueueConsumerNotifications {
    private static final Logger log = LoggerFactory.getLogger(DefaultDurableQueueConsumer.class);
    public static final Runnable NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE = () -> {
    };
    public final QueueName queueName;
    private final ConsumeFromQueue consumeFromQueue;
    private volatile boolean started;
    private final ScheduledExecutorService scheduler;
    private final DURABLE_QUEUES durableQueues;
    private final Consumer<DurableQueueConsumer> removeDurableQueueConsumer;
    private final UOW_FACTORY unitOfWorkFactory;
    private final QueuePollingOptimizer queuePollingOptimizer;
    private final long pollingIntervalMs;
    private final ConcurrentMap<Thread, OrderedMessage> orderedMessageDeliveryThreads = new ConcurrentHashMap();

    public DefaultDurableQueueConsumer(ConsumeFromQueue consumeFromQueue, UOW_FACTORY uow_factory, DURABLE_QUEUES durable_queues, Consumer<DurableQueueConsumer> consumer, long j, QueuePollingOptimizer queuePollingOptimizer) {
        this.consumeFromQueue = (ConsumeFromQueue) FailFast.requireNonNull(consumeFromQueue, "consumeFromQueue is missing");
        consumeFromQueue.validate();
        this.durableQueues = (DURABLE_QUEUES) FailFast.requireNonNull(durable_queues, "durableQueues is missing");
        if (durable_queues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory = (UOW_FACTORY) FailFast.requireNonNull(uow_factory, "You must specify a unitOfWorkFactory");
        } else {
            this.unitOfWorkFactory = null;
        }
        this.removeDurableQueueConsumer = (Consumer) FailFast.requireNonNull(consumer, "removeDurableQueueConsumer is missing");
        this.queueName = consumeFromQueue.queueName;
        this.pollingIntervalMs = j;
        if (queuePollingOptimizer != null) {
            this.queuePollingOptimizer = queuePollingOptimizer;
        } else {
            this.queuePollingOptimizer = QueuePollingOptimizer.None();
        }
        this.scheduler = consumeFromQueue.getConsumerExecutorService().orElseGet(() -> {
            return Executors.newScheduledThreadPool(consumeFromQueue.getParallelConsumers(), new ThreadFactoryBuilder().nameFormat("Queue-" + this.queueName + "-Polling-%d").daemon(true).build());
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public void start() {
        if (this.started) {
            return;
        }
        log.info("[{}] {} - Starting {} DurableQueueConsumer threads with polling interval {} ms", new Object[]{this.queueName, this.consumeFromQueue.consumerName, Integer.valueOf(this.consumeFromQueue.getParallelConsumers()), Long.valueOf(this.pollingIntervalMs)});
        for (int i = 0; i < this.consumeFromQueue.getParallelConsumers(); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(10L);
                } catch (InterruptedException e) {
                }
            }
            this.scheduler.scheduleAtFixedRate(this::pollQueue, this.pollingIntervalMs, this.pollingIntervalMs, TimeUnit.MILLISECONDS);
        }
        this.started = true;
    }

    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public void stop() {
        if (this.started) {
            log.info("[{}] {} - Stopping DurableQueueConsumer", this.queueName, this.consumeFromQueue.consumerName);
            this.started = false;
            try {
                this.scheduler.shutdownNow();
            } finally {
                this.removeDurableQueueConsumer.accept(this);
                log.info("[{}] {} - DurableQueueConsumer stopped", this.queueName, this.consumeFromQueue.consumerName);
            }
        }
    }

    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public boolean isStarted() {
        return this.started;
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer
    public QueueName queueName() {
        return this.queueName;
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer
    public void cancel() {
        stop();
    }

    /* JADX WARN: Removed duplicated region for block: B:21:0x0216 A[Catch: DurableQueueException -> 0x021f, TryCatch #1 {DurableQueueException -> 0x021f, blocks: (B:7:0x001d, B:9:0x0029, B:11:0x003f, B:13:0x007b, B:15:0x008a, B:16:0x00be, B:18:0x00bf, B:21:0x0216, B:26:0x00d6, B:28:0x00e7, B:30:0x00f3, B:32:0x0103, B:34:0x0113, B:36:0x014a, B:38:0x0123, B:40:0x0171, B:43:0x017a, B:45:0x018b, B:47:0x0197, B:49:0x01a7, B:51:0x01b7, B:53:0x01ee, B:54:0x01c7), top: B:6:0x001d, inners: #0, #2 }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void pollQueue() {
        /*
            Method dump skipped, instructions count: 581
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: dk.cloudcreate.essentials.components.foundation.messaging.queue.DefaultDurableQueueConsumer.pollQueue():void");
    }

    private Runnable processNextMessageReadyForDelivery() {
        try {
            return this.started ? (Runnable) this.durableQueues.getNextMessageReadyForDelivery(new GetNextMessageReadyForDelivery(this.queueName, resolveMessageKeysToExclude())).map(this::handleMessage).orElseGet(() -> {
                QueuePollingOptimizer queuePollingOptimizer = this.queuePollingOptimizer;
                Objects.requireNonNull(queuePollingOptimizer);
                return queuePollingOptimizer::queuePollingReturnedNoMessages;
            }) : NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        } catch (Exception e) {
            log.error(MessageFormatter.msg("[{}] Error Polling Queue", new Object[]{this.queueName}), e);
            return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
        }
    }

    private List<String> resolveMessageKeysToExclude() {
        OrderedMessage orderedMessage = this.orderedMessageDeliveryThreads.get(Thread.currentThread());
        HashSet hashSet = new HashSet(this.orderedMessageDeliveryThreads.values());
        hashSet.remove(orderedMessage);
        return (List) hashSet.stream().map((v0) -> {
            return v0.getKey();
        }).collect(Collectors.toList());
    }

    private Runnable handleMessage(QueuedMessage queuedMessage) {
        boolean z = queuedMessage.getMessage() instanceof OrderedMessage;
        Logger logger = log;
        Object[] objArr = new Object[7];
        objArr[0] = this.queueName;
        objArr[1] = queuedMessage.getId();
        objArr[2] = this.consumeFromQueue.consumerName;
        objArr[3] = z ? "Ordered " : "";
        objArr[4] = z ? MessageFormatter.msg(" {}:{}", new Object[]{((OrderedMessage) queuedMessage.getMessage()).getKey(), Long.valueOf(((OrderedMessage) queuedMessage.getMessage()).getOrder())}) : "";
        objArr[5] = Integer.valueOf(queuedMessage.getTotalDeliveryAttempts());
        objArr[6] = Integer.valueOf(queuedMessage.getRedeliveryAttempts());
        logger.debug("[{}:{}] {} - Delivering {}message{}. Total attempts: {}, Redelivery Attempts: {}", objArr);
        if (z) {
            this.orderedMessageDeliveryThreads.put(Thread.currentThread(), (OrderedMessage) queuedMessage.getMessage());
        }
        try {
            this.consumeFromQueue.queueMessageHandler.handle(queuedMessage);
            log.debug("[{}:{}] {} - Message handled successfully. Deleting the message in the Queue Store message. Total attempts: {}, Redelivery Attempts: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, Integer.valueOf(queuedMessage.getTotalDeliveryAttempts()), Integer.valueOf(queuedMessage.getRedeliveryAttempts())});
            this.durableQueues.acknowledgeMessageAsHandled(queuedMessage.getId());
            this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
            return () -> {
                this.queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
            };
        } catch (Exception e) {
            log.debug(MessageFormatter.msg("[{}:{}] {} - QueueMessageHandler for failed to handle message: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, queuedMessage}), e);
            boolean isPermanentError = this.consumeFromQueue.getRedeliveryPolicy().isPermanentError(queuedMessage, e);
            if (isPermanentError || queuedMessage.getTotalDeliveryAttempts() >= this.consumeFromQueue.getRedeliveryPolicy().maximumNumberOfRedeliveries + 1) {
                log.debug("[{}:{}] {} - Marking Message as Dead Letter. Is Permanent Error: {}. Message: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, Boolean.valueOf(isPermanentError), queuedMessage});
                try {
                    this.durableQueues.markAsDeadLetterMessage(queuedMessage.getId(), e);
                    this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
                    return () -> {
                        this.queuePollingOptimizer.queuePollingReturnedMessage(queuedMessage);
                    };
                } catch (Exception e2) {
                    log.error(MessageFormatter.msg("[{}:{}] {} - Failed to mark the Message as a Dead Letter Message. Details: Is Permanent Error: {}. Message: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, Boolean.valueOf(isPermanentError), queuedMessage}), e2);
                    return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
                }
            }
            Duration calculateNextRedeliveryDelay = this.consumeFromQueue.getRedeliveryPolicy().calculateNextRedeliveryDelay(queuedMessage.getRedeliveryAttempts());
            log.debug(MessageFormatter.msg("[{}:{}] {} - Using redeliveryDelay '{}' for QueueEntryId '{}' due to: {}", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName, calculateNextRedeliveryDelay, queuedMessage.getId(), e.getMessage()}));
            try {
                this.durableQueues.retryMessage(queuedMessage.getId(), e, calculateNextRedeliveryDelay);
                this.orderedMessageDeliveryThreads.remove(Thread.currentThread());
                return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
            } catch (Exception e3) {
                if (e3.getMessage().contains("Interrupted waiting for lock")) {
                    log.debug(MessageFormatter.msg("[{}:{}] {} - Failed to register the message for retry.", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName}), e3);
                } else {
                    log.error(MessageFormatter.msg("[{}:{}] {} - Failed to register the message for retry.", new Object[]{this.queueName, queuedMessage.getId(), this.consumeFromQueue.consumerName}), e3);
                }
                return NO_POSTPROCESSING_AFTER_PROCESS_NEXT_MESSAGE;
            }
        }
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumerNotifications
    public void messageAdded(QueuedMessage queuedMessage) {
        this.queuePollingOptimizer.messageAdded(queuedMessage);
    }

    public String toString() {
        return "DurableQueueConsumer{, started=" + this.started + this.consumeFromQueue.toString() + "}";
    }
}
