package dk.cloudcreate.essentials.components.foundation.messaging.queue.micrometer;

import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueues;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.MessageMetaData;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueueEntryId;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.QueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.AcknowledgeMessageAsHandled;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.DeleteMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetNextMessageReadyForDelivery;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.GetQueuedMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.MarkAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessageAsDeadLetterMessage;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.QueueMessages;
import dk.cloudcreate.essentials.components.foundation.messaging.queue.operations.RetryMessage;
import dk.cloudcreate.essentials.shared.FailFast;
import dk.cloudcreate.essentials.shared.interceptor.InterceptorChain;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import io.micrometer.observation.transport.Kind;
import io.micrometer.observation.transport.ReceiverContext;
import io.micrometer.tracing.CurrentTraceContext;
import io.micrometer.tracing.Tracer;
import io.micrometer.tracing.propagation.Propagator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

/* loaded from: input_file:dk/cloudcreate/essentials/components/foundation/messaging/queue/micrometer/DurableQueuesMicrometerTracingInterceptor.class */
public class DurableQueuesMicrometerTracingInterceptor implements DurableQueuesInterceptor {
    public static final String QUEUE_ENTRY_ID = "queueEntryId";
    public static final String QUEUE_NAME = "queueName";
    private final Tracer tracer;
    private final Propagator propagator;
    private final ObservationRegistry observationRegistry;
    private final boolean verboseTracing;
    private final ThreadLocal<Observation.Scope> activeObservationScope = new ThreadLocal<>();
    private DurableQueues durableQueues;

    public DurableQueuesMicrometerTracingInterceptor(Tracer tracer, Propagator propagator, ObservationRegistry observationRegistry, boolean z) {
        this.tracer = (Tracer) FailFast.requireNonNull(tracer, "No tracer instance provided");
        this.propagator = (Propagator) FailFast.requireNonNull(propagator, "No propagator instance provided");
        this.observationRegistry = (ObservationRegistry) FailFast.requireNonNull(observationRegistry, "No observationRegistry instance provided");
        this.verboseTracing = z;
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public void setDurableQueues(DurableQueues durableQueues) {
        this.durableQueues = (DurableQueues) FailFast.requireNonNull(durableQueues, "No durableQueues instance provided");
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public QueueEntryId intercept(QueueMessage queueMessage, InterceptorChain<QueueMessage, QueueEntryId, DurableQueuesInterceptor> interceptorChain) {
        storeTraceContext(queueMessage.getMetaData());
        Observation lowCardinalityKeyValue = Observation.createNotStarted("QueueMessage:" + queueMessage.queueName.toString(), this.observationRegistry).lowCardinalityKeyValue(QUEUE_NAME, queueMessage.queueName.toString());
        return (QueueEntryId) lowCardinalityKeyValue.observe(() -> {
            QueueEntryId queueEntryId = (QueueEntryId) interceptorChain.proceed();
            lowCardinalityKeyValue.highCardinalityKeyValue(QUEUE_ENTRY_ID, queueEntryId.toString());
            return queueEntryId;
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public List<QueueEntryId> intercept(QueueMessages queueMessages, InterceptorChain<QueueMessages, List<QueueEntryId>, DurableQueuesInterceptor> interceptorChain) {
        queueMessages.messages.forEach(message -> {
            storeTraceContext(message.getMetaData());
        });
        Observation lowCardinalityKeyValue = Observation.createNotStarted("QueueMessages:" + queueMessages.queueName.toString(), this.observationRegistry).highCardinalityKeyValue("numberOfMessages", Integer.toString(queueMessages.messages.size())).lowCardinalityKeyValue(QUEUE_NAME, queueMessages.queueName.toString());
        Objects.requireNonNull(interceptorChain);
        return (List) lowCardinalityKeyValue.observe(interceptorChain::proceed);
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public Optional<QueuedMessage> intercept(GetDeadLetterMessage getDeadLetterMessage, InterceptorChain<GetDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
        if (!this.verboseTracing) {
            return (Optional) interceptorChain.proceed();
        }
        Observation highCardinalityKeyValue = Observation.createNotStarted("GetDeadLetterMessage", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, getDeadLetterMessage.queueEntryId.toString());
        Objects.requireNonNull(interceptorChain);
        return ((Optional) highCardinalityKeyValue.observe(interceptorChain::proceed)).map(queuedMessage -> {
            return restoreTraceContext(queuedMessage, "DeadLetterMessage");
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public Optional<QueuedMessage> intercept(GetQueuedMessage getQueuedMessage, InterceptorChain<GetQueuedMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
        if (!this.verboseTracing) {
            return (Optional) interceptorChain.proceed();
        }
        Observation highCardinalityKeyValue = Observation.createNotStarted("GetQueuedMessage", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, getQueuedMessage.queueEntryId.toString());
        Objects.requireNonNull(interceptorChain);
        return ((Optional) highCardinalityKeyValue.observe(interceptorChain::proceed)).map(queuedMessage -> {
            return restoreTraceContext(queuedMessage, "GetQueuedMessage");
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public Optional<QueuedMessage> intercept(GetNextMessageReadyForDelivery getNextMessageReadyForDelivery, InterceptorChain<GetNextMessageReadyForDelivery, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
        return ((Optional) interceptorChain.proceed()).map(queuedMessage -> {
            return restoreTraceContext(queuedMessage, "DeliverMessage");
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public QueueEntryId intercept(QueueMessageAsDeadLetterMessage queueMessageAsDeadLetterMessage, InterceptorChain<QueueMessageAsDeadLetterMessage, QueueEntryId, DurableQueuesInterceptor> interceptorChain) {
        storeTraceContext(queueMessageAsDeadLetterMessage.getMetaData());
        Observation lowCardinalityKeyValue = Observation.createNotStarted("QueueMessageAsDeadLetterMessage:" + queueMessageAsDeadLetterMessage.queueName.toString(), this.observationRegistry).lowCardinalityKeyValue(QUEUE_NAME, queueMessageAsDeadLetterMessage.queueName.toString());
        Objects.requireNonNull(interceptorChain);
        return (QueueEntryId) lowCardinalityKeyValue.observe(interceptorChain::proceed);
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public Optional<QueuedMessage> intercept(RetryMessage retryMessage, InterceptorChain<RetryMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
        return (Optional) Observation.createNotStarted("RetryMessage", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, retryMessage.queueEntryId.toString()).observe(() -> {
            Optional optional = (Optional) interceptorChain.proceed();
            closeAnyActiveObservationScope();
            return optional;
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public Optional<QueuedMessage> intercept(MarkAsDeadLetterMessage markAsDeadLetterMessage, InterceptorChain<MarkAsDeadLetterMessage, Optional<QueuedMessage>, DurableQueuesInterceptor> interceptorChain) {
        return (Optional) Observation.createNotStarted("MarkAsDeadLetterMessage", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, markAsDeadLetterMessage.queueEntryId.toString()).observe(() -> {
            Optional optional = (Optional) interceptorChain.proceed();
            closeAnyActiveObservationScope();
            return optional;
        });
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public boolean intercept(AcknowledgeMessageAsHandled acknowledgeMessageAsHandled, InterceptorChain<AcknowledgeMessageAsHandled, Boolean, DurableQueuesInterceptor> interceptorChain) {
        if (this.verboseTracing) {
            return Boolean.TRUE.equals(Observation.createNotStarted("AcknowledgeMessageAsHandled", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, acknowledgeMessageAsHandled.queueEntryId.toString()).observe(() -> {
                Boolean bool = (Boolean) interceptorChain.proceed();
                closeAnyActiveObservationScope();
                return bool;
            }));
        }
        Boolean bool = (Boolean) interceptorChain.proceed();
        closeAnyActiveObservationScope();
        return bool.booleanValue();
    }

    @Override // dk.cloudcreate.essentials.components.foundation.messaging.queue.DurableQueuesInterceptor
    public boolean intercept(DeleteMessage deleteMessage, InterceptorChain<DeleteMessage, Boolean, DurableQueuesInterceptor> interceptorChain) {
        if (this.verboseTracing) {
            return Boolean.TRUE.equals(Observation.createNotStarted("DeleteMessage", this.observationRegistry).highCardinalityKeyValue(QUEUE_ENTRY_ID, deleteMessage.queueEntryId.toString()).observe(() -> {
                Boolean bool = (Boolean) interceptorChain.proceed();
                closeAnyActiveObservationScope();
                return bool;
            }));
        }
        Boolean bool = (Boolean) interceptorChain.proceed();
        closeAnyActiveObservationScope();
        return bool.booleanValue();
    }

    protected void storeTraceContext(MessageMetaData messageMetaData) {
        CurrentTraceContext currentTraceContext;
        if (messageMetaData == null || (currentTraceContext = this.tracer.currentTraceContext()) == null || currentTraceContext.context() == null) {
            return;
        }
        this.propagator.inject(currentTraceContext.context(), messageMetaData, (v0, v1, v2) -> {
            v0.put(v1, v2);
        });
    }

    protected QueuedMessage restoreTraceContext(QueuedMessage queuedMessage, String str) {
        FailFast.requireNonNull(queuedMessage, "No queuedMessage provided");
        FailFast.requireNonNull(str, "No contextDescription provided");
        closeAnyActiveObservationScope();
        Observation start = Observation.start(str + ":" + queuedMessage.getQueueName().toString(), () -> {
            return createTraceContextForMessage(queuedMessage);
        }, this.observationRegistry);
        start.lowCardinalityKeyValue(QUEUE_NAME, queuedMessage.getQueueName().toString());
        start.highCardinalityKeyValue(QUEUE_ENTRY_ID, queuedMessage.getId().toString());
        start.highCardinalityKeyValue("addedTimestamp", queuedMessage.getAddedTimestamp().toString());
        start.highCardinalityKeyValue("deliveryTimestamp", queuedMessage.getDeliveryTimestamp() != null ? queuedMessage.getDeliveryTimestamp().toString() : "");
        start.highCardinalityKeyValue("totalDeliveryAttempts", Integer.toString(queuedMessage.getTotalDeliveryAttempts()));
        start.highCardinalityKeyValue("redeliveryAttempts", Integer.toString(queuedMessage.getRedeliveryAttempts()));
        this.activeObservationScope.set(start.openScope());
        return queuedMessage;
    }

    private ReceiverContext<MessageMetaData> createTraceContextForMessage(QueuedMessage queuedMessage) {
        FailFast.requireNonNull(queuedMessage, "No queuedMessage provided");
        ReceiverContext<MessageMetaData> receiverContext = new ReceiverContext<>((v0, v1) -> {
            return v0.get(v1);
        }, Kind.CONSUMER);
        receiverContext.setCarrier(queuedMessage.getMetaData());
        return receiverContext;
    }

    private void closeAnyActiveObservationScope() {
        Observation.Scope scope = this.activeObservationScope.get();
        if (scope != null) {
            scope.close();
            scope.getCurrentObservation().stop();
            this.activeObservationScope.remove();
        }
    }
}
