package dk.cloudcreate.essentials.components.foundation.test.messaging.queue;

import dk.cloudcreate.essentials.components.foundation.messaging.RedeliveryPolicy;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueueConsumer;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.Message;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.NextQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.OrderedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueName;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessageHandler;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.TransactionalMode;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.ConsumeFromQueue;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.CustomerId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderEvent;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.OrderId;
import dk.cloudcreate.essentials.components.foundation.test.messaging.queue.test_data.ProductId;
import dk.cloudcreate.essentials.components.foundation.test.reactive.command.AbstractDurableLocalCommandBusIT;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWork;
import dk.cloudcreate.essentials.components.foundation.transaction.UnitOfWorkFactory;
import dk.cloudcreate.essentials.components.foundation.types.CorrelationId;
import dk.cloudcreate.essentials.shared.collections.Lists;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/test/messaging/queue/DurableQueuesIT.class */
public abstract class DurableQueuesIT<DURABLE_QUEUES extends DurableQueues, UOW extends UnitOfWork, UOW_FACTORY extends UnitOfWorkFactory<UOW>> {
    protected UOW_FACTORY unitOfWorkFactory;
    protected DURABLE_QUEUES durableQueues;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/test/messaging/queue/DurableQueuesIT$RecordingQueuedMessageHandler.class */
    public static class RecordingQueuedMessageHandler implements QueuedMessageHandler {
        Consumer<Message> functionLogic;
        ConcurrentLinkedQueue<Message> messages = new ConcurrentLinkedQueue<>();

        RecordingQueuedMessageHandler() {
        }

        RecordingQueuedMessageHandler(Consumer<Message> consumer) {
            this.functionLogic = consumer;
        }

        public void handle(QueuedMessage queuedMessage) {
            this.messages.add(queuedMessage.getMessage());
            if (this.functionLogic != null) {
                this.functionLogic.accept(queuedMessage.getMessage());
            }
        }
    }

    /**
     * Prepares a fresh fixture before each test: creates the unit of work factory, resets the queue storage
     * through it, creates the {@link DurableQueues} under test and starts it.
     */
    @BeforeEach
    void setup() {
        unitOfWorkFactory = createUnitOfWorkFactory();
        // Storage is reset before the queues instance is created and started
        resetQueueStorage(unitOfWorkFactory);
        durableQueues = createDurableQueues(unitOfWorkFactory);
        durableQueues.start();
    }

    /**
     * Stops the {@link DurableQueues} under test after each test, provided {@link #setup()} got far enough
     * to create it.
     */
    @AfterEach
    void cleanup() {
        if (durableQueues == null) {
            return;
        }
        durableQueues.stop();
    }

    /**
     * Creates the {@link DurableQueues} implementation under test. Called from {@link #setup()} after the
     * queue storage has been reset; the returned instance is started by {@link #setup()}.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     * @return the durable queues instance to test
     */
    protected abstract DURABLE_QUEUES createDurableQueues(UOW_FACTORY uow_factory);

    /**
     * Creates the unit of work factory used by the tests. Called first in {@link #setup()}, once per test.
     *
     * @return the unit of work factory for the concrete test
     */
    protected abstract UOW_FACTORY createUnitOfWorkFactory();

    /**
     * Resets the underlying queue storage. Called from {@link #setup()} before the {@link DurableQueues}
     * instance is created.
     *
     * @param uow_factory the unit of work factory created by {@link #createUnitOfWorkFactory()}
     */
    protected abstract void resetQueueStorage(UOW_FACTORY uow_factory);

    /**
     * Evaluates {@code supplier} and returns its result. When the durable queues report
     * {@link TransactionalMode#FullyTransactional} the supplier is evaluated inside a unit of work obtained
     * from the unit of work factory; otherwise it is evaluated directly.
     *
     * @param supplier the queue operation to perform
     * @param <R>      the result type
     * @return whatever the supplier returned
     */
    protected <R> R withDurableQueue(Supplier<R> supplier) {
        if (this.durableQueues.getTransactionalMode() != TransactionalMode.FullyTransactional) {
            return supplier.get();
        }
        return (R) this.unitOfWorkFactory.withUnitOfWork(unitOfWork -> supplier.get());
    }

    /**
     * Runs {@code runnable}. When the durable queues report {@link TransactionalMode#FullyTransactional}
     * the runnable is executed inside a unit of work obtained from the unit of work factory; otherwise it is
     * executed directly.
     *
     * @param runnable the queue operation to perform
     */
    protected void usingDurableQueue(Runnable runnable) {
        boolean fullyTransactional = this.durableQueues.getTransactionalMode() == TransactionalMode.FullyTransactional;
        if (!fullyTransactional) {
            runnable.run();
            return;
        }
        this.unitOfWorkFactory.usingUnitOfWork(unitOfWork -> runnable.run());
    }

    /**
     * Queues three messages on one queue without any consumer and verifies the lookup/query operations:
     * queue name lookup, total count, paged listing in ascending order, single entry lookup, the
     * "soon ready for delivery" query (including its result limit) and finally purging the queue.
     */
    @Test
    void test_simple_enqueueing_and_afterwards_querying_queued_messages() {
        QueueName queueName = QueueName.of("TestQueue");

        // First message - also verifies the queue name bookkeeping
        Message orderAdded = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        QueueEntryId orderAddedId = withDurableQueue(() -> this.durableQueues.queueMessage(queueName, orderAdded));
        Assertions.assertThat(this.durableQueues.getQueueNameFor(orderAddedId)).isEqualTo(Optional.of(queueName));
        Assertions.assertThat(this.durableQueues.getQueueNameFor(QueueEntryId.random())).isEmpty();
        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));

        // Second and third message
        Message productAdded = Message.of(new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        QueueEntryId productAddedId = withDurableQueue(() -> this.durableQueues.queueMessage(queueName, productAdded));
        Message orderAccepted = Message.of(new OrderEvent.OrderAccepted(OrderId.random()));
        QueueEntryId orderAcceptedId = withDurableQueue(() -> this.durableQueues.queueMessage(queueName, orderAccepted));

        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);
        List<QueuedMessage> queuedMessages = this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(queuedMessages).hasSize(3);
        assertUndeliveredQueueEntry(queuedMessages.get(0), orderAddedId, queueName, orderAdded);
        assertUndeliveredQueueEntry(queuedMessages.get(1), productAddedId, queueName, productAdded);
        assertUndeliveredQueueEntry(queuedMessages.get(2), orderAcceptedId, queueName, orderAccepted);

        // The "soon ready" query must return the same entries in the same order
        List<NextQueuedMessage> soonReady = this.durableQueues.queryForMessagesSoonReadyForDelivery(queueName, Instant.now().minusSeconds(2L), 10);
        Assertions.assertThat(soonReady).hasSize(3);
        for (int index = 0; index < 3; index++) {
            assertSameIdAndTimestamps(soonReady.get(index), queuedMessages.get(index));
        }
        // ... and honour the requested maximum number of results
        Assertions.assertThat(this.durableQueues.queryForMessagesSoonReadyForDelivery(queueName, Instant.now().minusSeconds(2L), 2)).hasSize(2);

        Assertions.assertThat(this.durableQueues.purgeQueue(queueName)).isEqualTo(3);
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
    }

    /**
     * Asserts that {@code actual} is the queue entry stored under {@code expectedId}, carries
     * {@code expectedMessage}, belongs to {@code expectedQueueName} and shows no sign of any delivery attempt.
     */
    private void assertUndeliveredQueueEntry(QueuedMessage actual, QueueEntryId expectedId, QueueName expectedQueueName, Message expectedMessage) {
        Assertions.assertThat(this.durableQueues.getQueuedMessage(expectedId).get()).isEqualTo(actual);
        Assertions.assertThat(actual.getMessage()).usingRecursiveComparison().isEqualTo(expectedMessage);
        Assertions.assertThat(actual.getId()).isEqualTo(expectedId);
        Assertions.assertThat(actual.getQueueName()).isEqualTo(expectedQueueName);
        Assertions.assertThat(actual.getAddedTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat(actual.getNextDeliveryTimestamp()).isBefore(OffsetDateTime.now(Clock.systemUTC()));
        Assertions.assertThat(actual.isDeadLetterMessage()).isFalse();
        Assertions.assertThat(actual.getLastDeliveryError()).isNull();
        Assertions.assertThat(actual.getRedeliveryAttempts()).isEqualTo(0);
        Assertions.assertThat(actual.getTotalDeliveryAttempts()).isEqualTo(0);
    }

    /** Asserts that {@code upcoming} refers to the same entry, with the same timestamps, as {@code queued}. */
    private static void assertSameIdAndTimestamps(NextQueuedMessage upcoming, QueuedMessage queued) {
        Assertions.assertThat(upcoming.id).isEqualTo(queued.getId());
        Assertions.assertThat(upcoming.addedTimestamp).isEqualTo(queued.getAddedTimestamp().toInstant());
        Assertions.assertThat(upcoming.nextDeliveryTimestamp).isEqualTo(queued.getNextDeliveryTimestamp().toInstant());
    }

    /**
     * Queues three messages and verifies that a single consumer receives them in the order they were queued,
     * that the queue is empty afterwards and that nothing ended up as a dead letter message.
     */
    @Test
    void verify_queued_messages_are_dequeued_in_order() {
        QueueName queueName = QueueName.of("TestQueue");
        this.durableQueues.purgeQueue(queueName);

        // Each message must actually be queued - the assertions below expect 3 queued messages and
        // compare the consumed messages against these instances.
        Message orderAdded = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        usingDurableQueue(() -> this.durableQueues.queueMessage(queueName, orderAdded));
        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));

        Message productAdded = Message.of(new OrderEvent.ProductAddedToOrder(OrderId.random(), ProductId.random(), 2), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        usingDurableQueue(() -> this.durableQueues.queueMessage(queueName, productAdded));

        Message orderAccepted = Message.of(new OrderEvent.OrderAccepted(OrderId.random()));
        usingDurableQueue(() -> this.durableQueues.queueMessage(queueName, orderAccepted));

        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(3L);
        Assertions.assertThat(this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0L);

        // A single consumer, so the recorded order reflects the delivery order
        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(ConsumeFromQueue.builder().setQueueName(queueName).setRedeliveryPolicy(RedeliveryPolicy.fixedBackoff().setRedeliveryDelay(Duration.ofMillis(200L)).setMaximumNumberOfRedeliveries(5).build()).setParallelConsumers(1).setQueueMessageHandler(handler).build());
        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));

        Awaitility.waitAtMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Assertions.assertThat(handler.messages).hasSize(3);
        });
        // The handler records a message before the queue has finished handling it, hence the second wait
        Awaitility.waitAtMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        });
        Assertions.assertThat(this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(0L);

        List<Message> consumed = new ArrayList<>(handler.messages);
        Assertions.assertThat(consumed.get(0)).usingRecursiveComparison().isEqualTo(orderAdded);
        Assertions.assertThat(consumed.get(1)).usingRecursiveComparison().isEqualTo(productAdded);
        Assertions.assertThat(consumed.get(2)).usingRecursiveComparison().isEqualTo(orderAccepted);

        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        consumer.cancel();
        // With the consumer cancelled and no messages left, the queue name is expected to be gone
        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of());
    }

    /**
     * Queues a message directly as a dead letter message and verifies that it is counted and listed as a
     * dead letter message only, and that a consumer on the queue is never handed the message.
     */
    @Test
    void verify_a_message_queues_as_a_dead_letter_message_is_marked_as_such_and_will_not_be_delivered_to_the_consumer() {
        QueueName queueName = QueueName.of("TestQueue");
        Message orderAdded = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 1234L), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        QueueEntryId deadLetterId = withDurableQueue(() -> this.durableQueues.queueMessageAsDeadLetterMessage(queueName, orderAdded, new RuntimeException(AbstractDurableLocalCommandBusIT.ON_PURPOSE)));

        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
        Assertions.assertThat(this.durableQueues.getQueueNameFor(deadLetterId)).isEqualTo(Optional.of(queueName));
        // Dead letter messages are counted separately from ordinary queued messages
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat(this.durableQueues.getTotalDeadLetterMessagesQueuedFor(queueName)).isEqualTo(1L);

        List<QueuedMessage> deadLetters = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(deadLetters).hasSize(1);
        Assertions.assertThat(deadLetters.get(0).getId()).isEqualTo(deadLetterId);

        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5), 1, handler);
        // The handler must stay empty for the whole observation window
        Awaitility.await().during(Duration.ofMillis(1990L)).atMost(Duration.ofSeconds(2000L)).until(() -> handler.messages.isEmpty());
        consumer.cancel();

        Assertions.assertThat(this.durableQueues.getQueueNames()).isEqualTo(Set.of(queueName));
    }

    /**
     * Queues five ordered messages for each of two keys, with the third message of the first key queued as a
     * dead letter message. Verifies that only the first-key messages ordered before the dead letter, plus all
     * second-key messages, are delivered; and that the remaining first-key messages are delivered once the
     * dead letter message has been resurrected.
     */
    @Test
    void verify_a_that_as_long_as_an_ordered_message_with_same_key_and_a_lower_key_order_exists_as_a_dead_letter_message_then_no_further_messages_with_the_same_key_will_be_delivered() {
        QueueName queueName = QueueName.of("TestQueue");
        String key1 = "Key1";
        // Parameterized lists (not raw types) so the lambdas below see typed index/payload pairs
        List<String> key1Payloads = List.of("Key1Msg1", "Key1Msg2", "Key1Msg3", "Key1Msg4", "Key1Msg5");
        String key2 = "Key2";
        List<String> key2Payloads = List.of("Key2Msg1", "Key2Msg2", "Key2Msg3", "Key2Msg4", "Key2Msg5");

        usingDurableQueue(() -> {
            // The list index is used as the message order within its key
            Lists.toIndexedStream(key1Payloads).forEach(indexed -> {
                if ("Key1Msg3".equals(indexed._2)) {
                    this.durableQueues.queueMessageAsDeadLetterMessage(queueName, OrderedMessage.of(indexed._2, key1, indexed._1.intValue()), new RuntimeException(AbstractDurableLocalCommandBusIT.ON_PURPOSE));
                } else {
                    this.durableQueues.queueMessage(queueName, OrderedMessage.of(indexed._2, key1, indexed._1.intValue()), Duration.ofMillis(100L));
                }
            });
            Lists.toIndexedStream(key2Payloads).forEach(indexed -> {
                this.durableQueues.queueMessage(queueName, OrderedMessage.of(indexed._2, key2, indexed._1.intValue()), Duration.ofMillis(100L));
            });
        });

        // Everything except the dead letter message counts as queued
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo((key1Payloads.size() + key2Payloads.size()) - 1);
        List<QueuedMessage> deadLetters = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(deadLetters).hasSize(1);
        Assertions.assertThat(deadLetters.get(0).getPayload()).isEqualTo(key1Payloads.get(2));
        QueueEntryId deadLetterId = deadLetters.get(0).getId();

        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler();
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 1), 2, handler);

        // Expected deliveries: all Key2 messages plus the two Key1 messages ordered before the dead letter
        Awaitility.waitAtMost(Duration.ofSeconds(5000L)).untilAsserted(() -> {
            Assertions.assertThat(handler.messages).hasSize(key2Payloads.size() + 2);
        });
        Assertions.assertThat(handler.messages.stream().map(message -> (String) message.getPayload()))
                  .containsOnly("Key1Msg1", "Key1Msg2", "Key2Msg1", "Key2Msg2", "Key2Msg3", "Key2Msg4", "Key2Msg5");

        handler.messages.clear();
        usingDurableQueue(() -> {
            this.durableQueues.resurrectDeadLetterMessage(deadLetterId, Duration.ofMillis(10L));
        });

        Awaitility.waitAtMost(Duration.ofSeconds(2000L)).untilAsserted(() -> {
            // Any message found as a dead letter while polling is resurrected again so delivery can continue
            List<QueuedMessage> newDeadLetters = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
            newDeadLetters.forEach(deadLetter -> {
                System.out.println("Resurrected new DeadLetterMessage: " + deadLetter);
                usingDurableQueue(() -> {
                    this.durableQueues.resurrectDeadLetterMessage(deadLetter.getId(), Duration.ofMillis(10L));
                });
            });
            Assertions.assertThat(handler.messages).hasSize(3);
        });
        Assertions.assertThat(handler.messages.stream().map(message -> (String) message.getPayload()))
                  .containsOnly("Key1Msg3", "Key1Msg4", "Key1Msg5");
        consumer.cancel();
    }

    /**
     * Queues one message whose handler throws on the first three deliveries and verifies that the message
     * is handed to the handler four times in total.
     */
    @Test
    void verify_failed_messages_are_redelivered() {
        QueueName queueName = QueueName.of("TestQueue");
        Message orderAdded = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 12345L), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        usingDurableQueue(() -> {
            this.durableQueues.queueMessage(queueName, orderAdded);
        });
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

        AtomicInteger deliveries = new AtomicInteger();
        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler(message -> {
            // Deliveries 1-3 fail, the 4th succeeds
            if (deliveries.incrementAndGet() > 3) {
                return;
            }
            throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveries);
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5), 1, handler);

        Awaitility.waitAtMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Assertions.assertThat(handler.messages).hasSize(4);
        });

        // Every recorded delivery must carry the same message
        List<Message> recorded = new ArrayList<>(handler.messages);
        for (int attempt = 0; attempt < 4; attempt++) {
            Assertions.assertThat(recorded.get(attempt)).usingRecursiveComparison().isEqualTo(orderAdded);
        }
        consumer.cancel();
    }

    /**
     * Queues one message whose handler throws on the first six deliveries, using a redelivery policy that
     * allows five redeliveries. Verifies that the message ends up as a dead letter message after six
     * deliveries, that it can be resurrected, and that the resurrected message is delivered once more and
     * then leaves the queue.
     */
    @Test
    void verify_a_message_that_failed_too_many_times_is_marked_as_dead_letter_message_AND_the_message_can_be_resurrected() {
        QueueName queueName = QueueName.of("TestQueue");
        Message orderAdded = Message.of(new OrderEvent.OrderAdded(OrderId.random(), CustomerId.random(), 123456L), MessageMetaData.of("correlation_id", CorrelationId.random(), "trace_id", UUID.randomUUID().toString()));
        QueueEntryId entryId = withDurableQueue(() -> this.durableQueues.queueMessage(queueName, orderAdded));
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

        AtomicInteger deliveries = new AtomicInteger();
        RecordingQueuedMessageHandler handler = new RecordingQueuedMessageHandler(message -> {
            // Deliveries 1-6 fail (initial delivery + 5 redeliveries); the 7th, after resurrection, succeeds
            if (deliveries.incrementAndGet() <= 6) {
                throw new RuntimeException("Thrown on purpose. Delivery count: " + deliveries);
            }
        });
        DurableQueueConsumer consumer = this.durableQueues.consumeFromQueue(queueName, RedeliveryPolicy.fixedBackoff(Duration.ofMillis(200L), 5), 1, handler);

        Awaitility.waitAtMost(Duration.ofSeconds(4L)).untilAsserted(() -> {
            Assertions.assertThat(handler.messages).hasSize(6);
        });
        List<Message> recorded = new ArrayList<>(handler.messages);
        for (int attempt = 0; attempt < 6; attempt++) {
            Assertions.assertThat(recorded.get(attempt)).usingRecursiveComparison().isEqualTo(orderAdded);
        }

        // The handler records the 6th delivery before it throws, so the queue may not yet have marked the
        // message as a dead letter message when the wait above completes - wait for that to become visible.
        Awaitility.waitAtMost(Duration.ofSeconds(2L)).untilAsserted(() -> {
            Optional<QueuedMessage> candidate = withDurableQueue(() -> this.durableQueues.getDeadLetterMessage(entryId));
            Assertions.assertThat(candidate).isPresent();
        });

        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Optional<QueuedMessage> deadLetter = withDurableQueue(() -> this.durableQueues.getDeadLetterMessage(entryId));
        Assertions.assertThat(deadLetter).isPresent();
        Assertions.assertThat(deadLetter.get().getMessage()).usingRecursiveComparison().isEqualTo(orderAdded);
        List<QueuedMessage> deadLetters = this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L);
        Assertions.assertThat(deadLetter.get()).usingRecursiveComparison().isEqualTo(deadLetters.get(0));

        // Resurrect with a delivery delay; the message counts as an ordinary queued message again
        Optional<QueuedMessage> resurrected = withDurableQueue(() -> this.durableQueues.resurrectDeadLetterMessage(entryId, Duration.ofMillis(1000L)));
        Assertions.assertThat(resurrected).isPresent();
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(1L);

        Awaitility.waitAtMost(Duration.ofSeconds(4L)).untilAsserted(() -> {
            Assertions.assertThat(handler.messages).hasSize(7);
        });
        Assertions.assertThat(new ArrayList<>(handler.messages).get(6)).usingRecursiveComparison().isEqualTo(orderAdded);

        Awaitility.waitAtMost(Duration.ofMillis(500L)).untilAsserted(() -> {
            Assertions.assertThat(this.durableQueues.getQueuedMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty();
        });
        Assertions.assertThat(this.durableQueues.getTotalMessagesQueuedFor(queueName)).isEqualTo(0L);
        Assertions.assertThat(this.durableQueues.getDeadLetterMessages(queueName, DurableQueues.QueueingSortOrder.ASC, 0L, 20L)).isEmpty();
        consumer.cancel();
    }
}
