package dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward;

import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.shared.FailFast;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/eip/store_and_forward/Inboxes.class */
/**
 * Registry/factory for {@link Inbox} instances, keyed by {@link InboxName}.
 * <p>
 * The bundled implementation, {@link DurableQueueBasedInboxes}, stores received messages in
 * {@link DurableQueues} and forwards them to a message consumer.
 */
public interface Inboxes {

    /**
     * {@link Inboxes} implementation where every inbox is backed by a queue in {@link DurableQueues}.
     * In {@code SingleGlobalConsumer} mode a {@link FencedLock} obtained through the
     * {@link FencedLockManager} decides when the inbox consumes from its queue.
     */
    public static class DurableQueueBasedInboxes implements Inboxes {
        private final DurableQueues durableQueues;
        private final FencedLockManager fencedLockManager;
        /** All inboxes created through this instance, keyed by name. */
        private final ConcurrentMap<InboxName, Inbox<?>> inboxes = new ConcurrentHashMap<>();

        /**
         * An {@link Inbox} that queues received messages on a durable queue (named after the inbox) and
         * hands each queued message's payload to the configured message consumer.
         *
         * @param <MESSAGE_TYPE> the type of message accepted by the message consumer
         */
        public class DurableQueueBasedInbox<MESSAGE_TYPE> implements Inbox<MESSAGE_TYPE> {
            public final InboxName inboxName;
            public final RedeliveryPolicy redeliveryPolicy;
            public final MessageConsumptionMode messageConsumptionMode;
            public final Consumer<MESSAGE_TYPE> messageConsumer;
            public final int numberOfParallelMessageConsumers;
            /** Name of the durable queue backing this inbox; derived from {@link #inboxName}. */
            public final QueueName inboxQueueName;
            /**
             * The active queue consumer. {@code null} until consumption has started, and reset to
             * {@code null} when the consumer is cancelled because the lock was released.
             */
            private DurableQueueConsumer durableQueueConsumer;

            /**
             * Creates the inbox and arranges for consumption from the underlying durable queue:
             * <ul>
             *     <li>{@code CompetingConsumers}: consumption starts immediately.</li>
             *     <li>{@code SingleGlobalConsumer}: a lock named after the inbox is requested asynchronously;
             *     consumption starts when the lock is acquired and is cancelled when the lock is released.</li>
             * </ul>
             *
             * @param inboxName              the name of the inbox (required)
             * @param redeliveryPolicy       the redelivery policy passed on to the durable queue consumer (required)
             * @param messageConsumptionMode how messages are consumed (required)
             * @param i                      the number of parallel message consumers; must be &gt;= 1
             * @param consumer               the consumer that receives each queued message's payload (required)
             * @throws IllegalStateException if {@code messageConsumptionMode} is not a supported mode
             */
            public DurableQueueBasedInbox(InboxName inboxName, RedeliveryPolicy redeliveryPolicy, MessageConsumptionMode messageConsumptionMode, int i, Consumer<MESSAGE_TYPE> consumer) {
                // Validate first, then assign - this avoids the raw casts the previous code needed.
                FailFast.requireNonNull(inboxName, "No inboxName provided");
                this.inboxName = inboxName;
                FailFast.requireNonNull(redeliveryPolicy, "No redeliveryPolicy provided");
                this.redeliveryPolicy = redeliveryPolicy;
                FailFast.requireNonNull(messageConsumptionMode, "No messageConsumptionMode specified");
                this.messageConsumptionMode = messageConsumptionMode;
                FailFast.requireNonNull(consumer, "No messageConsumer provided");
                this.messageConsumer = consumer;
                FailFast.requireTrue(i >= 1, "You must specify a number of parallelMessageConsumers >= 1");
                this.numberOfParallelMessageConsumers = i;
                this.inboxQueueName = inboxName.asQueueName();

                switch (messageConsumptionMode) {
                    case SingleGlobalConsumer:
                        DurableQueueBasedInboxes.this.fencedLockManager.acquireLockAsync(inboxName.asLockName(), new LockCallback() {
                            @Override
                            public void lockAcquired(FencedLock fencedLock) {
                                DurableQueueBasedInbox.this.durableQueueConsumer = DurableQueueBasedInbox.this.consumeFromDurableQueue();
                            }

                            @Override
                            public void lockReleased(FencedLock fencedLock) {
                                // The consumer is only set once lockAcquired has completed successfully, so guard
                                // against a release notification arriving without an active consumer.
                                DurableQueueConsumer activeConsumer = DurableQueueBasedInbox.this.durableQueueConsumer;
                                if (activeConsumer != null) {
                                    activeConsumer.cancel();
                                    // Forget the cancelled consumer so a repeated release doesn't cancel it again
                                    DurableQueueBasedInbox.this.durableQueueConsumer = null;
                                }
                            }
                        });
                        break;
                    case CompetingConsumers:
                        this.durableQueueConsumer = consumeFromDurableQueue();
                        break;
                    default:
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + messageConsumptionMode);
                }
            }

            /**
             * @return the name of this inbox
             */
            @Override
            public InboxName name() {
                return this.inboxName;
            }

            /**
             * Registers a received message by queueing it on the durable queue backing this inbox.
             *
             * @param message_type the message that was received
             */
            @Override
            public void addMessageReceived(MESSAGE_TYPE message_type) {
                DurableQueueBasedInboxes.this.durableQueues.queueMessage(this.inboxQueueName, message_type);
            }

            /**
             * Starts consuming from the inbox queue using the configured redelivery policy and number of
             * parallel consumers, delegating every queued message to {@link #handleMessage(QueuedMessage)}.
             */
            private DurableQueueConsumer consumeFromDurableQueue() {
                return DurableQueueBasedInboxes.this.durableQueues.consumeFromQueue(this.inboxQueueName, this.redeliveryPolicy, this.numberOfParallelMessageConsumers, this::handleMessage);
            }

            /**
             * Forwards the payload of a queued message to the message consumer.
             */
            @SuppressWarnings("unchecked") // payloads on this queue are only added through addMessageReceived(MESSAGE_TYPE)
            private void handleMessage(QueuedMessage queuedMessage) {
                this.messageConsumer.accept((MESSAGE_TYPE) queuedMessage.payload);
            }

            /**
             * @return the total number of messages queued for this inbox's queue, as reported by the {@link DurableQueues}
             */
            @Override
            public long getNumberOfUndeliveredMessages() {
                return DurableQueueBasedInboxes.this.durableQueues.getTotalMessagesQueuedFor(this.inboxQueueName);
            }

            @Override
            public String toString() {
                return "DurableQueueBasedInbox{inboxName=" + this.inboxName + ", inboxQueueName=" + this.inboxQueueName + ", messageConsumptionMode=" + this.messageConsumptionMode + ", numberOfParallelMessageConsumers=" + this.numberOfParallelMessageConsumers + "}";
            }
        }

        /**
         * @param durableQueues     the durable queues used to store and deliver inbox messages (required)
         * @param fencedLockManager the lock manager used for {@code SingleGlobalConsumer} inboxes (required)
         */
        public DurableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
            FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
            this.durableQueues = durableQueues;
            FailFast.requireNonNull(fencedLockManager, "No fencedLockManager instance provided");
            this.fencedLockManager = fencedLockManager;
        }

        /**
         * {@inheritDoc}
         * <p>
         * If an inbox with the given name already exists it is returned as-is; the remaining arguments are
         * then ignored (and not validated). Otherwise a new {@link DurableQueueBasedInbox} is created, and
         * its constructor runs as part of the map's {@code computeIfAbsent} call.
         */
        @Override
        @SuppressWarnings("unchecked") // the map stores Inbox<?>; the caller determines MESSAGE_TYPE for a given name
        public <MESSAGE_TYPE> Inbox<MESSAGE_TYPE> getOrCreateInbox(InboxName inboxName, RedeliveryPolicy redeliveryPolicy, MessageConsumptionMode messageConsumptionMode, int i, Consumer<MESSAGE_TYPE> consumer) {
            FailFast.requireNonNull(inboxName, "No inboxName provided");
            return (Inbox<MESSAGE_TYPE>) this.inboxes.computeIfAbsent(
                    inboxName,
                    name -> new DurableQueueBasedInbox<MESSAGE_TYPE>(name, redeliveryPolicy, messageConsumptionMode, i, consumer));
        }

        /**
         * {@inheritDoc}
         * <p>
         * The returned collection is the value view of the internal map, not a copy.
         */
        @Override
        public Collection<Inbox<?>> getInboxes() {
            return this.inboxes.values();
        }
    }

    /**
     * Returns the inbox registered under {@code inboxName}, creating it if it doesn't exist yet.
     *
     * @param inboxName              the name of the inbox
     * @param redeliveryPolicy       the redelivery policy to apply when consuming messages
     * @param messageConsumptionMode how messages are consumed
     * @param i                      the number of parallel message consumers
     * @param consumer               the consumer that receives the messages
     * @param <MESSAGE_TYPE>         the type of message handled by the inbox
     * @return the inbox for the given name
     */
    <MESSAGE_TYPE> Inbox<MESSAGE_TYPE> getOrCreateInbox(InboxName inboxName, RedeliveryPolicy redeliveryPolicy, MessageConsumptionMode messageConsumptionMode, int i, Consumer<MESSAGE_TYPE> consumer);

    /**
     * @return all inboxes known to this instance
     */
    Collection<Inbox<?>> getInboxes();

    /**
     * Creates an {@link Inboxes} instance backed by {@link DurableQueues}.
     *
     * @param durableQueues     the durable queues used to store and deliver inbox messages
     * @param fencedLockManager the lock manager used for {@code SingleGlobalConsumer} inboxes
     * @return a new {@link DurableQueueBasedInboxes}
     */
    static Inboxes durableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
        return new DurableQueueBasedInboxes(durableQueues, fencedLockManager);
    }
}
