package akka.stream.impl.streamref;

import akka.actor.ActorRef;
import akka.actor.Terminated;
import akka.event.LoggingAdapter;
import akka.stream.Attributes;
import akka.stream.InvalidPartnerActorException;
import akka.stream.InvalidSequenceNumberException;
import akka.stream.Materializer;
import akka.stream.RemoteStreamRefActorTerminatedException;
import akka.stream.StreamRefAttributes;
import akka.stream.StreamRefSettings;
import akka.stream.StreamRefSubscriptionTimeoutException;
import akka.stream.SubscriptionWithCancelException;
import akka.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$;
import akka.stream.impl.FixedSizeBuffer;
import akka.stream.impl.FixedSizeBuffer$;
import akka.stream.impl.streamref.SourceRefStageImpl;
import akka.stream.impl.streamref.StreamRefsProtocol;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import akka.util.OptionVal$;
import akka.util.OptionVal$Some$;
import akka.util.PrettyDuration$;
import org.springframework.beans.PropertyAccessor;
import scala.Function1;
import scala.MatchError;
import scala.Tuple2;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SourceRefImpl.scala */
/* loaded from: input_file:BOOT-INF/lib/akka-stream_2.13-2.6.20.jar:akka/stream/impl/streamref/SourceRefStageImpl$$anon$1.class */
public final class SourceRefStageImpl$$anon$1 extends TimerGraphStageLogic implements StageLogging, SourceRefStageImpl.ActorRefStage, OutHandler {
    // Looked up from the materializer's actor system in the constructor.
    private final StreamRefsMaster streamRefsMaster;
    // NOTE(review): not read by any method in the visible part of this class; initialised in the constructor.
    private final StreamRefSettings settings;
    // Its timeout() arms the subscription timer in preStart() and the cancel-ack deadline in triggerCancellationExchange().
    private final StreamRefAttributes.SubscriptionTimeout subscriptionTimeout;
    // NOTE(review): not read by any method in the visible part of this class; presumably sizes receiveBuffer - confirm in the constructor.
    private final int bufferCapacity;
    // Delay used by scheduleDemandRedelivery() for the demand re-delivery timer.
    private final FiniteDuration demandRedeliveryInterval;
    // Delay used whenever the TerminationDeadlineTimerKey timer is scheduled.
    private final FiniteDuration finalTerminationSignalDeadline;
    // Returned by stageActorName(); used as the "[{}]" prefix of log and exception messages.
    private final String stageActorName;
    // Stage actor of this logic; used to watch/unwatch the partner and logged as "self ref" in preStart().
    private final GraphStageLogic.StageActor self;
    // Returned by ref(); passed as the sender of every message this stage sends to the partner.
    private final ActorRef ref;
    // Current protocol state; every handler in this class branches on its concrete type.
    private SourceRefStageImpl.State state;
    // Sequence number the next sequenced inbound message must carry; advanced by observeAndValidateSequenceNr().
    private long expectingSeqNr;
    // Running total of demand signalled to the partner; increased in sendDemand$1().
    private long localCumulativeDemand;
    // Elements requested but not yet received: increased in sendDemand$1(), decreased in onReceiveElement().
    private int localRemainingRequested;
    // Holds received elements that could not be pushed downstream immediately.
    private final FixedSizeBuffer.AbstractC0003FixedSizeBuffer<Out> receiveBuffer;
    // Consulted by triggerCumulativeDemand() to compute how much additional demand to signal.
    private final SourceRefStageImpl.WatermarkRequestStrategy requestStrategy;
    // Backing field for the StageLogging accessor pair below.
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    // Enclosing stage; provides the outlet via $outer.out().
    private final /* synthetic */ SourceRefStageImpl $outer;

    /**
     * Forwards the no-argument downstream-finish callback to the default
     * implementation inherited from {@link OutHandler}.
     *
     * <p>The decompiled body called {@code onDownstreamFinish()} on itself, which
     * recurses until the stack overflows. The compiled class forwards to the
     * trait's own implementation, expressed in Java as an interface-qualified
     * {@code super} call.
     *
     * @throws Exception whatever the inherited implementation throws
     */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish() throws Exception {
        OutHandler.super.onDownstreamFinish();
    }

    /**
     * Returns the logging adapter supplied by the {@link StageLogging} mix-in.
     *
     * <p>The decompiled body assigned {@code log()} to a local and returned it,
     * i.e. it called itself without a base case. The trait forwarder must
     * delegate to the implementation inherited from {@code StageLogging}.
     *
     * @return the adapter produced by {@code StageLogging}'s own {@code log()}
     */
    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter log() {
        return StageLogging.super.log();
    }

    /**
     * Accessor required by {@link StageLogging} for its logger backing field.
     *
     * @return the current value of the backing field (may be {@code null} if never set)
     */
    @Override // akka.stream.stage.StageLogging
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return akka$stream$stage$StageLogging$$_log;
    }

    /**
     * Mutator required by {@link StageLogging} for its logger backing field.
     *
     * @param loggingAdapter the adapter to store
     */
    @Override // akka.stream.stage.StageLogging
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /**
     * Names the class reported as the log source for this stage.
     *
     * @return {@code SourceRefStageImpl.class} (the enclosing stage, not this anonymous logic class)
     */
    @Override // akka.stream.stage.StageLogging
    public Class<?> logSource() {
        return SourceRefStageImpl.class;
    }

    /**
     * Returns the stage actor name held by this logic; it is used as the
     * {@code [..]} prefix of this class's log and exception messages.
     *
     * @return the stored stage actor name
     */
    @Override // akka.stream.stage.GraphStageLogic
    public String stageActorName() {
        return stageActorName;
    }

    /**
     * Returns the actor reference this stage exposes through
     * {@code SourceRefStageImpl.ActorRefStage}.
     *
     * @return the stored actor reference
     */
    @Override // akka.stream.impl.streamref.SourceRefStageImpl.ActorRefStage
    public ActorRef ref() {
        return ref;
    }

    /**
     * Sender reference attached to every message this stage sends to its partner.
     *
     * @return the same reference that {@link #ref()} returns
     */
    private ActorRef selfSender() {
        return this.ref;
    }

    /**
     * @return the protocol state this stage is currently in
     */
    private SourceRefStageImpl.State state() {
        return state;
    }

    /**
     * Replaces the current protocol state.
     *
     * @param state the state to switch to
     */
    private void state_$eq(SourceRefStageImpl.State state) {
        this.state = state;
    }

    /**
     * @return the sequence number the next sequenced inbound message must carry
     */
    private long expectingSeqNr() {
        return expectingSeqNr;
    }

    /**
     * Sets the sequence number expected on the next sequenced inbound message.
     *
     * @param j the new expected sequence number
     */
    private void expectingSeqNr_$eq(long j) {
        expectingSeqNr = j;
    }

    /**
     * @return the cumulative demand value last sent (or re-sent) to the partner
     */
    private long localCumulativeDemand() {
        return localCumulativeDemand;
    }

    /**
     * Stores a new cumulative demand total.
     *
     * @param j the new cumulative demand
     */
    private void localCumulativeDemand_$eq(long j) {
        localCumulativeDemand = j;
    }

    /**
     * @return the number of elements that have been demanded but not yet received
     */
    private int localRemainingRequested() {
        return localRemainingRequested;
    }

    /**
     * Stores the number of demanded-but-not-yet-received elements.
     *
     * @param i the new outstanding count
     */
    private void localRemainingRequested_$eq(int i) {
        localRemainingRequested = i;
    }

    /**
     * @return the buffer holding received elements that have not been pushed downstream yet
     */
    private FixedSizeBuffer.AbstractC0003FixedSizeBuffer<Out> receiveBuffer() {
        return receiveBuffer;
    }

    /**
     * @return the strategy that decides how much additional demand to signal
     */
    private SourceRefStageImpl.WatermarkRequestStrategy requestStrategy() {
        return requestStrategy;
    }

    /**
     * Logs the start-up parameters (stage name, own actor ref, initial state and
     * subscription timeout) and arms the one-shot subscription timeout timer.
     */
    @Override // akka.stream.stage.GraphStageLogic
    public void preStart() {
        LoggingAdapter logger = log();
        String prettyTimeout = PrettyDuration$.MODULE$.format(this.subscriptionTimeout.timeout());
        logger.debug(
                "[{}] Starting up with, self ref: {}, state: {}, subscription timeout: {}",
                stageActorName(),
                this.self.ref(),
                state(),
                prettyTimeout);

        String timerKey = SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey();
        scheduleOnce(timerKey, this.subscriptionTimeout.timeout());
    }

    /**
     * Downstream pulled: first try to emit a buffered element, then check whether
     * more demand should be signalled to the partner.
     */
    @Override // akka.stream.stage.OutHandler
    public void onPull() {
        // Emit from the buffer first so the demand calculation sees the freed slot.
        tryPush();
        triggerCumulativeDemand();
    }

    /**
     * Builds the receive function for this stage: each (sender, message) pair is
     * handed to {@code $anonfun$receiveRemoteMessage$1} with this logic as the
     * target.
     *
     * @return a function that processes one pair and returns {@code BoxedUnit.UNIT}
     */
    public Function1<Tuple2<ActorRef, Object>, BoxedUnit> receiveRemoteMessage() {
        return senderAndMessage -> {
            $anonfun$receiveRemoteMessage$1(this, senderAndMessage);
            return BoxedUnit.UNIT;
        };
    }

    /**
     * Dispatches an expired stage timer to the handler for its key.
     *
     * <p>Keys are tested in the order subscription timeout, demand re-delivery,
     * termination deadline, cancellation deadline.
     *
     * @param obj key of the timer that fired
     * @throws IllegalArgumentException if {@code obj} is none of the four known keys
     * @throws StreamRefSubscriptionTimeoutException if the subscription timer fires before the remote side subscribed
     * @throws IllegalStateException if a deadline timer fires in a state where it cannot have been scheduled
     */
    @Override // akka.stream.stage.TimerGraphStageLogic
    public void onTimer(Object obj) {
        if (isTimerKey(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey(), obj)) {
            handleSubscriptionTimeout();
        } else if (isTimerKey(SourceRefStageImpl$.MODULE$.DemandRedeliveryTimerKey(), obj)) {
            handleDemandRedeliveryTimeout();
        } else if (isTimerKey(SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey(), obj)) {
            handleTerminationDeadline();
        } else if (isTimerKey(SourceRefStageImpl$.MODULE$.CancellationDeadlineTimerKey(), obj)) {
            handleCancellationDeadline();
        } else {
            throw new IllegalArgumentException("Unknown timer key: " + obj);
        }
    }

    /** Null-safe equality between a known timer key and the key that fired. */
    private static boolean isTimerKey(String key, Object fired) {
        return key == null ? fired == null : key.equals(fired);
    }

    /** Subscription timer: fails while still waiting for the remote side, otherwise only logs. */
    private void handleSubscriptionTimeout() {
        SourceRefStageImpl.State current = state();
        boolean stillWaiting = SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(current)
                || current instanceof SourceRefStageImpl.AwaitingSubscription;
        if (stillWaiting) {
            // The original message ran "]," and "within" together; a separating space was added.
            throw new StreamRefSubscriptionTimeoutException(
                    "[" + stageActorName() + "] Remote side did not subscribe (materialize) handed out Sink reference ["
                            + ref() + "], "
                            + "within subscription timeout: "
                            + PrettyDuration$.MODULE$.format(this.subscriptionTimeout.timeout()) + "!");
        }
        log().debug("[{}] Ignoring subscription timeout in state [{}]", stageActorName(), current);
    }

    /** Demand re-delivery timer: re-sends the cumulative demand while running and re-arms the timer. */
    private void handleDemandRedeliveryTimeout() {
        SourceRefStageImpl.State current = state();
        if (!(current instanceof SourceRefStageImpl.Running)) {
            log().debug("[{}] Ignoring demand redelivery timeout in state [{}]", stageActorName(), current);
            return;
        }
        ActorRef partner = ((SourceRefStageImpl.Running) current).partner();
        log().debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName(), BoxesRunTime.boxToLong(localCumulativeDemand()));
        partner.$bang(new StreamRefsProtocol.CumulativeDemand(localCumulativeDemand()), selfSender());
        scheduleDemandRedelivery();
    }

    /** Termination deadline: fails after partner termination, or cancels if no partner was ever seen. */
    private void handleTerminationDeadline() {
        SourceRefStageImpl.State current = state();
        if (current instanceof SourceRefStageImpl.UpstreamTerminated) {
            ActorRef partner = ((SourceRefStageImpl.UpstreamTerminated) current).partner();
            log().debug("[{}] Remote partner [{}] has terminated unexpectedly and no clean completion/failure message was received", stageActorName(), partner);
            failStage(new RemoteStreamRefActorTerminatedException(
                    "[" + stageActorName() + "] Remote partner [" + partner
                            + "] has terminated unexpectedly and no clean completion/failure message was received "
                            + "(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down."));
            return;
        }
        if (!SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(current)) {
            throw new IllegalStateException("TerminationDeadlineTimerKey can't happen in state " + current);
        }
        log().debug("[{}] Downstream cancelled, but timeout hit before we saw a partner", stageActorName());
        cancelStage(SubscriptionWithCancelException$NoMoreElementsNeeded$.MODULE$);
    }

    /** Cancellation deadline: the partner never acknowledged, so cancel with the remembered cause. */
    private void handleCancellationDeadline() {
        SourceRefStageImpl.State current = state();
        if (!(current instanceof SourceRefStageImpl.WaitingForCancelAck)) {
            throw new IllegalStateException("[" + stageActorName() + "] CancellationDeadlineTimerKey can't happen in state " + current);
        }
        SourceRefStageImpl.WaitingForCancelAck waiting = (SourceRefStageImpl.WaitingForCancelAck) current;
        ActorRef partner = waiting.partner();
        Throwable cause = waiting.cause();
        log().debug("[{}] Waiting for remote ack from [{}] for downstream failure timed out, failing stage with original downstream failure", stageActorName(), partner);
        cancelStage(cause);
    }

    /**
     * Reacts to downstream cancellation according to the current protocol state.
     *
     * <ul>
     *   <li>{@code Running} / {@code AwaitingSubscription}: start the cancellation
     *       exchange with the known partner.</li>
     *   <li>{@code AwaitingPartner}: no partner to notify yet, so only arm the
     *       termination deadline timer.</li>
     *   <li>{@code UpstreamCompleted}: complete for a non-failure cancellation,
     *       otherwise fail with {@code th}; buffered elements are dropped.</li>
     *   <li>{@code UpstreamTerminated}: fail if the buffer is empty, otherwise
     *       attempt to push a buffered element.</li>
     * </ul>
     *
     * <p>The stage-name prefix is written as the literal {@code "["}; the
     * decompiler had substituted an unrelated Spring constant with that value.
     *
     * @param th the cancellation cause reported by downstream
     * @throws UnsupportedOperationException if already waiting for a cancel ack
     * @throws MatchError if the state is none of the known state types
     */
    @Override // akka.stream.stage.OutHandler
    public void onDownstreamFinish(Throwable th) {
        SourceRefStageImpl.State current = state();
        if (current instanceof SourceRefStageImpl.Running) {
            triggerCancellationExchange(((SourceRefStageImpl.Running) current).partner(), th);
        } else if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(current)) {
            scheduleOnce(SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey(), this.finalTerminationSignalDeadline);
        } else if (current instanceof SourceRefStageImpl.AwaitingSubscription) {
            triggerCancellationExchange(((SourceRefStageImpl.AwaitingSubscription) current).partner(), th);
        } else if (current instanceof SourceRefStageImpl.UpstreamCompleted) {
            if (receiveBuffer().nonEmpty()) {
                log().debug("[{}] Downstream cancelled with elements [{}] in buffer, dropping elements", stageActorName(), BoxesRunTime.boxToInteger(receiveBuffer().used()));
            }
            if (th instanceof SubscriptionWithCancelException.NonFailureCancellation) {
                completeStage();
            } else {
                failStage(th);
            }
        } else if (current instanceof SourceRefStageImpl.WaitingForCancelAck) {
            throw new UnsupportedOperationException(
                    "[" + stageActorName() + "] Didn't expect state " + state() + " when downstream finished with " + th);
        } else if (current instanceof SourceRefStageImpl.UpstreamTerminated) {
            log().debug("[{}] Downstream cancelled with elements [{}] in buffer", stageActorName(), BoxesRunTime.boxToInteger(receiveBuffer().used()));
            if (receiveBuffer().isEmpty()) {
                failStage(new RemoteStreamRefActorTerminatedException("[" + stageActorName() + "] unexpectedly terminated"));
            } else {
                tryPush();
            }
        } else {
            throw new MatchError(current);
        }
    }

    /**
     * Tells the partner that downstream cancelled and waits for its acknowledgement.
     *
     * <p>A non-failure cancellation is signalled as {@code RemoteStreamCompleted}
     * carrying the currently expected sequence number; any other cause is
     * signalled as {@code RemoteStreamFailure("Downstream failed")}. The partner
     * is unwatched, the state becomes {@code WaitingForCancelAck}, the
     * cancellation deadline timer is armed with the subscription timeout, and
     * {@code setKeepGoing(true)} is called.
     *
     * <p>The "dropping elements" debug line now carries the {@code [stageActorName]}
     * prefix like every other log line of this class.
     *
     * @param actorRef the partner to notify
     * @param th       the downstream cancellation cause, remembered in the new state
     */
    private void triggerCancellationExchange(ActorRef actorRef, Throwable th) {
        if (receiveBuffer().nonEmpty()) {
            log().debug("[{}] Downstream cancelled with elements [{}] in buffer, dropping elements", stageActorName(), BoxesRunTime.boxToInteger(receiveBuffer().used()));
        }

        Object terminationSignal;
        if (th instanceof SubscriptionWithCancelException.NonFailureCancellation) {
            log().debug("[{}] Deferred stop on downstream cancel", stageActorName());
            terminationSignal = new StreamRefsProtocol.RemoteStreamCompleted(expectingSeqNr());
        } else {
            log().debug("[{}] Deferred stop on downstream failure: {}", stageActorName(), th);
            terminationSignal = new StreamRefsProtocol.RemoteStreamFailure("Downstream failed");
        }

        this.self.unwatch(actorRef);
        actorRef.$bang(terminationSignal, selfSender());
        state_$eq(new SourceRefStageImpl.WaitingForCancelAck(actorRef, th));
        scheduleOnce(SourceRefStageImpl$.MODULE$.CancellationDeadlineTimerKey(), this.subscriptionTimeout.timeout());
        setKeepGoing(true);
    }

    /**
     * Signals additional demand to the partner when the buffer has room for it.
     *
     * <p>Nothing happens if the buffer's remaining capacity is already covered by
     * outstanding requests, or if the request strategy asks for no more elements.
     * Otherwise demand is sent when the partner is known ({@code Running} or
     * {@code AwaitingSubscription}); in any other state it is only logged as
     * deferred.
     */
    public void triggerCumulativeDemand() {
        int unclaimedCapacity = receiveBuffer().remainingCapacity() - localRemainingRequested();
        if (unclaimedCapacity <= 0) {
            return;
        }
        int extraDemand = requestStrategy().requestDemand(receiveBuffer().used() + localRemainingRequested());
        if (extraDemand <= 0) {
            return;
        }

        SourceRefStageImpl.State current = state();
        if (current instanceof SourceRefStageImpl.Running) {
            ActorRef runningPartner = ((SourceRefStageImpl.Running) current).partner();
            log().debug("[{}] Demanding until [{}] (+{})", stageActorName(), BoxesRunTime.boxToLong(localCumulativeDemand()), BoxesRunTime.boxToInteger(extraDemand));
            sendDemand$1(runningPartner, extraDemand);
        } else if (current instanceof SourceRefStageImpl.AwaitingSubscription) {
            ActorRef pendingPartner = ((SourceRefStageImpl.AwaitingSubscription) current).partner();
            log().debug("[{}] Demanding, before subscription seen, until [{}] (+{})", stageActorName(), BoxesRunTime.boxToLong(localCumulativeDemand()), BoxesRunTime.boxToInteger(extraDemand));
            sendDemand$1(pendingPartner, extraDemand);
        } else {
            log().debug("[{}] Partner ref not set up in state {}, demanding elements deferred", stageActorName(), current);
        }
    }

    /**
     * Emits one buffered element if the outlet is available; if the buffer is
     * empty and upstream has already completed, completes the stage instead.
     */
    private void tryPush() {
        if (receiveBuffer().nonEmpty() && isAvailable(this.$outer.out())) {
            push(this.$outer.out(), receiveBuffer().dequeue());
            return;
        }
        // Buffer drained: completion is only due once the remote side has completed.
        if (receiveBuffer().isEmpty() && state() instanceof SourceRefStageImpl.UpstreamCompleted) {
            completeStage();
        }
    }

    /**
     * Accounts for one received element and either pushes it straight downstream
     * (empty buffer and available outlet) or enqueues it.
     *
     * @param out the received element
     * @throws IllegalStateException if the element must be buffered but the buffer is full
     */
    private void onReceiveElement(Out out) {
        localRemainingRequested_$eq(localRemainingRequested() - 1);

        boolean canBypassBuffer = receiveBuffer().isEmpty() && isAvailable(this.$outer.out());
        if (canBypassBuffer) {
            push(this.$outer.out(), out);
            return;
        }

        if (receiveBuffer().isFull()) {
            String overflowMessage = "Attempted to overflow buffer! "
                    + "Capacity: " + receiveBuffer().capacity() + ", incoming element: " + out + ", "
                    + "localRemainingRequested: " + localRemainingRequested() + ", localCumulativeDemand: " + localCumulativeDemand();
            throw new IllegalStateException(overflowMessage);
        }
        receiveBuffer().enqueue(out);
    }

    /**
     * Ensures a message came from the partner this stage is bound to.
     *
     * <p>The comparison is null-safe. The stage-name prefix is written as the
     * literal {@code "["}; the decompiler had substituted an unrelated Spring
     * constant with that value.
     *
     * @param actorRef  the sender of the received message
     * @param actorRef2 the partner recorded in the current state
     * @throws InvalidPartnerActorException if the sender is not the recorded partner
     */
    private void verifyPartner(ActorRef actorRef, ActorRef actorRef2) {
        boolean fromPartner = actorRef == null ? actorRef2 == null : actorRef.equals(actorRef2);
        if (fromPartner) {
            return;
        }
        throw new InvalidPartnerActorException(
                actorRef2,
                actorRef,
                "[" + stageActorName() + "] Received message from UNEXPECTED sender [" + actorRef + "]! "
                        + "This actor is NOT our trusted remote partner, which is [" + actorRef2 + "]. Tearing down.");
    }

    /**
     * Checks an inbound sequence number against the expected one and, when it
     * matches, advances the expectation by one.
     *
     * @param j   the sequence number carried by the message
     * @param str description used in the warning and the exception on mismatch
     * @throws InvalidSequenceNumberException if {@code j} is not the expected number
     */
    private void observeAndValidateSequenceNr(long j, String str) {
        if (!isInvalidSequenceNr(j)) {
            expectingSeqNr_$eq(expectingSeqNr() + 1);
            return;
        }
        log().warning("[{}] {}, expected {} but was {}", stageActorName(), str, BoxesRunTime.boxToLong(expectingSeqNr()), BoxesRunTime.boxToLong(j));
        throw new InvalidSequenceNumberException(expectingSeqNr(), j, str);
    }

    /**
     * @param j sequence number to check
     * @return {@code true} when {@code j} differs from the currently expected sequence number
     */
    private boolean isInvalidSequenceNr(long j) {
        return expectingSeqNr() != j;
    }

    /**
     * Arms the one-shot demand re-delivery timer with the configured interval.
     */
    private void scheduleDemandRedelivery() {
        String timerKey = SourceRefStageImpl$.MODULE$.DemandRedeliveryTimerKey();
        scheduleOnce(timerKey, this.demandRedeliveryInterval);
    }

    /**
     * Processes one (sender, message) pair received by the stage actor.
     *
     * <p>The pair is destructured once and dispatched on the message type:
     * subscribe handshake, sequenced element, remote completion, remote failure,
     * cancellation ack, or deathwatch {@code Terminated}. Anything else,
     * including a {@code SequencedOnNext} whose payload is {@code null}, is
     * rejected with an {@link IllegalStateException}.
     *
     * <p>Changes from the decompiled form: the "Received element after partner
     * terminated" debug line now passes the stage name for its {@code {}}
     * placeholder, and the {@code "["} prefix is a plain literal instead of an
     * unrelated Spring constant with the same value.
     *
     * @param sourceRefStageImpl$$anon$1 the stage logic the message is addressed to
     * @param tuple2                     (sender, message) pair
     * @throws MatchError if {@code tuple2} is {@code null}
     * @throws IllegalStateException if the message is unknown or not legal in the current state
     */
    public static final /* synthetic */ void $anonfun$receiveRemoteMessage$1(SourceRefStageImpl$$anon$1 sourceRefStageImpl$$anon$1, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        SourceRefStageImpl$$anon$1 stage = sourceRefStageImpl$$anon$1;
        ActorRef sender = (ActorRef) tuple2.mo5703_1();
        Object message = tuple2.mo5702_2();

        if (message instanceof StreamRefsProtocol.OnSubscribeHandshake) {
            stage.handleOnSubscribeHandshake(sender, (StreamRefsProtocol.OnSubscribeHandshake) message);
        } else if (message instanceof StreamRefsProtocol.SequencedOnNext
                && ((StreamRefsProtocol.SequencedOnNext) message).payload() != null) {
            stage.handleSequencedOnNext(sender, (StreamRefsProtocol.SequencedOnNext) message);
        } else if (message instanceof StreamRefsProtocol.RemoteStreamCompleted) {
            stage.handleRemoteStreamCompleted(sender, (StreamRefsProtocol.RemoteStreamCompleted) message);
        } else if (message instanceof StreamRefsProtocol.RemoteStreamFailure) {
            stage.handleRemoteStreamFailure(sender, (StreamRefsProtocol.RemoteStreamFailure) message);
        } else if (StreamRefsProtocol$Ack$.MODULE$.equals(message)) {
            stage.handleCancellationAck(sender);
        } else if (message instanceof Terminated) {
            stage.handlePartnerTerminated((Terminated) message);
        } else {
            throw new IllegalStateException(
                    "[" + stage.stageActorName() + "] Unexpected message in state " + stage.state() + ": " + message + " from " + sender);
        }
    }

    /** Subscribe handshake: cancels the subscription timer, moves to Running and starts demanding. */
    private void handleOnSubscribeHandshake(ActorRef sender, StreamRefsProtocol.OnSubscribeHandshake handshake) {
        ActorRef targetRef = handshake.targetRef();
        SourceRefStageImpl.State current = state();
        if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(current)) {
            cancelTimer(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey());
            log().debug("[{}] Received on subscribe handshake {} while awaiting partner from {}", stageActorName(), handshake, targetRef);
            state_$eq(new SourceRefStageImpl.Running(targetRef));
            // Only this branch starts watching the partner.
            this.self.watch(targetRef);
            triggerCumulativeDemand();
        } else if (current instanceof SourceRefStageImpl.AwaitingSubscription) {
            verifyPartner(sender, ((SourceRefStageImpl.AwaitingSubscription) current).partner());
            cancelTimer(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey());
            log().debug("[{}] Received on subscribe handshake {} while awaiting subscription from {}", stageActorName(), handshake, targetRef);
            state_$eq(new SourceRefStageImpl.Running(targetRef));
            triggerCumulativeDemand();
        } else {
            throw new IllegalStateException("[" + stageActorName() + "] Got unexpected " + handshake + " in state " + current);
        }
    }

    /** Sequenced element: validates the sequence number, then handles the payload per state. */
    private void handleSequencedOnNext(ActorRef sender, StreamRefsProtocol.SequencedOnNext sequencedOnNext) {
        long seqNr = sequencedOnNext.seqNr();
        Object payload = sequencedOnNext.payload();
        observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in SequencedOnNext");

        SourceRefStageImpl.State current = state();
        if (current instanceof SourceRefStageImpl.AwaitingSubscription) {
            ActorRef partner = ((SourceRefStageImpl.AwaitingSubscription) current).partner();
            verifyPartner(sender, partner);
            log().debug("[{}] Received seq {} from {}", stageActorName(), sequencedOnNext, sender);
            // An element arriving before the handshake also moves the stage to Running.
            state_$eq(new SourceRefStageImpl.Running(partner));
            onReceiveElement(payload);
            triggerCumulativeDemand();
        } else if (current instanceof SourceRefStageImpl.Running) {
            verifyPartner(sender, ((SourceRefStageImpl.Running) current).partner());
            onReceiveElement(payload);
            triggerCumulativeDemand();
        } else if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(current)) {
            throw new IllegalStateException(
                    "[" + stageActorName() + "] Got " + sequencedOnNext + " from " + sender + " while AwaitingPartner");
        } else if (current instanceof SourceRefStageImpl.WaitingForCancelAck) {
            verifyPartner(sender, ((SourceRefStageImpl.WaitingForCancelAck) current).partner());
            log().warning("[{}] Got element from remote but downstream cancelled, dropping element of type {}", stageActorName(), payload.getClass());
        } else if (current instanceof SourceRefStageImpl.UpstreamCompleted) {
            verifyPartner(sender, ((SourceRefStageImpl.UpstreamCompleted) current).partner());
            throw new IllegalStateException(
                    "[" + stageActorName() + "] Got completion and then received more elements from " + sender + ", this is not supposed to happen.");
        } else if (current instanceof SourceRefStageImpl.UpstreamTerminated) {
            verifyPartner(sender, ((SourceRefStageImpl.UpstreamTerminated) current).partner());
            // The template has a placeholder; the original call passed no argument for it.
            log().debug("[{}] Received element after partner terminated", stageActorName());
            onReceiveElement(payload);
        } else {
            throw new MatchError(current);
        }
    }

    /** Remote completion: validates the sequence number and drains the buffer before completing. */
    private void handleRemoteStreamCompleted(ActorRef sender, StreamRefsProtocol.RemoteStreamCompleted completed) {
        long seqNr = completed.seqNr();
        observeAndValidateSequenceNr(seqNr, "Illegal sequence nr in RemoteSinkCompleted");

        SourceRefStageImpl.State current = state();
        if (current instanceof SourceRefStageImpl.Running) {
            ActorRef partner = ((SourceRefStageImpl.Running) current).partner();
            verifyPartner(sender, partner);
            log().debug("[{}] The remote stream has completed, emitting {} elements left in buffer before completing", stageActorName(), BoxesRunTime.boxToInteger(receiveBuffer().used()));
            this.self.unwatch(sender);
            state_$eq(new SourceRefStageImpl.UpstreamCompleted(partner));
            tryPush();
        } else if (current instanceof SourceRefStageImpl.WaitingForCancelAck) {
            log().debug("[{}] Upstream completed while waiting for cancel ack", stageActorName());
        } else {
            throw new IllegalStateException(
                    "[" + stageActorName() + "] Saw RemoteStreamCompleted(" + seqNr + ") while in state " + current + ", should never happen");
        }
    }

    /** Remote failure: fails the stage, provided the partner is known and is the sender. */
    private void handleRemoteStreamFailure(ActorRef sender, StreamRefsProtocol.RemoteStreamFailure failure) {
        String reason = failure.msg();
        SourceRefStageImpl.State current = state();
        if (!(current instanceof SourceRefStageImpl.WeKnowPartner)) {
            throw new IllegalStateException(
                    "[" + stageActorName() + "] got RemoteStreamFailure(" + reason + ") when in state " + current + ", should never happen");
        }
        verifyPartner(sender, ((SourceRefStageImpl.WeKnowPartner) current).partner());
        log().debug("[{}] The remote stream has failed, failing (reason: {})", stageActorName(), reason);
        failStage(new RemoteStreamRefActorTerminatedException(
                "[" + stageActorName() + "] Remote stream (" + sender.path() + ") failed, reason: " + reason));
    }

    /** Cancellation ack: finishes the cancellation exchange with the remembered cause. */
    private void handleCancellationAck(ActorRef sender) {
        SourceRefStageImpl.State current = state();
        if (!(current instanceof SourceRefStageImpl.WaitingForCancelAck)) {
            throw new IllegalStateException("[" + stageActorName() + "] Got an Ack when in state " + current);
        }
        SourceRefStageImpl.WaitingForCancelAck waiting = (SourceRefStageImpl.WaitingForCancelAck) current;
        ActorRef partner = waiting.partner();
        Throwable cause = waiting.cause();
        verifyPartner(sender, partner);
        log().debug("[{}] Got cancellation ack from remote, canceling", stageActorName());
        cancelStage(cause);
    }

    /** Deathwatch notification: arms the termination deadline and moves to UpstreamTerminated. */
    private void handlePartnerTerminated(Terminated terminated) {
        ActorRef actor = terminated.actor();
        SourceRefStageImpl.State current = state();
        if (!(current instanceof SourceRefStageImpl.WeKnowPartner)) {
            throw new IllegalStateException(
                    "[" + stageActorName() + "] Unexpected deathwatch message for " + actor + " before we knew partner ref, state " + current);
        }
        ActorRef partner = ((SourceRefStageImpl.WeKnowPartner) current).partner();
        boolean isOurPartner = partner == null ? actor == null : partner.equals(actor);
        if (!isOurPartner) {
            throw new RemoteStreamRefActorTerminatedException(
                    "[" + stageActorName() + "] Received UNEXPECTED Terminated(" + actor + ") message! "
                            + "This actor was NOT our trusted remote partner, which was: " + partner + ". Tearing down.");
        }
        // The stage is not failed here; only the deadline timer is armed and the state changed.
        scheduleOnce(SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey(), this.finalTerminationSignalDeadline);
        log().debug("[{}] Partner terminated, starting delayed shutdown, deadline: [{}]", stageActorName(), this.finalTerminationSignalDeadline);
        state_$eq(new SourceRefStageImpl.UpstreamTerminated(partner));
    }

    /**
     * Raises the locally tracked demand by {@code i} and announces it to {@code actorRef}.
     *
     * <p>Both the cumulative demand and the remaining-requested counter are increased by
     * {@code i}; a {@link StreamRefsProtocol.CumulativeDemand} carrying the updated cumulative
     * value is then sent with {@code selfSender()} as the sender, and finally
     * {@code scheduleDemandRedelivery()} is invoked.
     *
     * @param actorRef recipient of the cumulative demand message
     * @param i        amount by which both demand counters are increased
     */
    private final void sendDemand$1(ActorRef actorRef, int i) {
        final long raisedCumulativeDemand = localCumulativeDemand() + i;
        localCumulativeDemand_$eq(raisedCumulativeDemand);

        final int raisedRemainingRequested = localRemainingRequested() + i;
        localRemainingRequested_$eq(raisedRemainingRequested);

        // Re-read the cumulative value through its accessor so the message reflects the stored state.
        final StreamRefsProtocol.CumulativeDemand demandMessage =
                new StreamRefsProtocol.CumulativeDemand(localCumulativeDemand());
        actorRef.$bang(demandMessage, selfSender());

        scheduleDemandRedelivery();
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    /**
     * Creates the stage logic for the enclosing {@link SourceRefStageImpl}.
     *
     * <p>In order, the constructor: resolves the stream-ref configuration values (subscription
     * timeout, buffer capacity, demand redelivery interval, final termination signal deadline),
     * obtains a stage name from {@code StreamRefsMaster}, creates the eager stage actor, picks
     * the initial state from the stage's initial partner ref, zeroes the sequence/demand
     * counters, allocates the receive buffer and request strategy, and registers this object
     * as the handler for the stage's outlet.
     *
     * @param sourceRefStageImpl enclosing stage; supplies the shape, the initial partner ref and the outlet
     * @param materializer       source of the actor system and stream-ref settings; also passed to
     *                           {@code getEagerStageActor}
     * @param attributes         queried for the four {@code StreamRefAttributes} values read below
     */
    public SourceRefStageImpl$$anon$1(SourceRefStageImpl sourceRefStageImpl, Materializer materializer, Attributes attributes) {
        super(sourceRefStageImpl.shape2());
        SourceRefStageImpl.State state;
        // Outer-instance null guard ("throw null" raises a NullPointerException). As decompiled,
        // the super(...) call above has already dereferenced sourceRefStageImpl, so a null
        // argument fails there before this check is reached.
        if (sourceRefStageImpl == null) {
            throw null;
        }
        this.$outer = sourceRefStageImpl;
        StageLogging.$init$(this);
        OutHandler.$init$(this);
        this.streamRefsMaster = (StreamRefsMaster) StreamRefsMaster$.MODULE$.apply(materializer.system());
        this.settings = materializer.settings().streamRefSettings();
        // Each attributes.get(...) below is handed an attribute instance built from the
        // corresponding settings value; presumably that instance is the fallback used when the
        // attribute is absent — confirm against Attributes.get.
        this.subscriptionTimeout = (StreamRefAttributes.SubscriptionTimeout) attributes.get(new StreamRefAttributes.SubscriptionTimeout(this.settings.subscriptionTimeout()), ClassTag$.MODULE$.apply(StreamRefAttributes.SubscriptionTimeout.class));
        this.bufferCapacity = ((StreamRefAttributes.BufferCapacity) attributes.get(new StreamRefAttributes.BufferCapacity(this.settings.bufferCapacity()), ClassTag$.MODULE$.apply(StreamRefAttributes.BufferCapacity.class))).capacity();
        this.demandRedeliveryInterval = ((StreamRefAttributes.DemandRedeliveryInterval) attributes.get(new StreamRefAttributes.DemandRedeliveryInterval(this.settings.demandRedeliveryInterval()), ClassTag$.MODULE$.apply(StreamRefAttributes.DemandRedeliveryInterval.class))).timeout();
        this.finalTerminationSignalDeadline = ((StreamRefAttributes.FinalTerminationSignalDeadline) attributes.get(new StreamRefAttributes.FinalTerminationSignalDeadline(this.settings.finalTerminationSignalDeadline()), ClassTag$.MODULE$.apply(StreamRefAttributes.FinalTerminationSignalDeadline.class))).timeout();
        this.stageActorName = this.streamRefsMaster.nextSourceRefStageName();
        // The stage actor is created here with receiveRemoteMessage() as its argument; its ref
        // is cached in this.ref.
        this.self = getEagerStageActor(materializer, receiveRemoteMessage());
        this.ref = this.self.ref();
        // Initial state: with no initial partner ref start in AwaitingPartner; otherwise watch
        // the partner via the stage actor and start in AwaitingSubscription for that partner.
        ActorRef actorRef = (ActorRef) OptionVal$Some$.MODULE$.unapply(sourceRefStageImpl.initialPartnerRef());
        if (OptionVal$.MODULE$.isEmpty$extension(actorRef)) {
            state = SourceRefStageImpl$AwaitingPartner$.MODULE$;
        } else {
            ActorRef actorRef2 = (ActorRef) OptionVal$.MODULE$.get$extension(actorRef);
            this.self.watch(actorRef2);
            state = new SourceRefStageImpl.AwaitingSubscription(actorRef2);
        }
        this.state = state;
        // Sequence number and demand counters all start at zero.
        this.expectingSeqNr = 0L;
        this.localCumulativeDemand = 0L;
        this.localRemainingRequested = 0;
        // The receive buffer is sized by bufferCapacity; the request strategy is built from the
        // buffer's reported capacity.
        this.receiveBuffer = FixedSizeBuffer$.MODULE$.apply(this.bufferCapacity);
        this.requestStrategy = SourceRefStageImpl$WatermarkRequestStrategy$.MODULE$.apply(receiveBuffer().capacity());
        // This logic object serves as the handler for the stage's outlet.
        setHandler(sourceRefStageImpl.out(), this);
    }
}
