package com.javanut.gl.impl.stage;

import com.javanut.gl.api.MsgRuntime;
import com.javanut.gl.impl.BuilderImpl;
import com.javanut.gl.impl.schema.IngressMessages;
import com.javanut.gl.impl.schema.MessagePubSub;
import com.javanut.gl.impl.schema.MessageSubscription;
import com.javanut.gl.impl.schema.TrafficAckSchema;
import com.javanut.gl.impl.schema.TrafficReleaseSchema;
import com.javanut.gl.impl.stage.MessagePubSubImpl;
import com.javanut.pronghorn.pipe.Pipe;
import com.javanut.pronghorn.pipe.PipeReader;
import com.javanut.pronghorn.pipe.util.hash.IntHashTable;
import com.javanut.pronghorn.stage.scheduling.GraphManager;
import com.javanut.pronghorn.util.Appendables;
import java.util.Arrays;

/* loaded from: input_file:com/javanut/gl/impl/stage/MessagePubSubTrafficStage.class */
/**
 * Traffic-ordered stage that routes publish/subscribe traffic.
 *
 * <p>All pub/sub bookkeeping lives in the {@link MessagePubSubImpl} instance held in
 * {@link #data}; this class drives it from the stage life-cycle callbacks
 * ({@link #startup()}, {@link #run()}, {@link #processMessagesForPipe(int)}).
 */
public class MessagePubSubTrafficStage extends AbstractTrafficOrderedStage {

    /** Shared pub/sub state and helpers; created and wired up in the constructor. */
    MessagePubSubImpl data;

    /**
     * Builds the stage and hands every pipe to a freshly created {@link MessagePubSubImpl}.
     *
     * @param graphManager graph this stage is registered with
     * @param msgRuntime   runtime passed through to the super class
     * @param intHashTable hash table forwarded to {@code MessagePubSubImpl.construct}
     * @param builderImpl  builder passed to the super class and to {@code construct}
     * @param pipeArr      ingress message pipes
     * @param pipeArr2     incoming subscribe/publish pipes
     * @param pipeArr3     traffic release ("go") pipes
     * @param pipeArr4     traffic acknowledgement pipes
     * @param pipeArr5     outgoing subscription message pipes
     */
    public MessagePubSubTrafficStage(GraphManager graphManager, MsgRuntime<?, ?, ?> msgRuntime, IntHashTable intHashTable, BuilderImpl builderImpl, Pipe<IngressMessages>[] pipeArr, Pipe<MessagePubSub>[] pipeArr2, Pipe<TrafficReleaseSchema>[] pipeArr3, Pipe<TrafficAckSchema>[] pipeArr4, Pipe<MessageSubscription>[] pipeArr5) {
        super(graphManager, msgRuntime, builderImpl, join(pipeArr, pipeArr2), pipeArr3, pipeArr4, pipeArr5);
        this.data = new MessagePubSubImpl(false, -1, new MessagePubSubTrace());
        this.data.construct(this, graphManager, intHashTable, builderImpl, pipeArr, pipeArr2, pipeArr3, pipeArr4, pipeArr5);
    }

    /** Runs the super class startup and then starts the pub/sub state with {@code hardware}. */
    @Override
    public void startup() {
        super.startup();
        this.data.startupPubSub(this.hardware);
    }

    /**
     * One scheduler pass. Repeats the following until a pass leaves
     * {@code data.foundWork} false:
     * <ol>
     *   <li>If publishes are pending, call {@code processPending()}; return immediately when
     *       some are still pending afterwards. Otherwise, when {@code pendingIngress} is set,
     *       release the read lock of the ingress pipe at {@code pendingReleaseCountIdx}.</li>
     *   <li>Drain every ingress pipe, highest index first. If a message leaves publishes
     *       pending, record the pending delivery type and pipe index and return without
     *       releasing that fragment.</li>
     *   <li>Delegate to {@code super.run()}.</li>
     * </ol>
     */
    @Override
    public void run() {
        do {
            this.data.foundWork = false;

            if (this.data.pendingPublishCount > 0) {
                this.data.processPending();
                if (this.data.pendingPublishCount > 0) {
                    // Still unable to deliver everything; try again on the next pass.
                    return;
                }
                if (this.data.pendingIngress) {
                    PipeReader.releaseReadLock(this.data.ingressMessagePipes[this.data.pendingReleaseCountIdx]);
                }
            }

            int pipeIdx = this.data.ingressMessagePipes.length;
            while (--pipeIdx >= 0) {
                Pipe<IngressMessages> ingressPipe = this.data.ingressMessagePipes[pipeIdx];
                while (PipeReader.tryReadFragment(ingressPipe)) {
                    int listIdx = this.data.ingressCollectSubLists(ingressPipe);
                    // NOTE(review): when listIdx is negative the fragment is neither processed
                    // nor released here; this mirrors the compiled code - confirm it is intended.
                    if (listIdx >= 0) {
                        this.data.readIngress(ingressPipe, listIdx);
                        if (this.data.pendingPublishCount > 0) {
                            // Keep the read lock; it is released once the pending work completes.
                            this.data.pendingDeliveryType = MessagePubSubImpl.PubType.Message;
                            this.data.pendingReleaseCountIdx = pipeIdx;
                            return;
                        }
                        PipeReader.releaseReadLock(ingressPipe);
                    }
                }
            }

            super.run();
        } while (this.data.foundWork);
    }

    /**
     * Consumes messages from the incoming subscribe/publish pipe at index {@code i} for as
     * long as every gating check in the loop condition passes.
     *
     * <p>Returns early, leaving the current fragment unreleased, when handling a message
     * leaves {@code data.pendingPublishCount} above zero; in that case the pending delivery
     * type and pipe index are stored on {@link #data} first.
     *
     * @param i index into {@code data.incomingSubsAndPubsPipe}
     */
    @Override
    protected void processMessagesForPipe(int i) {
        Pipe<MessagePubSub> pipe = this.data.incomingSubsAndPubsPipe[i];
        long[] consumedMarks = this.data.consumedMarks[i];

        // The order of these checks matters: isPreviousConsumed has side effects and
        // tryReadFragment must only run once every other gate has passed.
        while (isPreviousConsumed(this.data, i)
                && PipeReader.hasContentToRead(pipe)
                && hasReleaseCountRemaining(i)
                && isChannelUnBlocked(i)
                && this.data.isNotBlockedByStateChange(pipe)
                && PipeReader.tryReadFragment(pipe)) {

            this.data.foundWork = true;

            switch (PipeReader.getMsgIdx(pipe)) {
                case 0: // handled as a subscribe request
                    this.data.addSubscription(pipe);
                    releaseAndCount(i);
                    break;

                case 4: // handled as an unsubscribe request
                    this.data.unsubscribe(pipe);
                    releaseAndCount(i);
                    break;

                case 8: { // handled as a publish (reads the MSG_PUBLISH_103 topic field)
                    // Int field at location 1; forwarded unchanged to readNormalMessages.
                    int fieldValue = PipeReader.readInt(pipe, 1);
                    byte[] topicBacking = PipeReader.readBytesBackingArray(pipe, MessagePubSub.MSG_PUBLISH_103_FIELD_TOPIC_1);
                    int topicPos = PipeReader.readBytesPosition(pipe, MessagePubSub.MSG_PUBLISH_103_FIELD_TOPIC_1);
                    int topicLen = PipeReader.readBytesLength(pipe, MessagePubSub.MSG_PUBLISH_103_FIELD_TOPIC_1);
                    int topicMask = PipeReader.readBytesMask(pipe, MessagePubSub.MSG_PUBLISH_103_FIELD_TOPIC_1);

                    int listIdx = this.data.subscriptionListIdx(topicBacking, topicPos, topicLen, topicMask);
                    this.data.collectAllSubscriptionLists(topicBacking, topicPos, topicLen, topicMask);
                    IntHashTable.clear(this.data.deDupeTable);

                    if (listIdx < 0) {
                        MessagePubSubImpl.logger.info("no subscribers on topic: {} ", Appendables.appendUTF8(new StringBuilder(), topicBacking, topicPos, topicLen, topicMask));
                        releaseAndCount(i);
                    } else {
                        this.data.readNormalMessages(i, pipe, consumedMarks, fieldValue, listIdx);
                        if (this.data.pendingPublishCount > 0) {
                            MessagePubSubImpl.logger.warn("Message PubSub pipes have become full, you may want to consider fewer messages or longer pipes for MessagePubSub outgoing in graph {} [2]", this.data.graphName);
                            this.data.pendingDeliveryType = MessagePubSubImpl.PubType.Message;
                            this.data.pendingReleaseCountIdx = i;
                            return;
                        }
                    }
                    break;
                }

                case MessagePubSub.MSG_CHANGESTATE_70:
                    assert this.data.stateChangeInFlight == -1
                            : "Attempting to process state change before all listeners have consumed the in flight change";

                    this.data.newState = PipeReader.readInt(pipe, 1);
                    if (this.data.currentState != this.data.newState) {
                        // Real change: mark it in flight; the fragment is released later by
                        // isPreviousConsumed once pendingAck[i] is observed.
                        this.data.stateChangeInFlight = i;
                        this.data.pendingAck[i] = true;
                        this.data.requiredConsumes[i] = consumedMarks.length;
                        for (int out = 0; out < this.data.outgoingMessagePipes.length; out++) {
                            this.data.copyToSubscriberState(this.data.currentState, this.data.newState, out, consumedMarks);
                        }
                    } else {
                        // Same state requested again: nothing to broadcast.
                        releaseAndCount(i);
                    }

                    if (this.data.pendingPublishCount > 0) {
                        MessagePubSubImpl.logger.warn("State change pipes have become full, you may want to consider fewer state changes or longer pipes for MessagePubSub outgoing");
                        this.data.pendingDeliveryType = MessagePubSubImpl.PubType.State;
                        this.data.pendingReleaseCountIdx = i;
                        return;
                    }
                    this.data.currentState = this.data.newState;
                    break;

                default:
                    // Any other message index is left untouched, as in the compiled code.
                    break;
            }
        }
    }

    /** Releases the read lock on incoming pipe {@code i} and decrements its release count. */
    private void releaseAndCount(int i) {
        PipeReader.releaseReadLock(this.data.incomingSubsAndPubsPipe[i]);
        decReleaseCount(i);
    }

    /**
     * Gate used before reading the next message from incoming pipe {@code i}.
     *
     * <p>Returns {@code false} while {@code marks.length - countUnconsumed(marks, 0)} is below
     * {@code requiredConsumes[i]}. Otherwise it zeroes the marks (when the count was positive),
     * and, if an acknowledgement is pending for the pipe, releases its read lock, decrements
     * the release count, clears {@code pendingAck[i]} and resets {@code stateChangeInFlight}
     * when it refers to this pipe.
     *
     * @param messagePubSubImpl state holder to inspect and update
     * @param i                 index of the incoming pipe
     * @return {@code true} when processing on pipe {@code i} may continue
     */
    boolean isPreviousConsumed(MessagePubSubImpl messagePubSubImpl, int i) {
        long[] marks = messagePubSubImpl.consumedMarks[i];
        int counted = messagePubSubImpl.countUnconsumed(marks, 0);

        if (marks.length - counted < messagePubSubImpl.requiredConsumes[i]) {
            return false;
        }
        if (counted > 0) {
            Arrays.fill(marks, 0L);
        }
        if (messagePubSubImpl.pendingAck[i]) {
            PipeReader.releaseReadLock(messagePubSubImpl.incomingSubsAndPubsPipe[i]);
            decReleaseCount(i);
            messagePubSubImpl.pendingAck[i] = false;
            if (messagePubSubImpl.stateChangeInFlight == i) {
                messagePubSubImpl.stateChangeInFlight = -1;
            }
        }
        return true;
    }
}
