package dk.cloudcreate.essentials.components.foundation.reactive.command;

import dk.cloudcreate.essentials.components.foundation.Lifecycle;
import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.reactive.command.AbstractCommandBus;
import dk.cloudcreate.essentials.reactive.command.CommandHandler;
import dk.cloudcreate.essentials.reactive.command.SendAndDontWaitErrorHandler;
import dk.cloudcreate.essentials.reactive.command.interceptor.CommandBusInterceptor;
import dk.cloudcreate.essentials.reactive.command.interceptor.CommandBusInterceptorChain;
import dk.cloudcreate.essentials.shared.FailFast;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/reactive/command/DurableLocalCommandBus.class */
/**
 * Command bus whose {@code sendAndDontWait} commands are persisted on a {@link DurableQueues} queue
 * instead of being handed directly to a {@link CommandHandler}.
 * <p>
 * Queued commands are picked up by the consumer registered in {@link #start()} and are then run through
 * the configured {@link CommandBusInterceptor}s before being passed to the matching {@link CommandHandler}.
 * Failures raised by the handler are forwarded to the configured {@link SendAndDontWaitErrorHandler}.
 * <p>
 * Unless specified otherwise the bus uses {@link #DEFAULT_COMMAND_QUEUE_NAME}, {@link #DEFAULT_REDELIVERY_POLICY},
 * 10 parallel queue consumers, a {@link SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler}
 * and no interceptors.
 * <p>
 * NOTE(review): {@link #start()}/{@link #stop()} are not synchronized and {@code started} is a plain field -
 * confirm that lifecycle calls are made from a single thread.
 */
public class DurableLocalCommandBus extends AbstractCommandBus implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(DurableLocalCommandBus.class);
    /** Queue name used when a constructor isn't given an explicit {@link QueueName}. */
    public static final QueueName DEFAULT_COMMAND_QUEUE_NAME = QueueName.of("DefaultCommandQueue");
    /** Redelivery policy used when a constructor isn't given an explicit {@link RedeliveryPolicy}. */
    public static final RedeliveryPolicy DEFAULT_REDELIVERY_POLICY = RedeliveryPolicy.linearBackoff(Duration.ofMillis(150), Duration.ofMillis(1000), 20);
    /** Number of parallel queue consumers used when a constructor isn't given an explicit value. */
    private static final int DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS = 10;

    private final DurableQueues durableQueues;
    private final int parallelSendAndDontWaitConsumers;
    private final QueueName commandQueueName;
    private final RedeliveryPolicy commandQueueRedeliveryPolicy;
    private boolean started;
    private DurableQueueConsumer durableQueueConsumer;

    /**
     * @return a new {@link DurableLocalCommandBusBuilder}
     */
    public static DurableLocalCommandBusBuilder builder() {
        return new DurableLocalCommandBusBuilder();
    }

    /**
     * Create a bus using all defaults (see the class description).
     *
     * @param durableQueues the queues implementation used for persisting commands (must not be null)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             DEFAULT_COMMAND_QUEUE_NAME,
             DEFAULT_REDELIVERY_POLICY,
             new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(),
             List.of());
    }

    /**
     * Create a bus with a custom queue name and redelivery policy; everything else uses the defaults.
     *
     * @param durableQueues    the queues implementation used for persisting commands (must not be null)
     * @param queueName        name of the queue the commands are stored on (must not be null)
     * @param redeliveryPolicy redelivery policy passed to the queue consumer (must not be null)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, QueueName queueName, RedeliveryPolicy redeliveryPolicy) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             queueName,
             redeliveryPolicy,
             new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(),
             List.of());
    }

    /**
     * Create a bus with a custom error handler; everything else uses the defaults.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             DEFAULT_COMMAND_QUEUE_NAME,
             DEFAULT_REDELIVERY_POLICY,
             sendAndDontWaitErrorHandler,
             List.of());
    }

    /**
     * Create a bus with a custom queue name and error handler; everything else uses the defaults.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param queueName                   name of the queue the commands are stored on (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, QueueName queueName, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             queueName,
             DEFAULT_REDELIVERY_POLICY,
             sendAndDontWaitErrorHandler,
             List.of());
    }

    /**
     * Create a bus with custom interceptors; everything else uses the defaults.
     *
     * @param durableQueues the queues implementation used for persisting commands (must not be null)
     * @param list          the interceptors applied when a queued command is handled
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, List<CommandBusInterceptor> list) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             DEFAULT_COMMAND_QUEUE_NAME,
             DEFAULT_REDELIVERY_POLICY,
             new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(),
             list);
    }

    /**
     * Create a bus with a custom queue name and interceptors; everything else uses the defaults.
     *
     * @param durableQueues the queues implementation used for persisting commands (must not be null)
     * @param queueName     name of the queue the commands are stored on (must not be null)
     * @param list          the interceptors applied when a queued command is handled
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, QueueName queueName, List<CommandBusInterceptor> list) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             queueName,
             DEFAULT_REDELIVERY_POLICY,
             new SendAndDontWaitErrorHandler.RethrowingSendAndDontWaitErrorHandler(),
             list);
    }

    /**
     * Create a bus with a custom error handler and interceptors; everything else uses the defaults.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     * @param list                        the interceptors applied when a queued command is handled
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             DEFAULT_COMMAND_QUEUE_NAME,
             DEFAULT_REDELIVERY_POLICY,
             sendAndDontWaitErrorHandler,
             list);
    }

    /**
     * Fully specified constructor; every other constructor delegates to this one.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param i                           number of parallel consumers passed to the queue consumer (must be &gt;= 1)
     * @param queueName                   name of the queue the commands are stored on (must not be null)
     * @param redeliveryPolicy            redelivery policy passed to the queue consumer (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     * @param list                        the interceptors applied when a queued command is handled
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, int i, QueueName queueName, RedeliveryPolicy redeliveryPolicy, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        super(sendAndDontWaitErrorHandler, list);
        FailFast.requireTrue(i >= 1, "parallelSendAndDontWaitConsumers is < 1");
        this.durableQueues = FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
        this.parallelSendAndDontWaitConsumers = i;
        this.commandQueueName = FailFast.requireNonNull(queueName, "No commandQueueName provided");
        this.commandQueueRedeliveryPolicy = FailFast.requireNonNull(redeliveryPolicy, "No commandQueueRedeliveryPolicy provided");
    }

    /**
     * Create a bus with a custom queue name, error handler and interceptors; everything else uses the defaults.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param queueName                   name of the queue the commands are stored on (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     * @param list                        the interceptors applied when a queued command is handled
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, QueueName queueName, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, List<CommandBusInterceptor> list) {
        this(durableQueues,
             DEFAULT_PARALLEL_SEND_AND_DONT_WAIT_CONSUMERS,
             queueName,
             DEFAULT_REDELIVERY_POLICY,
             sendAndDontWaitErrorHandler,
             list);
    }

    /**
     * Varargs variant of {@link #DurableLocalCommandBus(DurableQueues, List)}.
     *
     * @param durableQueues            the queues implementation used for persisting commands (must not be null)
     * @param commandBusInterceptorArr the interceptors applied when a queued command is handled (no null elements)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, List.of(commandBusInterceptorArr));
    }

    /**
     * Varargs variant of {@link #DurableLocalCommandBus(DurableQueues, QueueName, List)}.
     *
     * @param durableQueues            the queues implementation used for persisting commands (must not be null)
     * @param queueName                name of the queue the commands are stored on (must not be null)
     * @param commandBusInterceptorArr the interceptors applied when a queued command is handled (no null elements)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, QueueName queueName, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, queueName, List.of(commandBusInterceptorArr));
    }

    /**
     * Varargs variant of {@link #DurableLocalCommandBus(DurableQueues, SendAndDontWaitErrorHandler, List)}.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     * @param commandBusInterceptorArr    the interceptors applied when a queued command is handled (no null elements)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, sendAndDontWaitErrorHandler, List.of(commandBusInterceptorArr));
    }

    /**
     * Varargs variant of the fully specified constructor.
     *
     * @param durableQueues               the queues implementation used for persisting commands (must not be null)
     * @param i                           number of parallel consumers passed to the queue consumer (must be &gt;= 1)
     * @param queueName                   name of the queue the commands are stored on (must not be null)
     * @param redeliveryPolicy            redelivery policy passed to the queue consumer (must not be null)
     * @param sendAndDontWaitErrorHandler handler notified when a command handler throws
     * @param commandBusInterceptorArr    the interceptors applied when a queued command is handled (no null elements)
     */
    public DurableLocalCommandBus(DurableQueues durableQueues, int i, QueueName queueName, RedeliveryPolicy redeliveryPolicy, SendAndDontWaitErrorHandler sendAndDontWaitErrorHandler, CommandBusInterceptor... commandBusInterceptorArr) {
        this(durableQueues, i, queueName, redeliveryPolicy, sendAndDontWaitErrorHandler, List.of(commandBusInterceptorArr));
    }

    /**
     * Start consuming queued commands from the command queue. Calling this on an already started bus is a no-op.
     * <p>
     * If registering the queue consumer fails, the bus is reset to the not-started state before the
     * failure is rethrown, so that a later {@code start()} call can try again.
     */
    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public void start() {
        if (this.started) {
            return;
        }
        log.info("Starting...");
        // The flag is raised before the consumer is registered, matching the original ordering
        this.started = true;
        try {
            this.durableQueueConsumer = this.durableQueues.consumeFromQueue(this.commandQueueName,
                                                                            this.commandQueueRedeliveryPolicy,
                                                                            this.parallelSendAndDontWaitConsumers,
                                                                            this::processSendAndDontWaitMessage);
        } catch (RuntimeException | Error e) {
            // Without this reset a failed start would leave the bus "started" without a consumer
            this.started = false;
            throw e;
        }
        log.info("Started");
    }

    /**
     * Stop the queue consumer (if any). Calling this on a bus that isn't started is a no-op.
     */
    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public void stop() {
        if (!this.started) {
            return;
        }
        log.info("Stopping...");
        this.started = false;
        if (this.durableQueueConsumer != null) {
            this.durableQueueConsumer.stop();
            this.durableQueueConsumer = null;
        }
        log.info("Stopped");
    }

    /**
     * @return true if {@link #start()} has been called without a subsequent {@link #stop()}
     */
    @Override // dk.cloudcreate.essentials.components.foundation.Lifecycle
    public boolean isStarted() {
        return this.started;
    }

    /**
     * Queue the command on the command queue without any delivery delay.
     *
     * @param c   the command to queue
     * @param <C> the command type
     */
    public <C> void sendAndDontWait(C c) {
        _sendAndDontWait(c, Optional.empty());
    }

    /**
     * Queue the command on the command queue with an optional delivery delay.
     *
     * @param c        the command to queue
     * @param duration the delivery delay passed on to the queue; {@code null} means no delay
     * @param <C>      the command type
     */
    public <C> void sendAndDontWait(C c, Duration duration) {
        _sendAndDontWait(c, Optional.ofNullable(duration));
    }

    /**
     * Resolve the handler for the command (used for logging at this point) and queue the command.
     * When the queues run in {@link TransactionalMode#FullyTransactional} mode the message is queued
     * inside a unit of work obtained from the queues' unit of work factory.
     *
     * @param command         the command to queue
     * @param deliveryDelay   the optional delivery delay (the {@link Optional} itself must not be null)
     */
    private <C> void _sendAndDontWait(C command, Optional<Duration> deliveryDelay) {
        CommandHandler commandHandler = findCommandHandlerCapableOfHandling(command);
        FailFast.requireNonNull(deliveryDelay, "You must provide a messageDeliveryDelay value");
        TransactionalMode transactionalMode = this.durableQueues.getTransactionalMode();
        if (deliveryDelay.isPresent()) {
            log.debug("[{}] Queuing Durable delayed {} sendAndDontWait for command of type '{}' to {} '{}'. TransactionalMode: {}",
                      this.commandQueueName,
                      deliveryDelay,
                      command.getClass().getName(),
                      CommandHandler.class.getSimpleName(),
                      commandHandler,
                      transactionalMode);
        } else {
            log.debug("[{}] Queuing Durable sendAndDontWait command of type '{}' to {} '{}'. TransactionalMode: {}",
                      this.commandQueueName,
                      command.getClass().getName(),
                      CommandHandler.class.getSimpleName(),
                      commandHandler,
                      transactionalMode);
        }
        if (transactionalMode == TransactionalMode.FullyTransactional) {
            this.durableQueues.getUnitOfWorkFactory().get().usingUnitOfWork(() -> {
                this.durableQueues.queueMessage(this.commandQueueName, Message.of(command), deliveryDelay);
            });
        } else {
            this.durableQueues.queueMessage(this.commandQueueName, Message.of(command), deliveryDelay);
        }
    }

    /**
     * Queue consumer callback: resolves the handler for the queued command and runs it through the
     * interceptor chain. Anything thrown by the handler is passed to the {@code sendAndDontWaitErrorHandler}.
     *
     * @param queuedMessage the message delivered by the queue; its payload is the command
     */
    private void processSendAndDontWaitMessage(QueuedMessage queuedMessage) {
        Object command = queuedMessage.getMessage().getPayload();
        CommandHandler commandHandler = findCommandHandlerCapableOfHandling(command);
        log.debug("[{}] Handling Durable sendAndDontWait command of type '{}' to {} '{}'",
                  queuedMessage.getQueueName(),
                  command.getClass().getName(),
                  CommandHandler.class.getSimpleName(),
                  commandHandler);
        CommandBusInterceptorChain.newInterceptorChain(queuedMessage, commandHandler, this.interceptors, (commandBusInterceptor, commandBusInterceptorChain) -> {
            commandBusInterceptor.interceptSendAndDontWait(queuedMessage, commandBusInterceptorChain);
            return null;
        }, ignored -> {
            try {
                return commandHandler.handle(command);
            } catch (Throwable th) {
                // Deliberately broad: deciding what to do with a failure is the error handler's job
                this.sendAndDontWaitErrorHandler.handleError(th, queuedMessage, commandHandler);
                return null;
            }
        }).proceed();
    }

    /**
     * @return the number of parallel consumers passed to the queue consumer
     */
    public int getParallelSendAndDontWaitConsumers() {
        return this.parallelSendAndDontWaitConsumers;
    }

    /**
     * @return the name of the queue commands are stored on
     */
    public QueueName getCommandQueueName() {
        return this.commandQueueName;
    }

    /**
     * @return the redelivery policy passed to the queue consumer
     */
    public RedeliveryPolicy getCommandQueueRedeliveryPolicy() {
        return this.commandQueueRedeliveryPolicy;
    }
}
