package dk.cloudcreate.essentials.components.foundation.messaging.eip.store_and_forward;

import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLock;
import dk.cloudcreate.essentials.components.foundation.fencedlock.FencedLockManager;
import dk.cloudcreate.essentials.components.foundation.fencedlock.LockCallback;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.reactive.command.CommandBus;
import dk.cloudcreate.essentials.shared.FailFast;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/eip/store_and_forward/Inboxes.class */
public interface Inboxes {

    /* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/eip/store_and_forward/Inboxes$DurableQueueBasedInboxes.class */
    public static class DurableQueueBasedInboxes implements Inboxes {
        /** Durable queue infrastructure used to store and deliver the inbox messages. */
        private final DurableQueues durableQueues;
        /** Lock manager used to elect the single consumer in {@code SingleGlobalConsumer} mode. */
        private final FencedLockManager fencedLockManager;
        /** Registry of the inboxes created by this instance, keyed by inbox name. */
        private final ConcurrentMap<InboxName, Inbox> inboxes = new ConcurrentHashMap<>();

        /* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/eip/store_and_forward/Inboxes$DurableQueueBasedInboxes$DurableQueueBasedInbox.class */
        /**
         * {@link Inbox} implementation that queues received messages on a durable queue and
         * forwards them to a single registered message consumer.
         * <p>
         * Depending on {@code config.messageConsumptionMode} the inbox either consumes directly from
         * the queue ({@code GlobalCompetingConsumers}) or only while it holds a fenced lock named
         * after the inbox ({@code SingleGlobalConsumer}).
         */
        public class DurableQueueBasedInbox implements Inbox {
            public final QueueName inboxQueueName;
            public final InboxConfig config;
            // volatile: both fields are written by start/stop calls and by the fenced-lock callbacks
            // and read by the queue message handler.
            // NOTE(review): assumes the callbacks registered through acquireLockAsync run on a
            // different thread than the caller - confirm against the FencedLockManager in use.
            private volatile Consumer<Message> messageConsumer;
            private volatile DurableQueueConsumer durableQueueConsumer;

            /**
             * Creates the inbox and immediately starts consuming with the given consumer.
             *
             * @param durableQueueBasedInboxes unused; kept so existing callers of this constructor keep compiling
             * @param inboxConfig              the inbox configuration (must not be null)
             * @param consumer                 the consumer that will receive the inbox messages (must not be null)
             */
            public DurableQueueBasedInbox(DurableQueueBasedInboxes durableQueueBasedInboxes, InboxConfig inboxConfig, Consumer<Message> consumer) {
                this(inboxConfig);
                consume(consumer);
            }

            /**
             * Creates the inbox without a message consumer; nothing is consumed until
             * {@link #consume(Consumer)} or {@link #setMessageConsumer(Consumer)} followed by
             * {@link #startConsuming()} is called.
             *
             * @param inboxConfig the inbox configuration (must not be null)
             */
            public DurableQueueBasedInbox(InboxConfig inboxConfig) {
                FailFast.requireNonNull(inboxConfig, "No inbox config provided");
                this.config = inboxConfig;
                this.inboxQueueName = inboxConfig.inboxName.asQueueName();
            }

            /**
             * Registers the message consumer and starts consuming.
             *
             * @param consumer the consumer that will receive the inbox messages (must not be null)
             * @return this inbox
             * @throws IllegalStateException if a message consumer is already registered
             */
            @Override
            public Inbox consume(Consumer<Message> consumer) {
                if (this.messageConsumer != null) {
                    throw new IllegalStateException("Inbox already has a message consumer");
                }
                setMessageConsumer(consumer);
                startConsuming();
                return this;
            }

            /**
             * Sets (or replaces) the message consumer without starting consumption.
             *
             * @param consumer the consumer that will receive the inbox messages (must not be null)
             * @return this inbox
             */
            @Override
            public Inbox setMessageConsumer(Consumer<Message> consumer) {
                FailFast.requireNonNull(consumer, "No messageConsumer provided");
                this.messageConsumer = consumer;
                return this;
            }

            /**
             * Starts consuming messages according to the configured consumption mode.
             * <ul>
             *   <li>{@code SingleGlobalConsumer}: asynchronously requests the fenced lock named after the
             *       inbox; queue consumption starts when the lock is acquired and is cancelled when the
             *       lock is released.</li>
             *   <li>{@code GlobalCompetingConsumers}: starts consuming from the queue right away.</li>
             * </ul>
             *
             * @return this inbox
             * @throws IllegalStateException if no message consumer is set or the consumption mode is unknown
             */
            @Override
            public Inbox startConsuming() {
                if (this.messageConsumer == null) {
                    throw new IllegalStateException("No message consumer specified. Please call #setMessageConsumer");
                }
                switch (this.config.messageConsumptionMode) {
                    case SingleGlobalConsumer:
                        DurableQueueBasedInboxes.this.fencedLockManager.acquireLockAsync(this.config.inboxName.asLockName(), LockCallback.builder().onLockAcquired(acquiredLock -> {
                            this.durableQueueConsumer = consumeFromDurableQueue(acquiredLock);
                        }).onLockReleased(releasedLock -> {
                            // Null-safe and resets the field, so a release without an active consumer
                            // does not fail and isConsumingMessages() reflects the lost lock.
                            cancelDurableQueueConsumer();
                        }).build());
                        break;
                    case GlobalCompetingConsumers:
                        this.durableQueueConsumer = consumeFromDurableQueue(null);
                        break;
                    default:
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                }
                return this;
            }

            /**
             * @return {@code true} if a message consumer is currently registered
             */
            @Override
            public boolean hasAMessageConsumer() {
                return this.messageConsumer != null;
            }

            /**
             * @return {@code true} if a durable queue consumer is currently active for this inbox
             */
            @Override
            public boolean isConsumingMessages() {
                return this.durableQueueConsumer != null;
            }

            /**
             * Stops consuming and clears the registered message consumer. Does nothing when no
             * message consumer is registered.
             * <p>
             * In {@code SingleGlobalConsumer} mode the asynchronous lock acquisition is cancelled; in
             * {@code GlobalCompetingConsumers} mode the active queue consumer (if any) is cancelled.
             *
             * @return this inbox
             * @throws IllegalStateException if the consumption mode is unknown
             */
            @Override
            public Inbox stopConsuming() {
                if (this.messageConsumer == null) {
                    return this;
                }
                switch (this.config.messageConsumptionMode) {
                    case SingleGlobalConsumer:
                        DurableQueueBasedInboxes.this.fencedLockManager.cancelAsyncLockAcquiring(this.config.inboxName.asLockName());
                        break;
                    case GlobalCompetingConsumers:
                        cancelDurableQueueConsumer();
                        break;
                    default:
                        throw new IllegalStateException("Unexpected messageConsumptionMode: " + this.config.messageConsumptionMode);
                }
                this.messageConsumer = null;
                return this;
            }

            /**
             * @return the name of this inbox as given by its configuration
             */
            @Override
            public InboxName name() {
                return this.config.inboxName;
            }

            /**
             * Queues the given message on the inbox queue. When the durable queues operate in
             * {@link TransactionalMode#FullyTransactional} mode the queue operation is executed inside
             * a unit of work obtained from the durable queues' unit-of-work factory.
             *
             * @param message the received message to store in the inbox
             * @return this inbox
             */
            @Override
            public Inbox addMessageReceived(Message message) {
                if (DurableQueueBasedInboxes.this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
                    DurableQueueBasedInboxes.this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> {
                        DurableQueueBasedInboxes.this.durableQueues.queueMessage(this.inboxQueueName, message);
                    });
                } else {
                    DurableQueueBasedInboxes.this.durableQueues.queueMessage(this.inboxQueueName, message);
                }
                return this;
            }

            /** Cancels the active durable queue consumer, if any, and clears the reference to it. */
            private void cancelDurableQueueConsumer() {
                DurableQueueConsumer activeConsumer = this.durableQueueConsumer;
                if (activeConsumer != null) {
                    activeConsumer.cancel();
                    this.durableQueueConsumer = null;
                }
            }

            /**
             * Registers a consumer on the inbox queue using the configured redelivery policy and number
             * of parallel consumers. In {@code SingleGlobalConsumer} mode the current token of the given
             * lock is written to the message metadata under {@link MessageMetaData#FENCED_LOCK_TOKEN}
             * before the message is handled.
             *
             * @param fencedLock the acquired lock; {@code null} in {@code GlobalCompetingConsumers} mode
             * @return the handle of the registered queue consumer
             */
            private DurableQueueConsumer consumeFromDurableQueue(FencedLock fencedLock) {
                return DurableQueueBasedInboxes.this.durableQueues.consumeFromQueue(this.inboxQueueName, this.config.redeliveryPolicy, this.config.numberOfParallelMessageConsumers, queuedMessage -> {
                    if (this.config.messageConsumptionMode == MessageConsumptionMode.SingleGlobalConsumer) {
                        queuedMessage.getMetaData().put(MessageMetaData.FENCED_LOCK_TOKEN, fencedLock.getCurrentToken().toString());
                    }
                    handleMessage(queuedMessage);
                });
            }

            /**
             * Passes the queued message's payload message to the registered message consumer, inside
             * a unit of work when the durable queues provide a unit-of-work factory.
             *
             * @param queuedMessage the message delivered from the inbox queue
             */
            private void handleMessage(QueuedMessage queuedMessage) {
                if (DurableQueueBasedInboxes.this.durableQueues.getUnitOfWorkFactory().isPresent()) {
                    DurableQueueBasedInboxes.this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> {
                        this.messageConsumer.accept(queuedMessage.getMessage());
                    });
                } else {
                    this.messageConsumer.accept(queuedMessage.getMessage());
                }
            }

            /**
             * @return the total number of messages currently queued for this inbox's queue
             */
            @Override
            public long getNumberOfUndeliveredMessages() {
                return DurableQueueBasedInboxes.this.durableQueues.getTotalMessagesQueuedFor(this.inboxQueueName);
            }

            @Override
            public String toString() {
                return "DurableQueueBasedInbox{config=" + this.config + ", inboxQueueName=" + this.inboxQueueName + "}";
            }
        }

        /**
         * Creates an inbox registry backed by the given durable queues and lock manager.
         *
         * @param durableQueues     the durable queues used to store inbox messages (must not be null)
         * @param fencedLockManager the lock manager used by inboxes that consume under a fenced lock (must not be null)
         */
        public DurableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
            FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
            this.durableQueues = durableQueues;
            FailFast.requireNonNull(fencedLockManager, "No fencedLockManager instance provided");
            this.fencedLockManager = fencedLockManager;
        }

        /**
         * Returns the inbox registered under the configuration's inbox name, creating it (and
         * starting consumption with the given consumer) when no such inbox exists yet.
         * When an inbox with that name already exists it is returned as is and the supplied
         * consumer is not applied to it.
         *
         * @param inboxConfig the inbox configuration (must not be null)
         * @param consumer    the message consumer used if a new inbox is created
         * @return the existing or newly created inbox
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig, Consumer<Message> consumer) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            InboxName requestedName = inboxConfig.getInboxName();
            return this.inboxes.computeIfAbsent(requestedName, missingName -> new DurableQueueBasedInbox(this, inboxConfig, consumer));
        }

        /**
         * Returns the inbox registered under the configuration's inbox name, creating a new inbox
         * without a message consumer when no such inbox exists yet.
         *
         * @param inboxConfig the inbox configuration (must not be null)
         * @return the existing or newly created inbox
         */
        @Override
        public Inbox getOrCreateInbox(InboxConfig inboxConfig) {
            FailFast.requireNonNull(inboxConfig, "No inboxConfig provided");
            InboxName requestedName = inboxConfig.getInboxName();
            return this.inboxes.computeIfAbsent(requestedName, missingName -> new DurableQueueBasedInbox(inboxConfig));
        }

        /**
         * Returns a read-only, live view of all inboxes created through this instance.
         * <p>
         * The view is unmodifiable so callers cannot remove inboxes from the internal registry
         * (which would drop them without stopping their consumers); mutating calls throw
         * {@link UnsupportedOperationException}.
         *
         * @return an unmodifiable view of the registered inboxes
         */
        @Override
        public Collection<Inbox> getInboxes() {
            return Collections.unmodifiableCollection(this.inboxes.values());
        }
    }

    /**
     * Gets the inbox matching the given configuration, creating it if necessary, without
     * registering a message consumer.
     *
     * @param inboxConfig the inbox configuration
     * @return the inbox
     */
    Inbox getOrCreateInbox(InboxConfig inboxConfig);

    /**
     * Gets the inbox matching the given configuration, creating it if necessary with the given
     * message consumer.
     *
     * @param inboxConfig the inbox configuration
     * @param consumer    the consumer that should receive the inbox's messages
     * @return the inbox
     */
    Inbox getOrCreateInbox(InboxConfig inboxConfig, Consumer<Message> consumer);

    /**
     * Gets or creates an inbox whose messages are forwarded to the given command bus: the
     * payload of each consumed message is sent on the bus.
     *
     * @param inboxConfig the inbox configuration
     * @param commandBus  the command bus that message payloads are sent to (must not be null)
     * @return the inbox
     */
    default Inbox getOrCreateInbox(InboxConfig inboxConfig, CommandBus commandBus) {
        FailFast.requireNonNull(commandBus, "No forwardTo command bus provided");
        Consumer<Message> forwardToCommandBus = inboxMessage -> commandBus.send(inboxMessage.getPayload());
        return getOrCreateInbox(inboxConfig, forwardToCommandBus);
    }

    /**
     * Gets the inboxes managed by this instance.
     *
     * @return the managed inboxes
     */
    Collection<Inbox> getInboxes();

    /**
     * Factory method for an {@link Inboxes} implementation backed by durable queues.
     *
     * @param durableQueues     the durable queues used to store inbox messages
     * @param fencedLockManager the lock manager used by inboxes that consume under a fenced lock
     * @return a new {@link DurableQueueBasedInboxes} instance
     */
    static Inboxes durableQueueBasedInboxes(DurableQueues durableQueues, FencedLockManager fencedLockManager) {
        final DurableQueueBasedInboxes queueBackedInboxes = new DurableQueueBasedInboxes(durableQueues, fencedLockManager);
        return queueBackedInboxes;
    }
}
