package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.SoftAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/test/messaging/queue/DistributedCompetingConsumersDurableQueuesIT.class */
/**
 * Abstract integration test in which two {@link DurableQueues} instances, both created from the
 * same {@link UnitOfWorkFactory}, consume from the same queue at the same time.
 * <p>
 * The test queues {@link #NUMBER_OF_MESSAGES} messages through the first instance, starts one
 * consumer per instance and then verifies that, taken together, the two consumers received every
 * queued message exactly once and that each consumer received at least one message.
 * <p>
 * Concrete subclasses supply the implementation under test via {@link #createDurableQueues},
 * {@link #createUnitOfWorkFactory} and {@link #resetQueueStorage}.
 *
 * @param <DURABLE_QUEUES> the concrete {@link DurableQueues} type under test
 * @param <UOW>            the {@link UnitOfWork} type produced by the unit of work factory
 * @param <UOW_FACTORY>    the {@link UnitOfWorkFactory} type handed to the durable queues
 */
public abstract class DistributedCompetingConsumersDurableQueuesIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    /** Number of messages queued (and expected to be consumed) by the test. */
    public static final int NUMBER_OF_MESSAGES = 1000;
    public static final int PARALLEL_CONSUMERS = 20;
    private UOW_FACTORY unitOfWorkFactory;
    private DURABLE_QUEUES durableQueues1;
    private DURABLE_QUEUES durableQueues2;

    /**
     * Message handler that records the payload of every message it is handed, so the test can
     * inspect afterwards which messages each consumer received.
     */
    public static class RecordingQueuedMessageHandler implements QueuedMessageHandler {
        /** Payloads received so far; a concurrent queue because it is read by the test while consumers add to it. */
        final ConcurrentLinkedQueue<OrderEvent> messages = new ConcurrentLinkedQueue<>();

        private RecordingQueuedMessageHandler() {
        }

        /**
         * Records the payload of the given message.
         *
         * @param queuedMessage the delivered message; its payload is cast to {@link OrderEvent}
         */
        public void handle(QueuedMessage queuedMessage) {
            this.messages.add((OrderEvent) queuedMessage.getPayload());
        }
    }

    /**
     * Creates the unit of work factory, resets the queue storage and then creates and starts the
     * two {@link DurableQueues} instances that compete for messages.
     */
    @BeforeEach
    void setup() {
        this.unitOfWorkFactory = createUnitOfWorkFactory();
        resetQueueStorage(this.unitOfWorkFactory);
        this.durableQueues1 = createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues1.start();
        this.durableQueues2 = createDurableQueues(this.unitOfWorkFactory);
        this.durableQueues2.start();
    }

    /**
     * Stops both {@link DurableQueues} instances (those that were created).
     * The second instance is stopped in a {@code finally} block so that a failure while stopping
     * the first one cannot leave the second one running.
     */
    @AfterEach
    void cleanup() {
        try {
            if (this.durableQueues1 != null) {
                this.durableQueues1.stop();
            }
        } finally {
            if (this.durableQueues2 != null) {
                this.durableQueues2.stop();
            }
        }
    }

    /**
     * Creates a new (not yet started) durable queues instance.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     * @return the instance under test; {@link #setup()} calls this twice per test
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY uow_factory);

    /**
     * Creates the unit of work factory shared by both durable queues instances.
     *
     * @return the unit of work factory
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Resets the underlying queue storage; invoked by {@link #setup()} before any durable queues
     * instance is created.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     */
    protected abstract void resetQueueStorage(UOW_FACTORY uow_factory);

    /**
     * Runs the given action, wrapping it in a unit of work when the first durable queues instance
     * reports {@link TransactionalMode#FullyTransactional}; otherwise the action is run directly.
     *
     * @param runnable the action to run
     */
    protected void usingDurableQueue(Runnable runnable) {
        if (this.durableQueues1.getTransactionalMode() == TransactionalMode.FullyTransactional) {
            this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> {
                runnable.run();
            });
        } else {
            runnable.run();
        }
    }

    /**
     * Queues {@link #NUMBER_OF_MESSAGES} messages, lets one consumer per durable queues instance
     * drain the queue and verifies that every message was delivered exactly once across the two
     * consumers, with both consumers receiving at least one message.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() {
        Random random = new Random();
        QueueName queueName = QueueName.of("TestQueue");
        this.durableQueues1.purgeQueue(queueName);

        ArrayList<OrderEvent> messages = new ArrayList<>(NUMBER_OF_MESSAGES);
        for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
            messages.add(createOrderEvent(i, random));
        }
        usingDurableQueue(() -> {
            this.durableQueues1.queueMessages(queueName, messages);
        });

        // Both instances must report the same number of queued messages
        Assertions.assertThat(this.durableQueues1.getTotalMessagesQueuedFor(queueName)).isEqualTo(NUMBER_OF_MESSAGES);
        Assertions.assertThat(this.durableQueues2.getTotalMessagesQueuedFor(queueName)).isEqualTo(NUMBER_OF_MESSAGES);

        RecordingQueuedMessageHandler handler1 = new RecordingQueuedMessageHandler();
        RecordingQueuedMessageHandler handler2 = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer1 = null;
        DurableQueueConsumer consumer2 = null;
        try {
            consumer1 = this.durableQueues1.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1L), 5), 10, handler1);
            consumer2 = this.durableQueues2.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(1L), 5), 10, handler2);

            Awaitility.waitAtMost(Duration.ofSeconds(30L)).untilAsserted(() -> {
                Assertions.assertThat(handler1.messages.size() + handler2.messages.size()).isEqualTo(NUMBER_OF_MESSAGES);
            });

            // Each of the competing consumers must have received a share of the messages
            Assertions.assertThat(handler1.messages.size()).isGreaterThan(0);
            Assertions.assertThat(handler2.messages.size()).isGreaterThan(0);

            ArrayList<OrderEvent> receivedMessages = new ArrayList<>(handler1.messages);
            receivedMessages.addAll(handler2.messages);

            SoftAssertions softAssertions = new SoftAssertions();
            softAssertions.assertThat(receivedMessages.stream().distinct().count()).isEqualTo(NUMBER_OF_MESSAGES);
            softAssertions.assertThat(receivedMessages).containsAll(messages);
            softAssertions.assertThat(messages).containsAll(receivedMessages);
            softAssertions.assertAll();

            // Check that no further messages are delivered after the assertions above.
            // NOTE(review): the upper bound is 2000 *seconds*; this looks like it was meant to be
            // much shorter, but it is kept as-is to avoid making the wait window flaky - confirm.
            Awaitility.await().during(Duration.ofMillis(1990L)).atMost(Duration.ofSeconds(2000L)).until(() -> {
                return Boolean.valueOf(handler1.messages.size() + handler2.messages.size() == NUMBER_OF_MESSAGES);
            });
        } finally {
            // Always cancel the consumers, also when an assertion above fails
            try {
                if (consumer1 != null) {
                    consumer1.cancel();
                }
            } finally {
                if (consumer2 != null) {
                    consumer2.cancel();
                }
            }
        }
    }

    /**
     * Creates the test event for the given index: an {@code OrderAdded} for even indexes, a
     * {@code ProductAddedToOrder} for odd indexes divisible by 3 and an {@code OrderAccepted}
     * for all remaining indexes.
     *
     * @param index  the zero-based index of the message being created
     * @param random source of the random int passed to the event constructors
     * @return the event to queue
     */
    private static OrderEvent createOrderEvent(int index, Random random) {
        if (index % 2 == 0) {
            return new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), random.nextInt());
        }
        if (index % 3 == 0) {
            return new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), random.nextInt());
        }
        return new OrderEvent.OrderAccepted(OrderId.random());
    }
}
