package akka.remote.artery.aeron;

import akka.event.LoggingAdapter;
import akka.remote.artery.EnvelopeBuffer;
import akka.remote.artery.aeron.AeronSource;
import akka.remote.artery.aeron.TaskRunner;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import io.aeron.Subscription;
import io.aeron.exceptions.DriverTimeoutException;
import org.agrona.hints.ThreadHints;
import scala.collection.immutable.List;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: AeronSource.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-remote_2.13-2.6.20.jar:akka/remote/artery/aeron/AeronSource$$anon$2.class */
/**
 * Stage logic created by {@link AeronSource}: on every pull it polls an Aeron
 * {@link Subscription} for the next message and pushes it to the stage outlet.
 *
 * <p>Polling first busy-spins for up to {@code spinning} empty polls. Once that budget is
 * exhausted the poll task is handed to the outer source's {@link TaskRunner}, which reports
 * the next message back through an {@link AsyncCallback} (see {@link #taskOnMessage}).
 *
 * <p>Session ids reported via {@link #onUnavailableImage(int)} are queued and their buffers are
 * released through the fragments handler only while polling is not delegated to the task runner.
 *
 * <p>This is decompiled output of a Scala anonymous class; the {@code $}-mangled member names are
 * part of the compiled interface and are kept as they are.
 */
public final class AeronSource$$anon$2 extends GraphStageLogic implements OutHandler, AeronSource.AeronLifecycle, StageLogging {
    /** Aeron subscription polled by this stage; closed in {@link #postStop()}. */
    private final Subscription subscription;
    /** Remaining number of empty polls before polling is delegated to the task runner. */
    private int backoffCount;
    /** {@link System#nanoTime()} captured at the moment polling was delegated to the task runner. */
    private long delegateTaskStartTime;
    /** Number of successful polls since the last message delivered by the task runner. */
    private long countBeforeDelegate;
    private final AeronSource.MessageHandler messageHandler;
    /** Command that registers the poll task with the task runner. */
    private final TaskRunner.Add addPollTask;
    /** True while the poll task has been handed to the task runner and has not reported back. */
    private boolean delegatingToTaskRunner;
    /** Boxed session ids whose buffers still have to be freed. */
    private List<Object> pendingUnavailableImages;
    private final AsyncCallback<Object> onUnavailableImageCb;
    private final AsyncCallback<Promise<Object>> getStatusCb;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final /* synthetic */ AeronSource $outer;

    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /**
     * Delegates to the default implementation inherited from the {@link OutHandler} trait.
     *
     * <p>The call goes through the trait's static forwarder; calling
     * {@code onDownstreamFinish()} directly here would recurse forever.
     */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /**
     * Delegates to the default implementation inherited from the {@link OutHandler} trait.
     *
     * @param th the cancellation cause handed in by the caller
     */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    @Override // akka.stream.stage.StageLogging
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    // --- Scala-generated accessors for the private state above -------------------------------

    private Subscription subscription() {
        return this.subscription;
    }

    private int backoffCount() {
        return this.backoffCount;
    }

    private void backoffCount_$eq(int i) {
        this.backoffCount = i;
    }

    private long delegateTaskStartTime() {
        return this.delegateTaskStartTime;
    }

    private void delegateTaskStartTime_$eq(long j) {
        this.delegateTaskStartTime = j;
    }

    private long countBeforeDelegate() {
        return this.countBeforeDelegate;
    }

    private void countBeforeDelegate_$eq(long j) {
        this.countBeforeDelegate = j;
    }

    private AeronSource.MessageHandler messageHandler() {
        return this.messageHandler;
    }

    private TaskRunner.Add addPollTask() {
        return this.addPollTask;
    }

    private boolean delegatingToTaskRunner() {
        return this.delegatingToTaskRunner;
    }

    private void delegatingToTaskRunner_$eq(boolean z) {
        this.delegatingToTaskRunner = z;
    }

    private List<Object> pendingUnavailableImages() {
        return this.pendingUnavailableImages;
    }

    private void pendingUnavailableImages_$eq(List<Object> list) {
        this.pendingUnavailableImages = list;
    }

    private AsyncCallback<Object> onUnavailableImageCb() {
        return this.onUnavailableImageCb;
    }

    private AsyncCallback<Promise<Object>> getStatusCb() {
        return this.getStatusCb;
    }

    /** Log source is the outer {@link AeronSource} class rather than this anonymous class. */
    @Override // akka.stream.stage.StageLogging
    public Class<AeronSource> logSource() {
        return AeronSource.class;
    }

    /** Records the start of this source (channel and stream id) in the flight recorder. */
    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStarted(this.$outer.akka$remote$artery$aeron$AeronSource$$channel, this.$outer.akka$remote$artery$aeron$AeronSource$$streamId);
    }

    /**
     * Unregisters the poll task, closes the subscription and records the stop in the flight
     * recorder. A {@link DriverTimeoutException} from closing is only logged at debug level; the
     * flight-recorder event is emitted in all cases.
     */
    @Override // akka.stream.stage.GraphStageLogic
    public void postStop() {
        this.$outer.akka$remote$artery$aeron$AeronSource$$taskRunner.command(new TaskRunner.Remove(addPollTask().task()));
        try {
            subscription().close();
        } catch (DriverTimeoutException e) {
            log().debug("DriverTimeout when closing subscription. {}", e);
        } finally {
            this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStopped(this.$outer.akka$remote$artery$aeron$AeronSource$$channel, this.$outer.akka$remote$artery$aeron$AeronSource$$streamId);
        }
    }

    /** Resets the spin budget to the configured {@code spinning} value and starts polling. */
    @Override // akka.stream.stage.OutHandler
    public void onPull() {
        backoffCount_$eq(this.$outer.akka$remote$artery$aeron$AeronSource$$spinning);
        subscriberLoop();
    }

    /**
     * Polls the subscription until a message can be pushed or the spin budget is used up.
     *
     * <p>Each iteration polls with a limit of 1 (the second argument of
     * {@code Subscription.poll}). A positive poll result that produced a message ends the loop
     * by pushing it. A positive result without a complete message simply polls again. An empty
     * poll consumes one unit of {@code backoffCount}; when it reaches zero the poll task is
     * handed to the task runner and the loop ends.
     */
    private void subscriberLoop() {
        while (true) {
            messageHandler().reset();
            int fragmentsRead = subscription().poll(messageHandler().fragmentsHandler(), 1);
            // Capture the result before the handler state is cleared again.
            EnvelopeBuffer received = messageHandler().messageReceived();
            messageHandler().reset();
            if (fragmentsRead > 0) {
                countBeforeDelegate_$eq(countBeforeDelegate() + 1);
                if (received != null) {
                    onMessage(received);
                    return;
                }
            } else {
                backoffCount_$eq(backoffCount() - 1);
                if (backoffCount() <= 0) {
                    // Spin budget exhausted: let the task runner poll on our behalf.
                    this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceDelegateToTaskRunner(countBeforeDelegate());
                    delegatingToTaskRunner_$eq(true);
                    delegateTaskStartTime_$eq(System.nanoTime());
                    this.$outer.akka$remote$artery$aeron$AeronSource$$taskRunner.command(addPollTask());
                    return;
                }
                ThreadHints.onSpinWait();
            }
        }
    }

    /**
     * Requests the subscription's channel status through {@code getStatusCb}.
     *
     * @return a future completed with the boxed {@code long} returned by
     *     {@code Subscription.channelStatus()}
     */
    @Override // akka.remote.artery.aeron.AeronSource.AeronLifecycle
    public Future<Object> channelEndpointStatus() {
        Promise<Object> statusPromise = Promise$.MODULE$.apply();
        getStatusCb().invoke(statusPromise);
        return statusPromise.future();
    }

    /**
     * Callback target for a message found by the delegated poll task. Leaves delegation mode,
     * records the time spent delegated, frees buffers of pending unavailable sessions and pushes
     * the message.
     *
     * <p>Public only because the decompiler widened the access modifier.
     *
     * @param envelopeBuffer the message delivered by the poll task
     */
    public void taskOnMessage(EnvelopeBuffer envelopeBuffer) {
        countBeforeDelegate_$eq(0L);
        delegatingToTaskRunner_$eq(false);
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime());
        freeSessionBuffers();
        onMessage(envelopeBuffer);
    }

    /** Records the received size (byte buffer limit) and pushes the envelope to the outlet. */
    private void onMessage(EnvelopeBuffer envelopeBuffer) {
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReceived(envelopeBuffer.byteBuffer().limit());
        push(this.$outer.out(), envelopeBuffer);
    }

    /**
     * Frees the buffers of all queued unavailable sessions and clears the queue. Does nothing
     * while polling is delegated to the task runner; the queue is then drained on the next
     * {@link #taskOnMessage} or unavailable-image callback.
     */
    private void freeSessionBuffers() {
        if (delegatingToTaskRunner()) {
            return;
        }
        loop$1(pendingUnavailableImages());
        pendingUnavailableImages_$eq(package$.MODULE$.Nil());
    }

    /**
     * Forwards an unavailable-image notification to the stage via {@code onUnavailableImageCb}.
     * Non-fatal failures of the invocation are ignored (presumably because the notification may
     * arrive after the stage has stopped - confirm against the Scala source); fatal ones are
     * rethrown.
     *
     * @param i the session id passed on to the callback
     */
    @Override // akka.remote.artery.aeron.AeronSource.AeronLifecycle
    public void onUnavailableImage(int i) {
        try {
            onUnavailableImageCb().invoke(BoxesRunTime.boxToInteger(i));
        } catch (Throwable th) {
            // NonFatal.unapply yields an empty Option for fatal throwables; only those propagate.
            if (NonFatal$.MODULE$.unapply(th).isEmpty()) {
                throw th;
            }
        }
    }

    /** Body of the {@code getStatusCb} callback: completes the promise with the channel status. */
    public static final /* synthetic */ void $anonfun$getStatusCb$1(AeronSource$$anon$2 aeronSource$$anon$2, Promise promise) {
        promise.success(BoxesRunTime.boxToLong(aeronSource$$anon$2.subscription().channelStatus()));
    }

    /**
     * Frees the session buffer of every session id in the list, in list order.
     *
     * <p>Reconstructed from the bytecode of the Scala local function {@code loop}, which the
     * decompiler could not translate: it walked the list, unboxed each head to an {@code int},
     * called {@code freeSessionBuffer} on the fragments handler (ignoring its boolean result) and
     * continued with the tail until reaching {@code Nil}.
     *
     * @param sessionIds boxed session ids to release
     */
    private final void loop$1(List<Object> sessionIds) {
        sessionIds.foreach(boxedSessionId -> {
            messageHandler().fragmentsHandler().freeSessionBuffer(BoxesRunTime.unboxToInt(boxedSessionId));
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Creates the stage logic for the given source, adds the Aeron subscription for the
     * source's channel and stream id, prepares the poll task and the async callbacks, and
     * installs this object as the handler of the source's outlet.
     *
     * @param aeronSource the enclosing source; a {@code null} value fails with a
     *     {@link NullPointerException} when its shape is read for the super constructor
     */
    public AeronSource$$anon$2(AeronSource aeronSource) {
        // The explicit outer-null check of the bytecode is omitted: it could only run after this
        // call, which already throws NullPointerException for a null source.
        super(aeronSource.shape2());
        this.$outer = aeronSource;
        OutHandler.$init$(this);
        StageLogging.$init$(this);
        this.subscription = aeronSource.akka$remote$artery$aeron$AeronSource$$aeron.addSubscription(aeronSource.akka$remote$artery$aeron$AeronSource$$channel, aeronSource.akka$remote$artery$aeron$AeronSource$$streamId);
        this.backoffCount = aeronSource.akka$remote$artery$aeron$AeronSource$$spinning;
        this.delegateTaskStartTime = 0L;
        this.countBeforeDelegate = 0L;
        this.messageHandler = new AeronSource.MessageHandler(aeronSource.akka$remote$artery$aeron$AeronSource$$pool);
        this.addPollTask = new TaskRunner.Add(AeronSource$.MODULE$.akka$remote$artery$aeron$AeronSource$$pollTask(subscription(), messageHandler(), getAsyncCallback((EnvelopeBuffer envelopeBuffer) -> {
            this.taskOnMessage(envelopeBuffer);
            return BoxedUnit.UNIT;
        })));
        this.delegatingToTaskRunner = false;
        this.pendingUnavailableImages = package$.MODULE$.Nil();
        this.onUnavailableImageCb = getAsyncCallback(boxedSessionId -> {
            // The callback is typed on Object; the value is the boxed int session id.
            int sessionId = BoxesRunTime.unboxToInt(boxedSessionId);
            this.pendingUnavailableImages_$eq(this.pendingUnavailableImages().$colon$colon(BoxesRunTime.boxToInteger(sessionId)));
            this.freeSessionBuffers();
            return BoxedUnit.UNIT;
        });
        this.getStatusCb = getAsyncCallback(promise -> {
            $anonfun$getStatusCb$1(this, promise);
            return BoxedUnit.UNIT;
        });
        setHandler(aeronSource.out(), this);
    }
}
