package com.javanut.gl.impl.stage;

import com.javanut.gl.api.MsgRuntime;
import com.javanut.gl.impl.BuilderImpl;
import com.javanut.gl.impl.schema.TrafficAckSchema;
import com.javanut.gl.impl.schema.TrafficOrderSchema;
import com.javanut.gl.impl.schema.TrafficReleaseSchema;
import com.javanut.pronghorn.pipe.FragmentWriter;
import com.javanut.pronghorn.pipe.Pipe;
import com.javanut.pronghorn.stage.PronghornStage;
import com.javanut.pronghorn.stage.scheduling.GraphManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/javanut/gl/impl/stage/TrafficCopStage.class */
/**
 * Stage that reads traffic orders from a single primary input pipe and hands
 * out "go" releases on one of several output pipes.
 *
 * <p>Each pass of {@link #run()} does up to three things, in this order:
 * <ol>
 *   <li>if an ack is still expected, wait for it on the matching ack pipe
 *       (and begin shutdown if it does not arrive before the deadline);</li>
 *   <li>if no go is pending, read the next order from the primary pipe
 *       (a go command, one of two channel-block commands, or end of stream);</li>
 *   <li>if a go is pending, fold in an immediately following go for the same
 *       target and write the accumulated count to the target release pipe.</li>
 * </ol>
 *
 * <p>Once shutdown has been flagged, {@link #run()} only waits until every
 * non-null release pipe has room, publishes EOF on all of them and asks the
 * runtime to shut down.
 */
public class TrafficCopStage extends PronghornStage {

    private static final Logger logger = LoggerFactory.getLogger(TrafficCopStage.class);

    /** Sentinel stored in {@link #ackExpectedOn} / {@link #goPendingOnPipe} when nothing is outstanding. */
    private static final int NONE = -1;

    /** Order-pipe message index of a go command (payload: target pipe index, then count). */
    private static final int MSG_GO = 0;

    /** Order-pipe message index whose long payload is passed to {@code BuilderImpl.blockChannelDuration}. */
    private static final int MSG_BLOCK_CHANNEL_DURATION = 4;

    /** Order-pipe message index whose long payload is passed to {@code BuilderImpl.blockChannelUntil}. */
    private static final int MSG_BLOCK_CHANNEL_UNTIL = 7;

    /** Message index treated as end of stream on the order pipe. */
    private static final int MSG_END_OF_STREAM = -1;

    /** Size confirmed on the order pipe when the end-of-stream message is consumed. */
    private static final long END_OF_STREAM_SIZE = 2L;

    /** Loop counter start value; the loop runs while the counter is {@code >= 0}, i.e. at most 101 passes per run. */
    private static final int LOOP_BUDGET = 100;

    private final Pipe<TrafficOrderSchema> primaryIn;
    private final Pipe<TrafficAckSchema>[] ackIn;
    private final Pipe<TrafficReleaseSchema>[] goOut;
    private final GraphManager graphManager;
    private final long msAckTimeout;
    private final MsgRuntime<?, ?, ?> runtime;
    private final BuilderImpl builder;

    /** Index into {@link #ackIn} on which an ack is awaited, or {@link #NONE}. */
    private int ackExpectedOn = NONE;

    /** Wall-clock deadline (ms) for the awaited ack; {@code Long.MAX_VALUE} when the timeout is disabled. */
    private long ackExpectedTime;

    /** Index into {@link #goOut} that still needs its release written, or {@link #NONE}. */
    private int goPendingOnPipe = NONE;

    /** Count to be written with the pending release. */
    private int goPendingOnPipeCount = 0;

    private boolean shutdownInProgress;

    /**
     * Factory equivalent to calling the constructor with the same arguments.
     *
     * @param graphManager graph this stage is registered with
     * @param j            ack timeout in milliseconds; values {@code <= 0} disable the timeout
     * @param pipe         primary input pipe carrying traffic orders
     * @param pipeArr      ack input pipes, indexed by the target index carried in a go command
     * @param pipeArr2     release output pipes, indexed the same way as {@code pipeArr}
     * @param msgRuntime   runtime asked to shut down once this stage has finished shutting down
     * @param builderImpl  builder consulted for, and told about, channel blocking
     * @return a new stage instance
     */
    public static TrafficCopStage newInstance(GraphManager graphManager, long j, Pipe<TrafficOrderSchema> pipe, Pipe<TrafficAckSchema>[] pipeArr, Pipe<TrafficReleaseSchema>[] pipeArr2, MsgRuntime<?, ?, ?> msgRuntime, BuilderImpl builderImpl) {
        return new TrafficCopStage(graphManager, j, pipe, pipeArr, pipeArr2, msgRuntime, builderImpl);
    }

    /**
     * Creates the stage with the ack pipes plus the primary pipe as inputs and
     * the release pipes as outputs.
     *
     * @param graphManager graph this stage is registered with
     * @param j            ack timeout in milliseconds; values {@code <= 0} disable the timeout
     * @param pipe         primary input pipe carrying traffic orders
     * @param pipeArr      ack input pipes; asserted to have the same length as {@code pipeArr2}
     * @param pipeArr2     release output pipes
     * @param msgRuntime   runtime asked to shut down once this stage has finished shutting down
     * @param builderImpl  builder consulted for, and told about, channel blocking
     */
    public TrafficCopStage(GraphManager graphManager, long j, Pipe<TrafficOrderSchema> pipe, Pipe<TrafficAckSchema>[] pipeArr, Pipe<TrafficReleaseSchema>[] pipeArr2, MsgRuntime<?, ?, ?> msgRuntime, BuilderImpl builderImpl) {
        super(graphManager, join(pipeArr, new Pipe[]{pipe}), pipeArr2);
        assert pipeArr.length == pipeArr2.length;
        this.msAckTimeout = j;
        this.primaryIn = pipe;
        this.ackIn = pipeArr;
        this.goOut = pipeArr2;
        this.graphManager = graphManager;
        this.builder = builderImpl;
        this.runtime = msgRuntime;
        this.supportsBatchedPublish = false;
        this.supportsBatchedRelease = false;
        GraphManager.addNota(graphManager, "DOT_BACKGROUND", "cadetblue2", this);
        GraphManager.addNota(graphManager, "TRIGGER", "TRIGGER", this);
    }

    /**
     * Returns the superclass description, followed by the awaited ack index and
     * the producer of that ack pipe when an ack is currently expected on a
     * valid, non-null pipe.
     */
    @Override
    public String toString() {
        final String base = super.toString();
        final int idx = this.ackExpectedOn;
        final boolean awaitingAck = idx >= 0 && idx < this.ackIn.length && null != this.ackIn[idx];
        if (!awaitingAck) {
            return base;
        }
        return base + " AckExpectedOn:" + idx + " " + GraphManager.getRingProducer(this.graphManager, this.ackIn[idx].id);
    }

    /**
     * Performs one scheduling slice: either finishes a shutdown already in
     * progress, or runs a bounded number of ack / read / release passes,
     * returning early whenever a pass cannot make progress.
     */
    @Override
    public void run() {
        if (this.shutdownInProgress) {
            completeShutdown();
            return;
        }
        for (int passesLeft = LOOP_BUDGET; passesLeft >= 0; passesLeft--) {
            if (!consumeExpectedAck()) {
                return;
            }
            if (NONE == this.goPendingOnPipe && !readNextOrder()) {
                return;
            }
            if (NONE != this.goPendingOnPipe && !releasePendingGo()) {
                return;
            }
        }
    }

    /**
     * Consumes the awaited ack if one is expected and available.
     *
     * @return {@code true} when the pass may continue (no ack expected, the ack
     *         pipe is null, or the ack was consumed); {@code false} when the ack
     *         has not arrived yet and {@link #run()} must return
     */
    private boolean consumeExpectedAck() {
        if (this.ackExpectedOn < 0 || null == this.ackIn[this.ackExpectedOn]) {
            return true;
        }
        final Pipe<TrafficAckSchema> ackPipe = this.ackIn[this.ackExpectedOn];
        if (Pipe.hasContentToRead(ackPipe)) {
            Pipe.skipNextFragment(ackPipe);
            this.ackExpectedOn = NONE;
            return true;
        }
        if (System.currentTimeMillis() > this.ackExpectedTime) {
            // The ack is overdue: give up waiting and start shutting down.
            this.shutdownInProgress = true;
            logger.info(" *** Expected to get ack back from {} within {}ms \nExpected ack on pipe:{}",
                    GraphManager.getRingProducer(this.graphManager, ackPipe.id), this.msAckTimeout, ackPipe);
        }
        // While blocked on the ack, still notice an end-of-stream waiting on the order pipe.
        detectShutdownInProgress();
        return false;
    }

    /**
     * Reads and applies the next message from the primary order pipe.
     *
     * @return {@code true} when a go or channel-block message was consumed;
     *         {@code false} when nothing could be read (no content or channel
     *         blocked) or when end of stream was consumed and shutdown flagged
     */
    private boolean readNextOrder() {
        if (!Pipe.hasContentToRead(this.primaryIn) || this.builder.isChannelBlocked(this.primaryIn.id)) {
            return false;
        }
        final int msgIdx = Pipe.takeMsgIdx(this.primaryIn);
        switch (msgIdx) {
            case MSG_GO: {
                final int target = Pipe.takeInt(this.primaryIn);
                // The same index selects both the release pipe to write and the ack pipe to wait on.
                this.ackExpectedOn = target;
                this.goPendingOnPipe = target;
                assert this.goPendingOnPipe < this.goOut.length : "Go command is out of bounds " + this.goPendingOnPipe + " vs " + this.goOut.length;
                assert this.goPendingOnPipe >= 0 : "Go command pipe must be positive";
                this.goPendingOnPipeCount = Pipe.takeInt(this.primaryIn);
                Pipe.confirmLowLevelRead(this.primaryIn, Pipe.sizeOf(this.primaryIn, MSG_GO));
                Pipe.releaseReadLock(this.primaryIn);
                // A non-positive timeout means "wait forever".
                this.ackExpectedTime = this.msAckTimeout > 0 ? this.msAckTimeout + System.currentTimeMillis() : Long.MAX_VALUE;
                return true;
            }
            case MSG_BLOCK_CHANNEL_DURATION: {
                this.builder.blockChannelDuration(Pipe.takeLong(this.primaryIn), this.primaryIn.id);
                Pipe.confirmLowLevelRead(this.primaryIn, Pipe.sizeOf(this.primaryIn, MSG_BLOCK_CHANNEL_DURATION));
                Pipe.releaseReadLock(this.primaryIn);
                return true;
            }
            case MSG_BLOCK_CHANNEL_UNTIL: {
                this.builder.blockChannelUntil(this.primaryIn.id, Pipe.takeLong(this.primaryIn));
                Pipe.confirmLowLevelRead(this.primaryIn, Pipe.sizeOf(this.primaryIn, MSG_BLOCK_CHANNEL_UNTIL));
                Pipe.releaseReadLock(this.primaryIn);
                return true;
            }
            default: {
                assert MSG_END_OF_STREAM == msgIdx : "Expected end of stream however got unsupported message: " + msgIdx;
                consumeEndOfStream();
                return false;
            }
        }
    }

    /**
     * Writes the pending release, first merging one directly following go
     * command for the same target into the pending count.
     *
     * @return {@code true} when the release was written and the pending state
     *         cleared; {@code false} when the target pipe is null or has no
     *         room (the go stays pending), or when shutdown was flagged
     */
    private boolean releasePendingGo() {
        if (Pipe.peekMsg(this.primaryIn, MSG_GO)
                && Pipe.peekInt(this.primaryIn, 1) == this.goPendingOnPipe
                && Pipe.hasContentToRead(this.primaryIn)) {
            final int msgIdx = Pipe.takeMsgIdx(this.primaryIn);
            if (MSG_GO != msgIdx) {
                assert MSG_END_OF_STREAM == msgIdx : "Expected end of stream however got unsupported message: " + msgIdx;
                consumeEndOfStream();
                return false;
            }
            final int target = Pipe.takeInt(this.primaryIn);
            final int extraCount = Pipe.takeInt(this.primaryIn);
            assert target == this.goPendingOnPipe;
            this.goPendingOnPipeCount += extraCount;
            Pipe.confirmLowLevelRead(this.primaryIn, Pipe.sizeOf(this.primaryIn, MSG_GO));
            Pipe.releaseReadLock(this.primaryIn);
        }
        final Pipe<TrafficReleaseSchema> releasePipe = this.goOut[this.goPendingOnPipe];
        if (null == releasePipe || !Pipe.hasRoomForWrite(releasePipe)) {
            return false;
        }
        // Message index 0 on the release pipe carries the count (presumably the schema's release message; confirm against TrafficReleaseSchema).
        FragmentWriter.writeI(releasePipe, 0, this.goPendingOnPipeCount);
        this.goPendingOnPipe = NONE;
        return true;
    }

    /** Flags shutdown and releases the end-of-stream message whose index has already been taken from the order pipe. */
    private void consumeEndOfStream() {
        this.shutdownInProgress = true;
        Pipe.confirmLowLevelRead(this.primaryIn, END_OF_STREAM_SIZE);
        Pipe.releaseReadLock(this.primaryIn);
    }

    /**
     * Finishes shutdown once every non-null release pipe has room: publishes
     * EOF on all release pipes, requests shutdown of this stage and of the
     * runtime. Does nothing if any release pipe is still full.
     */
    private void completeShutdown() {
        for (int i = this.goOut.length - 1; i >= 0; i--) {
            if (null != this.goOut[i] && !Pipe.hasRoomForWrite(this.goOut[i])) {
                return;
            }
        }
        Pipe.publishEOF(this.goOut);
        requestShutdown();
        this.runtime.shutdownRuntime();
    }

    /** If the next message on the order pipe is end of stream, consumes it and flags shutdown. */
    private void detectShutdownInProgress() {
        if (Pipe.peekMsg(this.primaryIn, MSG_END_OF_STREAM)) {
            Pipe.takeMsgIdx(this.primaryIn);
            Pipe.confirmLowLevelRead(this.primaryIn, END_OF_STREAM_SIZE);
            Pipe.releaseReadLock(this.primaryIn);
            this.shutdownInProgress = true;
        }
    }
}
