package misk.jobqueue.sqs;

import com.amazonaws.http.timers.client.ClientExecutionTimeoutException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.google.common.util.concurrent.ServiceManager;
import com.google.inject.Provider;
import com.squareup.moshi.Moshi;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.tag.StringTag;
import io.opentracing.tag.Tags;
import io.prometheus.client.Counter;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kotlin.Metadata;
import kotlin.Unit;
import kotlin.collections.CollectionsKt;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.DefaultConstructorMarker;
import kotlin.jvm.internal.Intrinsics;
import kotlin.jvm.internal.Reflection;
import kotlin.jvm.internal.SourceDebugExtension;
import misk.backoff.Backoff;
import misk.feature.Feature;
import misk.feature.FeatureFlags;
import misk.jobqueue.JobConsumer;
import misk.jobqueue.JobHandler;
import misk.jobqueue.QueueName;
import misk.jobqueue.sqs.SqsJobConsumer;
import misk.metrics.Histogram;
import misk.tasks.RepeatedTaskQueue;
import misk.tasks.Status;
import misk.time.TimedKt;
import mu.KLogger;
import mu.KotlinLogging;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.MDC;
import wisp.tracing.TracerExtKt;

/* compiled from: SqsJobConsumer.kt */
@Singleton
@Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��z\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\b\u0001\u0018�� *2\u00020\u0001:\u0002*+Bs\b\u0001\u0012\b\b\u0001\u0010\u0002\u001a\u00020\u0003\u0012\b\b\u0001\u0010\u0004\u001a\u00020\u0005\u0012\b\b\u0001\u0010\u0006\u001a\u00020\u0003\u0012\u0006\u0010\u0007\u001a\u00020\b\u0012\u0006\u0010\t\u001a\u00020\n\u0012\u0006\u0010\u000b\u001a\u00020\f\u0012\u0006\u0010\r\u001a\u00020\u000e\u0012\u0006\u0010\u000f\u001a\u00020\u0010\u0012\f\u0010\u0011\u001a\b\u0012\u0004\u0012\u00020\u00130\u0012\u0012\u0006\u0010\u0014\u001a\u00020\u0015\u0012\u0006\u0010\u0016\u001a\u00020\u0017\u0012\u0006\u0010\u0018\u001a\u00020\u0019¢\u0006\u0002\u0010\u001aJ\u0019\u0010!\u001a\u00060 
R\u00020��2\u0006\u0010\"\u001a\u00020\u001fH��¢\u0006\u0002\b#J\u0006\u0010$\u001a\u00020%J\u0018\u0010&\u001a\u00020%2\u0006\u0010\"\u001a\u00020\u001f2\u0006\u0010'\u001a\u00020(H\u0016J\u0010\u0010)\u001a\u00020%2\u0006\u0010\"\u001a\u00020\u001fH\u0016R\u000e\u0010\u0016\u001a\u00020\u0017X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0002\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000b\u001a\u00020\fX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\r\u001a\u00020\u000eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u001b\u001a\u00020\u001cX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0006\u001a\u00020\u0003X\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u0011\u001a\b\u0012\u0004\u0012\u00020\u00130\u0012X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u001e\u0010\u001d\u001a\u0012\u0012\u0004\u0012\u00020\u001f\u0012\b\u0012\u00060 R\u00020��0\u001eX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0014\u001a\u00020\u0015X\u0082\u0004¢\u0006\u0002\n��¨\u0006,"}, d2 = {"Lmisk/jobqueue/sqs/SqsJobConsumer;", "Lmisk/jobqueue/JobConsumer;", "handlingThreads", "Ljava/util/concurrent/ExecutorService;", "taskQueue", "Lmisk/tasks/RepeatedTaskQueue;", "receivingThreads", "sqsConsumerAllocator", "Lmisk/jobqueue/sqs/SqsConsumerAllocator;", "featureFlags", "Lmisk/feature/FeatureFlags;", "metrics", "Lmisk/jobqueue/sqs/SqsMetrics;", "moshi", "Lcom/squareup/moshi/Moshi;", "queues", "Lmisk/jobqueue/sqs/QueueResolver;", "serviceManagerProvider", "Lcom/google/inject/Provider;", "Lcom/google/common/util/concurrent/ServiceManager;", "tracer", "Lio/opentracing/Tracer;", "clock", "Ljava/time/Clock;", "awsSqsJobQueueConfig", "Lmisk/jobqueue/sqs/AwsSqsJobQueueConfig;", 
"(Ljava/util/concurrent/ExecutorService;Lmisk/tasks/RepeatedTaskQueue;Ljava/util/concurrent/ExecutorService;Lmisk/jobqueue/sqs/SqsConsumerAllocator;Lmisk/feature/FeatureFlags;Lmisk/jobqueue/sqs/SqsMetrics;Lcom/squareup/moshi/Moshi;Lmisk/jobqueue/sqs/QueueResolver;Lcom/google/inject/Provider;Lio/opentracing/Tracer;Ljava/time/Clock;Lmisk/jobqueue/sqs/AwsSqsJobQueueConfig;)V", "receiverPolicy", "Lmisk/jobqueue/sqs/AwsSqsJobReceiverPolicy;", "subscriptions", "Ljava/util/concurrent/ConcurrentHashMap;", "Lmisk/jobqueue/QueueName;", "Lmisk/jobqueue/sqs/SqsJobConsumer$QueueReceiver;", "getReceiver", "queueName", "getReceiver$misk_aws", "shutDown", "", "subscribe", "handler", "Lmisk/jobqueue/JobHandler;", "unsubscribe", "Companion", "QueueReceiver", "misk-aws"})
@SourceDebugExtension({"SMAP\nSqsJobConsumer.kt\nKotlin\n*S Kotlin\n*F\n+ 1 SqsJobConsumer.kt\nmisk/jobqueue/sqs/SqsJobConsumer\n+ 2 Logging.kt\nwisp/logging/LoggingKt\n*L\n1#1,246:1\n12#2:247\n*S KotlinDebug\n*F\n+ 1 SqsJobConsumer.kt\nmisk/jobqueue/sqs/SqsJobConsumer\n*L\n230#1:247\n*E\n"})
/* loaded from: input_file:misk/jobqueue/sqs/SqsJobConsumer.class */
public final class SqsJobConsumer implements JobConsumer {

    /** Kotlin companion instance; exposes the feature-flag accessors defined in {@link Companion}. */
    @NotNull
    public static final Companion Companion = new Companion(null);

    /** Executor on which job handlers are run (see QueueReceiver.handleMessages). */
    @NotNull
    private final ExecutorService handlingThreads;

    /** Injected repeated-task queue; its use is outside the visible part of this class. */
    @NotNull
    private final RepeatedTaskQueue taskQueue;

    /** Executor on which the per-consumer receive tasks are run (see QueueReceiver.run). */
    @NotNull
    private final ExecutorService receivingThreads;

    /** Decides how many concurrent receivers this pod runs for a queue. */
    @NotNull
    private final SqsConsumerAllocator sqsConsumerAllocator;

    /** Feature flags; read for the per-queue receive batch size. */
    @NotNull
    private final FeatureFlags featureFlags;

    /** SQS metrics (receive time, processing lag, jobs received, handler timings and failures). */
    @NotNull
    private final SqsMetrics metrics;

    /** Passed through to every SqsJob created from a received message. */
    @NotNull
    private final Moshi moshi;

    /** Resolves queue names to the queue used for receiving. */
    @NotNull
    private final QueueResolver queues;

    /** Injected service manager provider; its use is outside the visible part of this class. */
    @NotNull
    private final Provider<ServiceManager> serviceManagerProvider;

    /** Tracer used to start a new root span around each handled job. */
    @NotNull
    private final Tracer tracer;

    /** Clock used to compute queue processing lag. */
    @NotNull
    private final Clock clock;

    /** Receiver policy taken from AwsSqsJobQueueConfig and handed to the consumer allocator. */
    @NotNull
    private final AwsSqsJobReceiverPolicy receiverPolicy;

    /** Active receivers keyed by queue name; subscribe() registers entries with putIfAbsent. */
    @NotNull
    private final ConcurrentHashMap<QueueName, QueueReceiver> subscriptions;

    // The static members below without an initializer are assigned outside the visible part of this file.
    @NotNull
    private static final KLogger log;

    @NotNull
    private static final Feature POD_CONSUMERS_PER_QUEUE;

    @NotNull
    private static final Feature POD_MAX_JOBQUEUE_CONSUMERS;

    @NotNull
    private static final Feature CONSUMERS_PER_QUEUE;

    /** Feature consulted by QueueReceiver.batchSize() for the maximum messages per receive call. */
    @NotNull
    private static final Feature CONSUMERS_BATCH_SIZE;

    /** Span tag set when a job carries an original trace id attribute. */
    @NotNull
    private static final StringTag ORIGINAL_TRACE_ID_TAG;

    // MDC keys and values; the job-handling code uses the same literals inline.
    @NotNull
    private static final String SQS_JOB_ID_MDC = "sqs_job_id";

    @NotNull
    private static final String SQS_QUEUE_TYPE_MDC = "misk.job_queue.queue_type";

    @NotNull
    private static final String SQS_JOB_ID_STRUCTURED_MDC = "misk.job_queue.job_id";

    @NotNull
    private static final String SQS_QUEUE_NAME_MDC = "misk.job_queue.queue_name";

    @NotNull
    private static final String SQS_QUEUE_TYPE = "aws-sqs";

    // Names of the SQS message attributes read when recording processing lag.
    @NotNull
    private static final String SQS_ATTRIBUTE_SENT_TIMESTAMP = "SentTimestamp";

    @NotNull
    private static final String SQS_ATTRIBUTE_APPROX_RECEIVE_COUNT = "ApproximateReceiveCount";

    /* compiled from: SqsJobConsumer.kt */
    @Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��*\n\u0002\u0018\u0002\n\u0002\u0010��\n\u0002\b\u0002\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0018\u0002\n\u0002\b\u0005\n\u0002\u0010\u000e\n\u0002\b\u0007\n\u0002\u0018\u0002\n��\b\u0086\u0003\u0018��2\u00020\u0001B\u0007\b\u0002¢\u0006\u0002\u0010\u0002R\u0014\u0010\u0003\u001a\u00020\u0004X\u0080\u0004¢\u0006\b\n��\u001a\u0004\b\u0005\u0010\u0006R\u0014\u0010\u0007\u001a\u00020\u0004X\u0080\u0004¢\u0006\b\n��\u001a\u0004\b\b\u0010\u0006R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��R\u0014\u0010\u000b\u001a\u00020\u0004X\u0080\u0004¢\u0006\b\n��\u001a\u0004\b\f\u0010\u0006R\u0014\u0010\r\u001a\u00020\u0004X\u0080\u0004¢\u0006\b\n��\u001a\u0004\b\u000e\u0010\u0006R\u000e\u0010\u000f\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0011\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0012\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0013\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0014\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0015\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0016\u001a\u00020\u0010X\u0082T¢\u0006\u0002\n��R\u000e\u0010\u0017\u001a\u00020\u0018X\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0019"}, d2 = {"Lmisk/jobqueue/sqs/SqsJobConsumer$Companion;", "", "()V", "CONSUMERS_BATCH_SIZE", "Lmisk/feature/Feature;", "getCONSUMERS_BATCH_SIZE$misk_aws", "()Lmisk/feature/Feature;", "CONSUMERS_PER_QUEUE", "getCONSUMERS_PER_QUEUE$misk_aws", "ORIGINAL_TRACE_ID_TAG", "Lio/opentracing/tag/StringTag;", "POD_CONSUMERS_PER_QUEUE", "getPOD_CONSUMERS_PER_QUEUE$misk_aws", "POD_MAX_JOBQUEUE_CONSUMERS", "getPOD_MAX_JOBQUEUE_CONSUMERS$misk_aws", "SQS_ATTRIBUTE_APPROX_RECEIVE_COUNT", "", "SQS_ATTRIBUTE_SENT_TIMESTAMP", "SQS_JOB_ID_MDC", "SQS_JOB_ID_STRUCTURED_MDC", "SQS_QUEUE_NAME_MDC", "SQS_QUEUE_TYPE", "SQS_QUEUE_TYPE_MDC", "log", "Lmu/KLogger;", "misk-aws"})
    /* loaded from: input_file:misk/jobqueue/sqs/SqsJobConsumer$Companion.class */
    public static final class Companion {
        /** Synthetic constructor emitted for the Kotlin companion; delegates to the private one. */
        public /* synthetic */ Companion(DefaultConstructorMarker defaultConstructorMarker) {
            this();
        }

        private Companion() {
        }

        /** @return the feature stored in {@code SqsJobConsumer.CONSUMERS_BATCH_SIZE} */
        @NotNull
        public final Feature getCONSUMERS_BATCH_SIZE$misk_aws() {
            final Feature feature = SqsJobConsumer.CONSUMERS_BATCH_SIZE;
            return feature;
        }

        /** @return the feature stored in {@code SqsJobConsumer.CONSUMERS_PER_QUEUE} */
        @NotNull
        public final Feature getCONSUMERS_PER_QUEUE$misk_aws() {
            final Feature feature = SqsJobConsumer.CONSUMERS_PER_QUEUE;
            return feature;
        }

        /** @return the feature stored in {@code SqsJobConsumer.POD_MAX_JOBQUEUE_CONSUMERS} */
        @NotNull
        public final Feature getPOD_MAX_JOBQUEUE_CONSUMERS$misk_aws() {
            final Feature feature = SqsJobConsumer.POD_MAX_JOBQUEUE_CONSUMERS;
            return feature;
        }

        /** @return the feature stored in {@code SqsJobConsumer.POD_CONSUMERS_PER_QUEUE} */
        @NotNull
        public final Feature getPOD_CONSUMERS_PER_QUEUE$misk_aws() {
            final Feature feature = SqsJobConsumer.POD_CONSUMERS_PER_QUEUE;
            return feature;
        }
    }

    /* compiled from: SqsJobConsumer.kt */
    @Metadata(mv = {1, 8, 0}, k = 1, xi = 48, d1 = {"��F\n\u0002\u0018\u0002\n\u0002\u0010��\n��\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\b\u0002\n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n��\n\u0002\u0010\b\n��\n\u0002\u0010 \n\u0002\u0018\u0002\n��\n\u0002\u0018\u0002\n\u0002\u0018\u0002\n\u0002\b\u0004\n\u0002\u0010\u0002\n��\b\u0080\u0004\u0018��2\u00020\u0001B\u0015\u0012\u0006\u0010\u0002\u001a\u00020\u0003\u0012\u0006\u0010\u0004\u001a\u00020\u0005¢\u0006\u0002\u0010\u0006J\b\u0010\u000b\u001a\u00020\fH\u0002J\u000e\u0010\r\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eH\u0002J\"\u0010\u0010\u001a\u000e\u0012\n\u0012\b\u0012\u0004\u0012\u00020\u00120\u00110\u000e2\f\u0010\u0013\u001a\b\u0012\u0004\u0012\u00020\u000f0\u000eH\u0002J\u000e\u0010\u0014\u001a\b\u0012\u0004\u0012\u00020\u00120\u000eH\u0002J\u0006\u0010\u0015\u001a\u00020\u0012J\u0006\u0010\u0016\u001a\u00020\u0017R\u000e\u0010\u0004\u001a\u00020\u0005X\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\u0007\u001a\u00020\bX\u0082\u0004¢\u0006\u0002\n��R\u000e\u0010\t\u001a\u00020\nX\u0082\u0004¢\u0006\u0002\n��¨\u0006\u0018"}, d2 = {"Lmisk/jobqueue/sqs/SqsJobConsumer$QueueReceiver;", "", "queueName", "Lmisk/jobqueue/QueueName;", "handler", "Lmisk/jobqueue/JobHandler;", "(Lmisk/jobqueue/sqs/SqsJobConsumer;Lmisk/jobqueue/QueueName;Lmisk/jobqueue/JobHandler;)V", "queue", "Lmisk/jobqueue/sqs/ResolvedQueue;", "shouldKeepRunning", "Ljava/util/concurrent/atomic/AtomicBoolean;", "batchSize", "", "fetchMessages", "", "Lmisk/jobqueue/sqs/SqsJob;", "handleMessages", "Ljava/util/concurrent/CompletableFuture;", "Lmisk/tasks/Status;", "messages", "receive", "run", "stop", "", "misk-aws"})
    @SourceDebugExtension({"SMAP\nSqsJobConsumer.kt\nKotlin\n*S Kotlin\n*F\n+ 1 SqsJobConsumer.kt\nmisk/jobqueue/sqs/SqsJobConsumer$QueueReceiver\n+ 2 ArraysJVM.kt\nkotlin/collections/ArraysKt__ArraysJVMKt\n+ 3 _Collections.kt\nkotlin/collections/CollectionsKt___CollectionsKt\n*L\n1#1,246:1\n37#2,2:247\n37#2,2:253\n1549#3:249\n1620#3,3:250\n1549#3:255\n1620#3,3:256\n*S KotlinDebug\n*F\n+ 1 SqsJobConsumer.kt\nmisk/jobqueue/sqs/SqsJobConsumer$QueueReceiver\n*L\n109#1:247,2\n178#1:253,2\n164#1:249\n164#1:250,3\n184#1:255\n184#1:256,3\n*E\n"})
    /* loaded from: input_file:misk/jobqueue/sqs/SqsJobConsumer$QueueReceiver.class */
    public final class QueueReceiver {

        /** Handler whose handleJob is invoked for every job received from the queue. */
        @NotNull
        private final JobHandler handler;

        /** Queue resolved for receiving from the queue name given to the constructor. */
        @NotNull
        private final ResolvedQueue queue;

        /** Run flag: cleared by stop() and read at the top of run(). */
        @NotNull
        private final AtomicBoolean shouldKeepRunning;
        /** Enclosing SqsJobConsumer instance (synthetic outer reference of this inner class). */
        final /* synthetic */ SqsJobConsumer this$0;

        /**
         * Creates a receiver bound to the queue resolved from {@code queueName}.
         *
         * @param sqsJobConsumer enclosing consumer instance (synthetic outer reference)
         * @param queueName name of the queue to receive from
         * @param jobHandler handler invoked for each received job
         */
        public QueueReceiver(@NotNull SqsJobConsumer sqsJobConsumer, @NotNull QueueName queueName, JobHandler jobHandler) {
            Intrinsics.checkNotNullParameter(queueName, "queueName");
            Intrinsics.checkNotNullParameter(jobHandler, "handler");
            this.this$0 = sqsJobConsumer;
            this.handler = jobHandler;
            this.queue = this.this$0.queues.getForReceiving(queueName);
            // The receiver starts in the running state. stop() is the only code that clears the
            // flag and nothing ever sets it, so starting at false would make run() bail out forever.
            this.shouldKeepRunning = new AtomicBoolean(true);
        }

        /** Requests that the receiver stop: the next call to {@link #run()} returns {@code NO_RESCHEDULE}. */
        public final void stop() {
            this.shouldKeepRunning.set(false);
        }

        /**
         * Runs one receive cycle: starts the number of concurrent receive tasks computed by the
         * consumer allocator on the receiving executor, waits for all of them and folds their
         * statuses into one.
         *
         * @return {@code NO_RESCHEDULE} once {@link #stop()} has been called; otherwise
         *     {@code FAILED} if any job failed, else {@code OK} if any job succeeded, else
         *     {@code NO_WORK}
         * @throws IllegalStateException if a receive task reports a status other than
         *     {@code FAILED}, {@code OK} or {@code NO_WORK}
         */
        @NotNull
        public final Status run() {
            if (!this.shouldKeepRunning.get()) {
                // Must actually return here, otherwise stop() has no effect.
                return Status.NO_RESCHEDULE;
            }
            final SqsJobConsumer consumer = this.this$0;
            int consumerCount = consumer.sqsConsumerAllocator.computeSqsConsumersForPod(this.queue.getName(), consumer.receiverPolicy);
            final List<CompletableFuture<List<Status>>> receiveFutures = new ArrayList<>(consumerCount);
            for (int i = 0; i < consumerCount; i++) {
                receiveFutures.add(CompletableFuture.supplyAsync(this::receive, consumer.receivingThreads));
            }
            final List<Status> allowedStatuses = CollectionsKt.listOf(new Status[]{Status.FAILED, Status.OK, Status.NO_WORK});
            Status combined = CompletableFuture.allOf(receiveFutures.toArray(new CompletableFuture[0])).thenApply(ignored -> {
                // Flatten the per-receiver status lists.
                List<Status> statuses = new ArrayList<>();
                for (CompletableFuture<List<Status>> future : receiveFutures) {
                    List<Status> joined = future.join();
                    Intrinsics.checkNotNullExpressionValue(joined, "it.join()");
                    statuses.addAll(joined);
                }
                for (Status status : statuses) {
                    if (!allowedStatuses.contains(status)) {
                        throw new IllegalStateException("Check failed.".toString());
                    }
                }
                // FAILED dominates OK, OK dominates NO_WORK.
                Status result = Status.NO_WORK;
                for (Status status : statuses) {
                    if (status == Status.FAILED) {
                        result = Status.FAILED;
                    } else if (status == Status.OK && result != Status.FAILED) {
                        result = Status.OK;
                    }
                }
                return result;
            }).join();
            Intrinsics.checkNotNullExpressionValue(combined, "futures = List(size) {\n …         }\n      }.join()");
            return combined;
        }

        /**
         * Receives one batch of messages from the queue and wraps each one in an {@code SqsJob}.
         *
         * <p>The receive call is timed with the {@code sqsReceiveTime} metric. A
         * {@code ClientExecutionTimeoutException} from the call is logged and treated as an empty
         * batch. For each message whose approximate receive count is at most 1, the processing
         * lag derived from its sent timestamp is recorded.
         *
         * @return the received messages as jobs, in the order returned by SQS; empty when nothing
         *     was received or the call timed out
         */
        private final List<SqsJob> fetchMessages() {
            List emptyList;
            try {
                // Both metric labels are the queue name.
                emptyList = (List) this.this$0.metrics.getSqsReceiveTime().timedMills(new String[]{this.queue.getQueueName(), this.queue.getQueueName()}, new Function0<List<Message>>() { // from class: misk.jobqueue.sqs.SqsJobConsumer$QueueReceiver$fetchMessages$messages$1
                    /* JADX INFO: Access modifiers changed from: package-private */
                    {
                        super(0);
                    }

                    /* renamed from: invoke, reason: merged with bridge method [inline-methods] */
                    public final List<Message> m17invoke() {
                        ResolvedQueue resolvedQueue;
                        resolvedQueue = SqsJobConsumer.QueueReceiver.this.queue;
                        final SqsJobConsumer.QueueReceiver queueReceiver = SqsJobConsumer.QueueReceiver.this;
                        return (List) resolvedQueue.call(new Function1<AmazonSQS, List<Message>>() { // from class: misk.jobqueue.sqs.SqsJobConsumer$QueueReceiver$fetchMessages$messages$1.1
                            {
                                super(1);
                            }

                            public final List<Message> invoke(@NotNull AmazonSQS amazonSQS) {
                                ResolvedQueue resolvedQueue2;
                                int batchSize;
                                Intrinsics.checkNotNullParameter(amazonSQS, "client");
                                // Request all system attributes and all message attributes.
                                ReceiveMessageRequest withMessageAttributeNames = new ReceiveMessageRequest().withAttributeNames(new String[]{"All"}).withMessageAttributeNames(new String[]{"All"});
                                resolvedQueue2 = SqsJobConsumer.QueueReceiver.this.queue;
                                ReceiveMessageRequest withQueueUrl = withMessageAttributeNames.withQueueUrl(resolvedQueue2.getUrl());
                                batchSize = SqsJobConsumer.QueueReceiver.this.batchSize();
                                return amazonSQS.receiveMessage(withQueueUrl.withMaxNumberOfMessages(Integer.valueOf(batchSize))).getMessages();
                            }
                        });
                    }
                });
            } catch (ClientExecutionTimeoutException e) {
                // A client-side timeout is handled as "no messages".
                SqsJobConsumer.log.info("timed out long polling for messages from " + this.queue.getQueueName());
                emptyList = CollectionsKt.emptyList();
            }
            List<Message> list = emptyList;
            for (Message message : list) {
                try {
                    Object obj = message.getAttributes().get(SqsJobConsumer.SQS_ATTRIBUTE_SENT_TIMESTAMP);
                    Intrinsics.checkNotNull(obj);
                    long parseLong = Long.parseLong((String) obj);
                    Object obj2 = message.getAttributes().get(SqsJobConsumer.SQS_ATTRIBUTE_APPROX_RECEIVE_COUNT);
                    Intrinsics.checkNotNull(obj2);
                    // Lag is only recorded when the receive count is at most 1.
                    if (Long.parseLong((String) obj2) <= 1) {
                        this.this$0.metrics.getQueueProcessingLag().record(this.this$0.clock.instant().minusMillis(parseLong).toEpochMilli(), new String[]{this.queue.getQueueName(), this.queue.getQueueName()});
                    }
                } catch (NullPointerException e2) {
                    // Reached when a checkNotNull above fails; the message is still returned as a job.
                    SqsJobConsumer.log.warn("Message " + message.getMessageId() + " was missing SentTimestamp or ApproximateReceiveCount");
                } catch (NumberFormatException e3) {
                    // NOTE(review): an unparsable ApproximateReceiveCount also lands here, although
                    // the message text only mentions SentTimestamp.
                    SqsJobConsumer.log.warn("Message " + message.getMessageId() + " had invalid SentTimestamp format");
                }
            }
            Intrinsics.checkNotNullExpressionValue(list, "messages");
            List<Message> list2 = list;
            SqsJobConsumer sqsJobConsumer = this.this$0;
            // Wrap every message, including those whose lag attributes were missing or malformed.
            ArrayList arrayList = new ArrayList(CollectionsKt.collectionSizeOrDefault(list2, 10));
            for (Message message2 : list2) {
                QueueName name = this.queue.getName();
                QueueResolver queueResolver = sqsJobConsumer.queues;
                SqsMetrics sqsMetrics = sqsJobConsumer.metrics;
                Moshi moshi = sqsJobConsumer.moshi;
                Intrinsics.checkNotNullExpressionValue(message2, "it");
                arrayList.add(new SqsJob(name, queueResolver, sqsMetrics, moshi, message2));
            }
            return arrayList;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Looks up the receive batch size for this queue from the feature flags.
         *
         * @return the int value of the {@code CONSUMERS_BATCH_SIZE} feature keyed by this queue's name
         */
        public final int batchSize() {
            FeatureFlags flags = this.this$0.featureFlags;
            Feature batchSizeFeature = SqsJobConsumer.Companion.getCONSUMERS_BATCH_SIZE$misk_aws();
            String flagKey = this.queue.getQueueName();
            return flags.getInt(batchSizeFeature, flagKey);
        }

        /**
         * Fetches one batch of messages, dispatches every job to the handler and waits for all of
         * them to finish.
         *
         * @return a single {@code NO_WORK} when no messages were fetched; otherwise one status per
         *     fetched job, in fetch order
         */
        private final List<Status> receive() {
            List<SqsJob> jobs = fetchMessages();
            if (jobs.isEmpty()) {
                return CollectionsKt.listOf(Status.NO_WORK);
            }
            final List<CompletableFuture<Status>> handled = handleMessages(jobs);
            // Wait for every handler future, then collect the statuses in the original order.
            List<Status> statuses = CompletableFuture.allOf(handled.toArray(new CompletableFuture[0])).thenApply(ignored -> {
                List<Status> collected = new ArrayList<>(handled.size());
                for (CompletableFuture<Status> future : handled) {
                    collected.add(future.join());
                }
                return collected;
            }).join();
            Intrinsics.checkNotNullExpressionValue(statuses, "futures = handleMessages…t.join() }\n      }.join()");
            return statuses;
        }

        /**
         * Submits one handling task per job to the handling executor.
         *
         * @param list jobs to handle
         * @return one future per job, in the same order as {@code list}
         */
        private final List<CompletableFuture<Status>> handleMessages(List<SqsJob> list) {
            final SqsJobConsumer consumer = this.this$0;
            List<CompletableFuture<Status>> futures = new ArrayList<>(list.size());
            for (final SqsJob sqsJob : list) {
                // Arguments follow the helper's parameter order: enclosing consumer, this receiver, the job.
                futures.add(CompletableFuture.supplyAsync(() -> handleMessages$lambda$6$lambda$5(consumer, this, sqsJob), consumer.handlingThreads));
            }
            return futures;
        }

        /** Synthetic lambda body: null-checks the receiver and returns the result of its receive(). */
        private static final List run$lambda$1$lambda$0(QueueReceiver queueReceiver) {
            Intrinsics.checkNotNullParameter(queueReceiver, "this$0");
            List received = queueReceiver.receive();
            return received;
        }

        /** Synthetic lambda body: null-checks the function, applies it to {@code obj} and casts the result to Status. */
        private static final Status run$lambda$2(Function1 function1, Object obj) {
            Intrinsics.checkNotNullParameter(function1, "$tmp0");
            Object applied = function1.invoke(obj);
            return (Status) applied;
        }

        /** Synthetic lambda body: null-checks the function, applies it to {@code obj} and casts the result to List. */
        private static final List receive$lambda$4(Function1 function1, Object obj) {
            Intrinsics.checkNotNullParameter(function1, "$tmp0");
            Object applied = function1.invoke(obj);
            return (List) applied;
        }

        /**
         * Handles a single job: counts it as received, then runs the handler inside a new root
         * span named {@code "handle-job-" + queueName}, with job and queue details in the MDC.
         *
         * @param sqsJobConsumer enclosing consumer (source of metrics and tracer)
         * @param queueReceiver receiver that owns the queue and handler
         * @param sqsJob job to handle
         * @return {@code Status.OK} when the handler returns normally, {@code Status.FAILED} when
         *     anything in the handling block throws
         */
        private static final Status handleMessages$lambda$6$lambda$5(final SqsJobConsumer sqsJobConsumer, final QueueReceiver queueReceiver, final SqsJob sqsJob) {
            Intrinsics.checkNotNullParameter(sqsJobConsumer, "this$0");
            Intrinsics.checkNotNullParameter(queueReceiver, "this$1");
            Intrinsics.checkNotNullParameter(sqsJob, "$message");
            // Count the job as received before handling; both labels are the queue name.
            ((Counter.Child) sqsJobConsumer.metrics.getJobsReceived().labels(new String[]{queueReceiver.queue.getQueueName(), queueReceiver.queue.getQueueName()})).inc();
            return (Status) TracerExtKt.traceWithNewRootSpan$default(sqsJobConsumer.tracer, "handle-job-" + queueReceiver.queue.getQueueName(), (Map) null, false, new Function1<Span, Status>() { // from class: misk.jobqueue.sqs.SqsJobConsumer$QueueReceiver$handleMessages$1$1$1
                /* JADX INFO: Access modifiers changed from: package-private */
                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                {
                    super(1);
                }

                @NotNull
                public final Status invoke(@NotNull Span span) {
                    ResolvedQueue resolvedQueue;
                    ResolvedQueue resolvedQueue2;
                    Status status;
                    ResolvedQueue resolvedQueue3;
                    ResolvedQueue resolvedQueue4;
                    StringTag stringTag;
                    Intrinsics.checkNotNullParameter(span, "span");
                    // Tag the span with the original trace id when the job carries one.
                    // NOTE(review): "SqsJob.this" is decompiler output; presumably it denotes the
                    // captured sqsJob parameter — confirm against the Kotlin source.
                    String str = SqsJob.this.getAttributes().get(SqsJob.ORIGINAL_TRACE_ID_ATTR);
                    if (str != null) {
                        stringTag = SqsJobConsumer.ORIGINAL_TRACE_ID_TAG;
                        stringTag.set(span, str);
                    }
                    // The four MDC.remove calls appear on the success path, the failure path and
                    // the outer catch, so the MDC keys are removed on every exit.
                    try {
                        try {
                            MDC.put("sqs_job_id", SqsJob.this.getId());
                            MDC.put("misk.job_queue.job_id", SqsJob.this.getId());
                            MDC.put("misk.job_queue.queue_name", SqsJob.this.getQueueName().getValue());
                            MDC.put("misk.job_queue.queue_type", "aws-sqs");
                            final SqsJobConsumer.QueueReceiver queueReceiver2 = queueReceiver;
                            final SqsJob sqsJob2 = SqsJob.this;
                            // Time the handler call; component1() of the timed result is the Duration.
                            Duration duration = (Duration) TimedKt.timed(new Function0<Unit>() { // from class: misk.jobqueue.sqs.SqsJobConsumer$QueueReceiver$handleMessages$1$1$1.2
                                /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
                                {
                                    super(0);
                                }

                                public final void invoke() {
                                    JobHandler jobHandler;
                                    jobHandler = SqsJobConsumer.QueueReceiver.this.handler;
                                    jobHandler.handleJob(sqsJob2);
                                }

                                /* renamed from: invoke, reason: collision with other method in class */
                                public /* bridge */ /* synthetic */ Object m18invoke() {
                                    invoke();
                                    return Unit.INSTANCE;
                                }
                            }).component1();
                            Histogram handlerDispatchTime = sqsJobConsumer.metrics.getHandlerDispatchTime();
                            double millis = duration.toMillis();
                            resolvedQueue3 = queueReceiver.queue;
                            resolvedQueue4 = queueReceiver.queue;
                            handlerDispatchTime.record(millis, new String[]{resolvedQueue3.getQueueName(), resolvedQueue4.getQueueName()});
                            status = Status.OK;
                            MDC.remove("sqs_job_id");
                            MDC.remove("misk.job_queue.job_id");
                            MDC.remove("misk.job_queue.queue_name");
                            MDC.remove("misk.job_queue.queue_type");
                        } catch (Throwable th) {
                            // Any throwable from the block above is logged, counted as a handler
                            // failure, flagged on the span and reported as FAILED rather than rethrown.
                            KLogger kLogger = SqsJobConsumer.log;
                            final SqsJobConsumer.QueueReceiver queueReceiver3 = queueReceiver;
                            kLogger.error(th, new Function0<Object>() { // from class: misk.jobqueue.sqs.SqsJobConsumer$QueueReceiver$handleMessages$1$1$1.3
                                {
                                    super(0);
                                }

                                @Nullable
                                public final Object invoke() {
                                    ResolvedQueue resolvedQueue5;
                                    resolvedQueue5 = SqsJobConsumer.QueueReceiver.this.queue;
                                    return "error handling job from " + resolvedQueue5.getQueueName();
                                }
                            });
                            Counter handlerFailures = sqsJobConsumer.metrics.getHandlerFailures();
                            resolvedQueue = queueReceiver.queue;
                            resolvedQueue2 = queueReceiver.queue;
                            ((Counter.Child) handlerFailures.labels(new String[]{resolvedQueue.getQueueName(), resolvedQueue2.getQueueName()})).inc();
                            Tags.ERROR.set(span, true);
                            status = Status.FAILED;
                            MDC.remove("sqs_job_id");
                            MDC.remove("misk.job_queue.job_id");
                            MDC.remove("misk.job_queue.queue_name");
                            MDC.remove("misk.job_queue.queue_type");
                        }
                        return status;
                    } catch (Throwable th2) {
                        // Reached only if the failure path itself throws; clean up and propagate.
                        MDC.remove("sqs_job_id");
                        MDC.remove("misk.job_queue.job_id");
                        MDC.remove("misk.job_queue.queue_name");
                        MDC.remove("misk.job_queue.queue_type");
                        throw th2;
                    }
                }
            }, 6, (Object) null);
        }
    }

    /**
     * Injection constructor. Every argument is null-checked in declaration order and stored;
     * the receiver policy is read from {@code awsSqsJobQueueConfig} and the subscription map
     * starts empty.
     */
    @Inject
    public SqsJobConsumer(@ForSqsHandling @NotNull ExecutorService executorService, @ForSqsHandling @NotNull RepeatedTaskQueue repeatedTaskQueue, @ForSqsReceiving @NotNull ExecutorService executorService2, @NotNull SqsConsumerAllocator sqsConsumerAllocator, @NotNull FeatureFlags featureFlags, @NotNull SqsMetrics sqsMetrics, @NotNull Moshi moshi, @NotNull QueueResolver queueResolver, @NotNull Provider<ServiceManager> provider, @NotNull Tracer tracer, @NotNull Clock clock, @NotNull AwsSqsJobQueueConfig awsSqsJobQueueConfig) {
        // Executors and task queue.
        Intrinsics.checkNotNullParameter(executorService, "handlingThreads");
        this.handlingThreads = executorService;
        Intrinsics.checkNotNullParameter(repeatedTaskQueue, "taskQueue");
        this.taskQueue = repeatedTaskQueue;
        Intrinsics.checkNotNullParameter(executorService2, "receivingThreads");
        this.receivingThreads = executorService2;

        // Collaborators.
        Intrinsics.checkNotNullParameter(sqsConsumerAllocator, "sqsConsumerAllocator");
        this.sqsConsumerAllocator = sqsConsumerAllocator;
        Intrinsics.checkNotNullParameter(featureFlags, "featureFlags");
        this.featureFlags = featureFlags;
        Intrinsics.checkNotNullParameter(sqsMetrics, "metrics");
        this.metrics = sqsMetrics;
        Intrinsics.checkNotNullParameter(moshi, "moshi");
        this.moshi = moshi;
        Intrinsics.checkNotNullParameter(queueResolver, "queues");
        this.queues = queueResolver;
        Intrinsics.checkNotNullParameter(provider, "serviceManagerProvider");
        this.serviceManagerProvider = provider;
        Intrinsics.checkNotNullParameter(tracer, "tracer");
        this.tracer = tracer;
        Intrinsics.checkNotNullParameter(clock, "clock");
        this.clock = clock;

        // Configuration-derived state, read only after every argument has been validated.
        Intrinsics.checkNotNullParameter(awsSqsJobQueueConfig, "awsSqsJobQueueConfig");
        this.receiverPolicy = awsSqsJobQueueConfig.getAws_sqs_job_receiver_policy();
        this.subscriptions = new ConcurrentHashMap<>();
    }

    /**
     * Registers {@code jobHandler} as the handler for {@code queueName} and schedules a
     * repeated task on the task queue that drives the queue's receiver.
     *
     * @param queueName the queue to subscribe to
     * @param jobHandler the handler passed to the new {@code QueueReceiver}
     * @throws IllegalStateException if a receiver is already registered for {@code queueName}
     */
    public void subscribe(@NotNull final QueueName queueName, @NotNull JobHandler jobHandler) {
        Intrinsics.checkNotNullParameter(queueName, "queueName");
        Intrinsics.checkNotNullParameter(jobHandler, "handler");

        final QueueReceiver receiver = new QueueReceiver(this, queueName, jobHandler);
        // putIfAbsent returns the previously registered receiver, if any; at most one
        // subscription per queue is allowed.
        QueueReceiver alreadyRegistered = this.subscriptions.putIfAbsent(queueName, receiver);
        if (alreadyRegistered != null) {
            throw new IllegalStateException("already subscribed to queue " + queueName.getValue());
        }

        log.info(() -> "subscribing to queue " + queueName.getValue());

        Duration initialDelay = Duration.ZERO;
        Intrinsics.checkNotNullExpressionValue(initialDelay, "ZERO");
        // The three null arguments together with mask 14 select the Kotlin default values
        // for the remaining scheduleWithBackoff parameters.
        RepeatedTaskQueue.scheduleWithBackoff$default(this.taskQueue, initialDelay, (Duration) null, (Backoff) null, (Backoff) null, () -> {
            // Run the receiver only while the service manager reports healthy; otherwise
            // report that there was nothing to do.
            ServiceManager serviceManager = (ServiceManager) this.serviceManagerProvider.get();
            if (serviceManager.isHealthy()) {
                return receiver.run();
            }
            return Status.NO_WORK;
        }, 14, (Object) null);
    }

    /**
     * Stops the receiver registered for {@code queueName}, if there is one. Unknown queues
     * are ignored. The receiver is not removed from the subscription map here.
     *
     * @param queueName the queue whose receiver should be stopped
     */
    public void unsubscribe(@NotNull QueueName queueName) {
        Intrinsics.checkNotNullParameter(queueName, "queueName");
        QueueReceiver receiver = this.subscriptions.get(queueName);
        if (receiver == null) {
            // Nothing was ever subscribed under this name.
            return;
        }
        receiver.stop();
    }

    /**
     * Looks up the receiver registered for {@code queueName}.
     *
     * @param queueName the queue to look up
     * @return the registered receiver, never {@code null}
     * @throws NullPointerException if no receiver is registered for {@code queueName}
     */
    @NotNull
    public final QueueReceiver getReceiver$misk_aws(@NotNull QueueName queueName) {
        Intrinsics.checkNotNullParameter(queueName, "queueName");
        QueueReceiver registered = this.subscriptions.get(queueName);
        // Mirrors a Kotlin non-null assertion: a missing subscription fails here.
        Intrinsics.checkNotNull(registered);
        return registered;
    }

    /**
     * Initiates shutdown of the receiving and handling executors, then waits up to 10 seconds
     * for the handling executor to terminate.
     *
     * <p>The result of the wait is not inspected, so this method returns normally even if
     * the handling executor has not terminated within the timeout. If the calling thread is
     * interrupted while waiting, the wait ends early and the thread's interrupt status is
     * restored so the caller can still observe the interruption.
     */
    public final void shutDown() {
        this.receivingThreads.shutdown();
        this.handlingThreads.shutdown();
        try {
            this.handlingThreads.awaitTermination(10L, TimeUnit.SECONDS);
        } catch (InterruptedException interrupted) {
            // awaitTermination throws a checked exception that this method does not declare;
            // re-assert the interrupt rather than dropping it.
            Thread.currentThread().interrupt();
        }
    }

    static {
        // The logger is named after this class's Kotlin qualified name, which must be non-null.
        KotlinLogging loggerFactory = KotlinLogging.INSTANCE;
        String consumerClassName = Reflection.getOrCreateKotlinClass(SqsJobConsumer.class).getQualifiedName();
        Intrinsics.checkNotNull(consumerClassName);
        log = loggerFactory.logger(consumerClassName);

        // Feature identifiers exposed as constants of this class.
        POD_CONSUMERS_PER_QUEUE = new Feature("pod-jobqueue-consumers");
        POD_MAX_JOBQUEUE_CONSUMERS = new Feature("pod-max-jobqueue-consumers");
        CONSUMERS_PER_QUEUE = new Feature("jobqueue-consumers");
        CONSUMERS_BATCH_SIZE = new Feature("jobqueue-consumers-fetch-batch-size");

        // String-valued tracing tag keyed "original.trace_id".
        ORIGINAL_TRACE_ID_TAG = new StringTag("original.trace_id");
    }
}
