package dk.cloudcreate.essentials.components.foundation.reactive.command;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.reactive.command.AbstractCommandBus;
import dk.cloudcreate.essentials.reactive.command.CommandHandler;
import dk.cloudcreate.essentials.reactive.command.SendAndDontWaitErrorHandler;
import dk.cloudcreate.essentials.reactive.command.interceptor.CommandBusInterceptor;
import dk.cloudcreate.essentials.reactive.command.interceptor.CommandBusInterceptorChain;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/reactive/command/DurableLocalCommandBus.class */
public class DurableLocalCommandBus extends AbstractCommandBus {
    private static final Logger log = LoggerFactory.getLogger(DurableLocalCommandBus.class);
    private DurableQueues durableQueues;
    private int parallelSendAndDontWaitConsumers;
    private CommandQueueNameSelector commandQueueNameSelector;
    private CommandQueueRedeliveryPolicyResolver commandQueueRedeliveryPolicyResolver;
    private ConcurrentMap<QueueName, DurableQueueConsumer> queueConsumers;

    public static DurableLocalCommandBusBuilder builder() {
        return new DurableLocalCommandBusBuilder();
    }

    public DurableLocalCommandBus(DurableQueues durableQueues) {
        super(new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(), List.of());
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandQueueNameSelector commandQueueNameSelector, CommandQueueRedeliveryPolicyResolver commandQueueRedeliveryPolicyResolver) {
        super(new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(), List.of());
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
        this.commandQueueRedeliveryPolicyResolver = (CommandQueueRedeliveryPolicyResolver) FailFast.requireNonNull(commandQueueRedeliveryPolicyResolver, "No commandQueueRedeliveryPolicyResolver provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler) {
        super(sendAndDontWaitErrorHandler, List.of());
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandQueueNameSelector commandQueueNameSelector, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler) {
        super(sendAndDontWaitErrorHandler, List.of());
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, List<CommandBusInterceptor> list) {
        super(new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(), list);
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandQueueNameSelector commandQueueNameSelector, List<CommandBusInterceptor> list) {
        super(new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(), list);
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        super(sendAndDontWaitErrorHandler, list);
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, int i, CommandQueueNameSelector commandQueueNameSelector, CommandQueueRedeliveryPolicyResolver commandQueueRedeliveryPolicyResolver, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        super(sendAndDontWaitErrorHandler, list);
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        FailFast.requireTrue(i >= 1, "parallelSendAndDontWaitConsumers is < 1");
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.parallelSendAndDontWaitConsumers = i;
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
        this.commandQueueRedeliveryPolicyResolver = (CommandQueueRedeliveryPolicyResolver) FailFast.requireNonNull(commandQueueRedeliveryPolicyResolver, "No commandQueueRedeliveryPolicyResolver provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandQueueNameSelector commandQueueNameSelector, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        super(sendAndDontWaitErrorHandler, list);
        this.parallelSendAndDontWaitConsumers = 10;
        this.commandQueueNameSelector = CommandQueueNameSelector.defaultCommandQueueForAllCommands();
        this.commandQueueRedeliveryPolicyResolver = CommandQueueRedeliveryPolicyResolver.sameReliveryPolicyForAllCommandQueues(RedeliveryPolicy.linearBackoff(Duration.ofMillis(150L), Duration.ofMillis(1000L), 20));
        this.queueConsumers = new ConcurrentHashMap();
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, (List<CommandBusInterceptor>) List.of((Object[]) commandBusInterceptorArr));
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, CommandQueueNameSelector commandQueueNameSelector, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, (List<CommandBusInterceptor>) List.of((Object[]) commandBusInterceptorArr));
        this.commandQueueNameSelector = (CommandQueueNameSelector) FailFast.requireNonNull(commandQueueNameSelector, "No durableQueueNameSelector provided");
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, sendAndDontWaitErrorHandler, (List<CommandBusInterceptor>) List.of((Object[]) commandBusInterceptorArr));
    }

    public DurableLocalCommandBus(DurableQueues durableQueues, int i, CommandQueueNameSelector commandQueueNameSelector, CommandQueueRedeliveryPolicyResolver commandQueueRedeliveryPolicyResolver, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, i, commandQueueNameSelector, commandQueueRedeliveryPolicyResolver, sendAndDontWaitErrorHandler, (List<CommandBusInterceptor>) List.of((Object[]) commandBusInterceptorArr));
    }

    public <C> void sendAndDontWait(C c) {
        _sendAndDontWait(c, Optional.empty());
    }

    public <C> void sendAndDontWait(C c, Duration duration) {
        _sendAndDontWait(c, Optional.ofNullable(duration));
    }

    private <C> void _sendAndDontWait(C c, Optional<Duration> optional) {
        CommandHandler findCommandHandlerCapableOfHandling = findCommandHandlerCapableOfHandling(c);
        FailFast.requireNonNull(optional, "You must provide a messageDeliveryDelay value");
        QueueName selectDurableQueueNameFor = this.commandQueueNameSelector.selectDurableQueueNameFor(c, findCommandHandlerCapableOfHandling, optional);
        if (selectDurableQueueNameFor == null) {
            throw new IllegalStateException(MessageFormatter.msg("{} selected a <null> QueueName for the combination of CommandHandler: {},  messageDeliveryDelay: {}, Command: {}", new Object[]{CommandQueueNameSelector.class.getSimpleName(), findCommandHandlerCapableOfHandling.getClass().getName(), optional, c}));
        }
        this.queueConsumers.computeIfAbsent(selectDurableQueueNameFor, queueName -> {
            return this.durableQueues.consumeFromQueue(selectDurableQueueNameFor, this.commandQueueRedeliveryPolicyResolver.resolveRedeliveryPolicyFor(selectDurableQueueNameFor), this.parallelSendAndDontWaitConsumers, this::processSendAndDontWaitMessage);
        });
        if (optional.isPresent()) {
            log.debug("[{}] Queuing Durable delayed {} sendAndDontWait for command of type '{}' to {} '{}'. TransactionalMode: {}", new Object[]{selectDurableQueueNameFor, optional, c.getClass().getName(), CommandHandler.class.getSimpleName(), findCommandHandlerCapableOfHandling.toString(), this.durableQueues.getTransactionalMode()});
        } else {
            log.debug("[{}] Queuing Durable sendAndDontWait command of type '{}' to {} '{}'. TransactionalMode: {}", new Object[]{selectDurableQueueNameFor, c.getClass().getName(), CommandHandler.class.getSimpleName(), findCommandHandlerCapableOfHandling.toString(), this.durableQueues.getTransactionalMode()});
        }
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> {
                this.durableQueues.queueMessage(selectDurableQueueNameFor, Message.of(c), (Optional<Duration>) optional);
            });
        } else {
            this.durableQueues.queueMessage(selectDurableQueueNameFor, Message.of(c), optional);
        }
    }

    private void processSendAndDontWaitMessage(QueuedMessage queuedMessage) {
        Object payload = queuedMessage.getMessage().getPayload();
        CommandHandler findCommandHandlerCapableOfHandling = findCommandHandlerCapableOfHandling(payload);
        log.debug("[{}] Handling Durable sendAndDontWait command of type '{}' to {} '{}'", new Object[]{queuedMessage.getQueueName(), payload.getClass().getName(), CommandHandler.class.getSimpleName(), findCommandHandlerCapableOfHandling.toString()});
        CommandBusInterceptorChain.newInterceptorChain(queuedMessage, findCommandHandlerCapableOfHandling, this.interceptors, (commandBusInterceptor, commandBusInterceptorChain) -> {
            commandBusInterceptor.interceptSendAndDontWait(queuedMessage, commandBusInterceptorChain);
            return null;
        }, obj -> {
            try {
                return findCommandHandlerCapableOfHandling.handle(payload);
            } catch (Exception e) {
                this.sendAndDontWaitErrorHandler.handleError(e, queuedMessage, findCommandHandlerCapableOfHandling);
                return null;
            }
        }).proceed();
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.SingleOperationTransaction) {
            this.durableQueues.acknowledgeMessageAsHandled(queuedMessage.getId());
        }
    }

    public int getParallelSendAndDontWaitConsumers() {
        return this.parallelSendAndDontWaitConsumers;
    }

    public CommandQueueNameSelector getCommandQueueNameSelector() {
        return this.commandQueueNameSelector;
    }

    public CommandQueueRedeliveryPolicyResolver getCommandQueueRedeliveryPolicyResolver() {
        return this.commandQueueRedeliveryPolicyResolver;
    }
}
