package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import dk.cloudcreate.essentials.shared.MessageFormatter;
import dk.cloudcreate.essentials.shared.concurrent.ThreadFactoryBuilder;
import dk.cloudcreate.essentials.shared.time.StopWatch;
import dk.cloudcreate.essentials.shared.time.Timing;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/test/messaging/queue/LocalCompetingConsumersDurableQueueIT.class */
/**
 * Base integration test that queues {@link #NUMBER_OF_MESSAGES} messages on a single queue and consumes them
 * with {@link #PARALLEL_CONSUMERS} parallel consumers running in the same JVM.
 * <p>
 * Concrete subclasses supply the {@link DurableQueues} implementation under test, the matching
 * {@link UnitOfWorkFactory} and a way to reset the underlying queue storage before each test.
 *
 * @param <DURABLE_QUEUES> the {@link DurableQueues} implementation under test
 * @param <UOW>            the {@link UnitOfWork} type produced by the factory
 * @param <UOW_FACTORY>    the {@link UnitOfWorkFactory} type used by the queues
 */
public abstract class LocalCompetingConsumersDurableQueueIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Number of messages that are queued and must subsequently be consumed. */
    public static final int NUMBER_OF_MESSAGES = 2000;
    /** Number of parallel consumers (and consumer threads) competing for the queued messages. */
    public static final int PARALLEL_CONSUMERS = 20;
    private UOW_FACTORY unitOfWorkFactory;
    private DURABLE_QUEUES durableQueues;

    /**
     * Message handler that records the {@link Message} of every {@link QueuedMessage} it is handed.
     * A {@link ConcurrentLinkedQueue} is used because the handler is shared by all parallel consumers.
     */
    public static class RecordingQueuedMessageHandler implements QueuedMessageHandler {
        final ConcurrentLinkedQueue<Message> messages = new ConcurrentLinkedQueue<>();

        private RecordingQueuedMessageHandler() {
        }

        /**
         * Records the message carried by the delivered queue entry.
         *
         * @param queuedMessage the delivered queue entry
         */
        public void handle(QueuedMessage queuedMessage) {
            this.messages.add(queuedMessage.getMessage());
        }
    }

    /** Creates the unit of work factory, resets the queue storage and starts a fresh durable queues instance. */
    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = createUnitOfWorkFactory();
        resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues = createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues.start();
    }

    /** Stops the durable queues instance, if {@link #setup()} got far enough to create one. */
    @AfterEach
    void cleanup() {
        if (this.durableQueues != null) {
            this.durableQueues.stop();
        }
    }

    /**
     * Creates the {@link DurableQueues} instance under test.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     * @return the durable queues instance; it is started by {@link #setup()}
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY uow_factory);

    /**
     * Creates the {@link UnitOfWorkFactory} used by the test and by the durable queues instance.
     *
     * @return the unit of work factory
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Resets the underlying queue storage; called by {@link #setup()} before the durable queues are created.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     */
    protected abstract void resetQueueStorage(UOW_FACTORY uow_factory);

    /**
     * Runs {@code runnable} against the durable queue and measures how long it takes. When the queues run in
     * {@link TransactionalMode#FullyTransactional} mode the action is wrapped in a unit of work, otherwise it
     * is invoked directly. The resulting timing is printed to standard out.
     *
     * @param str      description used to label the timing
     * @param runnable the action to perform
     * @return the timing of the action
     */
    protected Timing usingDurableQueue(String str, Runnable runnable) {
        StopWatch stopWatch = StopWatch.start(str);
        if (this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> runnable.run());
        } else {
            runnable.run();
        }
        Timing timing = stopWatch.stop();
        System.out.println(timing);
        return timing;
    }

    /**
     * Queues {@link #NUMBER_OF_MESSAGES} messages, consumes them with {@link #PARALLEL_CONSUMERS} parallel
     * consumers and verifies that every queued message was delivered exactly once and that no additional
     * deliveries occur afterwards.
     * <p>
     * Note: despite the method name, the assertions compare the sets of queued and received messages and do
     * not check the order in which messages were delivered.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() throws InterruptedException {
        Random random = new Random();
        QueueName queueName = QueueName.of("TestQueue");
        this.durableQueues.purgeQueue(queueName);

        ArrayList<Message> messages = new ArrayList<>(NUMBER_OF_MESSAGES);
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            messages.add(createMessage(i, random));
        }

        ArrayList<Timing> timings = new ArrayList<>();
        timings.add(usingDurableQueue(MessageFormatter.msg("Queuing {} messages", NUMBER_OF_MESSAGES),
                                      () -> this.durableQueues.queueMessages(queueName, messages)));
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(NUMBER_OF_MESSAGES);

        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        StopWatch consumeStopWatch = StopWatch.start(MessageFormatter.msg("Consuming {} messages using {} parallel consumers",
                                                                          NUMBER_OF_MESSAGES,
                                                                          PARALLEL_CONSUMERS));
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(
                ConsumeFromQueue.builder()
                                .setQueueName(queueName)
                                .setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1), 5))
                                .setQueueMessageHandler(handler)
                                .setParallelConsumers(PARALLEL_CONSUMERS)
                                .setConsumerExecutorService(Executors.newScheduledThreadPool(PARALLEL_CONSUMERS,
                                                                                             ThreadFactoryBuilder.builder()
                                                                                                                 .daemon(true)
                                                                                                                 .nameFormat(queueName + "-Consume-Messages-%d")
                                                                                                                 .build()))
                                .build());
        try {
            Awaitility.waitAtMost(Duration.ofSeconds(30))
                      .untilAsserted(() -> Assertions.assertThat(handler.messages.size()).isEqualTo(NUMBER_OF_MESSAGES));
            Timing consumeTiming = consumeStopWatch.stop();
            timings.add(consumeTiming);
            System.out.println(consumeTiming);

            // Every queued message must have been received exactly once: no duplicates, nothing missing, nothing extra
            ArrayList<Message> receivedMessages = new ArrayList<>(handler.messages);
            SoftAssertions softAssertions = new SoftAssertions();
            softAssertions.assertThat(receivedMessages.stream().distinct().count()).isEqualTo(NUMBER_OF_MESSAGES);
            softAssertions.assertThat(receivedMessages).containsAll(messages);
            softAssertions.assertThat(messages).containsAll(receivedMessages);
            softAssertions.assertAll();

            // Verify that no further (duplicate) deliveries happen after all messages were received.
            // The recorded count can only grow, so once it deviates the condition can never hold again;
            // keep the upper bound short so a failure is reported within seconds.
            Awaitility.await()
                      .during(Duration.ofMillis(1990))
                      .atMost(Duration.ofSeconds(5))
                      .until(() -> handler.messages.size() == NUMBER_OF_MESSAGES);
        } finally {
            // Cancel the consumer even when an assertion above fails, so it does not keep running
            consumer.cancel();
        }
        System.out.println(timings);
    }

    /** Creates the test message for the given index, rotating between the three {@link OrderEvent} payload types. */
    private static Message createMessage(int index, Random random) {
        if (index % 2 == 0) {
            return Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), random.nextInt()),
                              randomMetaData());
        }
        if (index % 3 == 0) {
            return Message.of(new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), random.nextInt()),
                              randomMetaData());
        }
        return Message.of(new OrderEvent.OrderAccepted(OrderId.random()),
                          randomMetaData());
    }

    /** Creates message meta data carrying a random correlation id and a random trace id. */
    private static MessageMetaData randomMetaData() {
        return MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString());
    }
}
