package com.javanut.gl.impl.blocking;

import com.javanut.gl.api.MsgRuntime;
import com.javanut.gl.api.PubSubFixedTopicService;
import com.javanut.gl.api.TickListener;
import com.javanut.gl.impl.BuilderImpl;
import com.javanut.pronghorn.pipe.ChannelReader;
import com.javanut.pronghorn.pipe.DataOutputBlobWriter;
import com.javanut.pronghorn.pipe.Pipe;
import com.javanut.pronghorn.pipe.PipeConfigManager;
import com.javanut.pronghorn.pipe.RawDataSchema;
import com.javanut.pronghorn.stage.PronghornStage;

/* loaded from: input_file:com/javanut/gl/impl/blocking/JoinBlockingBehavior.class */
/**
 * Tick-driven behavior that reads messages from a set of {@link RawDataSchema} input pipes
 * and republishes each payload on one of a fixed set of topics, where the topic is chosen
 * per message by a {@link TargetSelector}.
 *
 * <p>One {@link PubSubFixedTopicService} is created per topic name at construction time; the
 * index returned by the selector is used directly as an index into that array.
 */
public class JoinBlockingBehavior implements TickListener {
    private final Pipe<RawDataSchema>[] inputPipes;
    private final PubSubFixedTopicService[] targetService;
    private final TargetSelector selector;

    /**
     * Creates one fixed-topic publishing service per entry of {@code strArr}.
     *
     * @param msgRuntime     runtime used to obtain the builder and the parallel instance id
     * @param pipeArr        input pipes to drain on every tick; element 0 is read for its
     *                       configuration, so the array must not be empty
     * @param strArr         topic names, one publishing service is created for each
     * @param targetSelector picks the index (into {@code strArr}) a message is published to
     */
    public JoinBlockingBehavior(MsgRuntime<?, ?, ?> msgRuntime, Pipe<RawDataSchema>[] pipeArr, String[] strArr, TargetSelector targetSelector) {
        this.inputPipes = pipeArr;
        this.selector = targetSelector;
        PipeConfigManager pipeConfigManager = new PipeConfigManager(4, 32, 64);
        BuilderImpl builder = MsgRuntime.builder(msgRuntime);
        // Size the outgoing services from the inputs: largest var-length field across all
        // input pipes, and the fragment count of the first input pipe.
        int maxVarLength = PronghornStage.maxVarLength(pipeArr);
        int minimumFragmentsOnPipe = pipeArr[0].config().minimumFragmentsOnPipe();
        this.targetService = new PubSubFixedTopicService[strArr.length];
        // Services are created from the last topic to the first, each on its own command channel.
        for (int i = strArr.length - 1; i >= 0; i--) {
            this.targetService[i] = builder.newCommandChannel(msgRuntime.constructingParallelInstance(), pipeConfigManager).newPubSubService(strArr[i], minimumFragmentsOnPipe, maxVarLength);
        }
    }

    /**
     * Visits every input pipe (last to first) and forwards its pending messages.
     *
     * <p>Draining of a pipe stops as soon as a message cannot be forwarded; the message is
     * left on the pipe and is retried on the next tick. This keeps the callback from spinning
     * on a single message and lets the remaining pipes be serviced in the same tick.
     */
    @Override
    public void tickEvent() {
        for (int i = this.inputPipes.length - 1; i >= 0; i--) {
            Pipe<RawDataSchema> pipe = this.inputPipes[i];
            boolean progressed = true;
            while (progressed && Pipe.hasContentToRead(pipe)) {
                progressed = relayNext(pipe);
            }
        }
    }

    /**
     * Handles the message at the head of {@code pipe} without blocking.
     *
     * @param pipe input pipe known to have content to read
     * @return {@code true} if the head fragment was consumed, {@code false} if it was left
     *         in place (publish refused, or the message id was not recognised)
     */
    private boolean relayNext(Pipe<RawDataSchema> pipe) {
        ChannelReader reader = Pipe.peekInputStream(pipe, 1);
        // NOTE(review): id 0 looks like the RawDataSchema payload message and -1 the
        // end-of-stream marker - confirm against RawDataSchema.
        if (Pipe.peekMsg(pipe, 0)) {
            // Let the selector inspect the payload, then rewind so the full payload is copied.
            int start = reader.absolutePosition();
            int targetIdx = this.selector.pickTargetIdx(reader);
            reader.absolutePosition(start);
            boolean published = this.targetService[targetIdx].publishTopic(channelWriter -> {
                reader.readInto(channelWriter, reader.available());
                reader.readFromEndInto((DataOutputBlobWriter) channelWriter);
            });
            if (published) {
                // Only release the fragment once it has been handed off.
                Pipe.skipNextFragment(pipe);
            }
            // presumably false means the outgoing channel had no room - verify against publishTopic
            return published;
        }
        if (Pipe.peekMsg(pipe, -1)) {
            // Nothing to forward for this message; just consume it.
            Pipe.skipNextFragment(pipe);
            return true;
        }
        // Unrecognised message id: nothing here can consume it, so report no progress
        // instead of looping on it.
        return false;
    }
}
