package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.InputSelection;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StreamInputProcessor} for a {@link TwoInputStreamOperator} that also implements
 * {@link InputSelectable}.
 *
 * <p>Every call to {@link #processInput()} asks the operator's current {@link InputSelection}
 * which of the two inputs should be read, polls at most one {@link StreamElement} from that input
 * and dispatches it: records and latency markers go to the operator, watermarks and stream
 * statuses go through the per-input {@link StatusWatermarkValve}. All calls into the operator are
 * made while holding the lock object handed to the constructor.
 *
 * <p>Availability of the inputs is tracked in a two-bit mask (bit 0 for the first input, bit 1
 * for the second).
 *
 * @param <IN1> element type of the first input
 * @param <IN2> element type of the second input
 */
@Internal
public final class StreamTwoInputSelectableProcessor<IN1, IN2> implements StreamInputProcessor {

    private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputSelectableProcessor.class);

    /** Never-completed placeholder used in place of the availability future of a finished input. */
    private static final CompletableFuture<?> UNAVAILABLE = new CompletableFuture<>();

    private final TwoInputStreamOperator<IN1, IN2, ?> streamOperator;

    /** The same operator instance, seen through its {@link InputSelectable} interface. */
    private final InputSelectable inputSelector;

    /** Monitor held around every call into the operator and the operator chain. */
    private final Object lock;

    private final StreamTaskInput input1;
    private final StreamTaskInput input2;

    private final OperatorChain<?, ?> operatorChain;

    private final StatusWatermarkValve statusWatermarkValve1;
    private final StatusWatermarkValve statusWatermarkValve2;

    /** Last stream status emitted by the valve of the first input. */
    private StreamStatus firstStatus;

    /** Last stream status emitted by the valve of the second input. */
    private StreamStatus secondStatus;

    /** Bit mask of the inputs currently considered available (bit {@code i} = input index {@code i}). */
    private int availableInputsMask;

    /** Index (0 or 1) of the input that was selected most recently. */
    private int lastReadInputIndex;

    /** Selection most recently returned by {@link InputSelectable#nextSelection()}. */
    private InputSelection inputSelection;

    private Counter numRecordsIn;

    /** Whether {@link #prepareForProcessing()} has run; it is invoked lazily by {@link #processInput()}. */
    private boolean isPrepared;

    /**
     * Valve output handler that forwards watermarks of one input to the operator and merges the
     * stream status of that input with the status of the other input.
     */
    private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {

        private final TwoInputStreamOperator<IN1, IN2, ?> operator;
        private final Object lock;
        private final StreamStatusMaintainer streamStatusMaintainer;
        private final WatermarkGauge inputWatermarkGauge;

        /** 0 for the first input, anything else is treated as the second input. */
        private final int inputIndex;

        private ForwardingValveOutputHandler(
                TwoInputStreamOperator<IN1, IN2, ?> operator,
                Object lock,
                StreamStatusMaintainer streamStatusMaintainer,
                WatermarkGauge inputWatermarkGauge,
                int inputIndex) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
            this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer);
            // The gauge is stored as given; it is not null-checked here.
            this.inputWatermarkGauge = inputWatermarkGauge;
            this.inputIndex = inputIndex;
        }

        /**
         * Records the watermark timestamp in the gauge and passes the watermark to the operator
         * method that belongs to this handler's input, all under the lock.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (lock) {
                    inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    if (inputIndex != 0) {
                        operator.processWatermark2(watermark);
                    } else {
                        operator.processWatermark1(watermark);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark of input" + (inputIndex + 1) + ": ", e);
            }
        }

        /**
         * Stores the new status of this handler's input and, if it differs from the status held by
         * the {@link StreamStatusMaintainer}, toggles the maintainer: to ACTIVE as soon as this
         * input is active, to IDLE only when the other input is idle as well.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (lock) {
                    final StreamStatus otherInputStatus;
                    if (inputIndex == 0) {
                        firstStatus = streamStatus;
                        otherInputStatus = secondStatus;
                    } else {
                        secondStatus = streamStatus;
                        otherInputStatus = firstStatus;
                    }

                    if (streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
                        // The maintainer already reports this status; nothing to toggle.
                        return;
                    }

                    if (streamStatus.isActive()) {
                        streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
                    } else if (otherInputStatus.isIdle()) {
                        streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
                    }
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status of input" + (inputIndex + 1) + ": ", e);
            }
        }
    }

    /**
     * Builds the processor: unions the gates of each side, wraps both unions into a pair of
     * checkpointed gates, and creates one network input plus one status/watermark valve per side.
     * Both inputs start out as ACTIVE and available.
     *
     * @param collection input gates of the first input
     * @param collection2 input gates of the second input
     * @param typeSerializer serializer handed to the first network input
     * @param typeSerializer2 serializer handed to the second network input
     * @param streamTask task forwarded to the checkpointed-gate factory
     * @param checkpointingMode checkpointing mode forwarded to the checkpointed-gate factory
     * @param obj lock object held around operator calls; must not be null
     * @param iOManager I/O manager forwarded to the gate factory and the network inputs
     * @param configuration configuration forwarded to the checkpointed-gate factory
     * @param streamStatusMaintainer maintainer toggled by the valve output handlers; must not be null
     * @param twoInputStreamOperator operator to feed; must not be null and must implement
     *     {@link InputSelectable}
     * @param watermarkGauge gauge updated with watermarks of the first input
     * @param watermarkGauge2 gauge updated with watermarks of the second input
     * @param str string forwarded to the checkpointed-gate factory (presumably the task name —
     *     TODO confirm against the factory's signature)
     * @param operatorChain chain notified when an input ends; must not be null
     * @throws IOException declared for the input set-up performed here; the throwing call is not
     *     visible in this file
     */
    public StreamTwoInputSelectableProcessor(Collection<InputGate> collection, Collection<InputGate> collection2, TypeSerializer<IN1> typeSerializer, TypeSerializer<IN2> typeSerializer2, StreamTask<?, ?> streamTask, CheckpointingMode checkpointingMode, Object obj, IOManager iOManager, Configuration configuration, StreamStatusMaintainer streamStatusMaintainer, TwoInputStreamOperator<IN1, IN2, ?> twoInputStreamOperator, WatermarkGauge watermarkGauge, WatermarkGauge watermarkGauge2, String str, OperatorChain<?, ?> operatorChain) throws IOException {
        Preconditions.checkState(twoInputStreamOperator instanceof InputSelectable);

        this.streamOperator = Preconditions.checkNotNull(twoInputStreamOperator);
        this.inputSelector = (InputSelectable) twoInputStreamOperator;
        this.lock = Preconditions.checkNotNull(obj);

        final InputGate unionGate1 = InputGateUtil.createInputGate(collection.toArray(new InputGate[0]));
        final InputGate unionGate2 = InputGateUtil.createInputGate(collection2.toArray(new InputGate[0]));

        final CheckpointedInputGate[] checkpointedGates = InputProcessorUtil.createCheckpointedInputGatePair(
                streamTask, checkpointingMode, iOManager, unionGate1, unionGate2, configuration, str);
        Preconditions.checkState(checkpointedGates.length == 2);

        this.input1 = new StreamTaskNetworkInput(checkpointedGates[0], typeSerializer, iOManager, 0);
        this.input2 = new StreamTaskNetworkInput(checkpointedGates[1], typeSerializer2, iOManager, 1);

        // Channel counts are read before the handlers are built, one side at a time.
        final int channelCount1 = unionGate1.getNumberOfInputChannels();
        this.statusWatermarkValve1 = new StatusWatermarkValve(
                channelCount1,
                new ForwardingValveOutputHandler(twoInputStreamOperator, obj, streamStatusMaintainer, watermarkGauge, 0));

        final int channelCount2 = unionGate2.getNumberOfInputChannels();
        this.statusWatermarkValve2 = new StatusWatermarkValve(
                channelCount2,
                new ForwardingValveOutputHandler(twoInputStreamOperator, obj, streamStatusMaintainer, watermarkGauge2, 1));

        this.operatorChain = Preconditions.checkNotNull(operatorChain);

        this.firstStatus = StreamStatus.ACTIVE;
        this.secondStatus = StreamStatus.ACTIVE;

        // Start with both inputs marked available.
        this.availableInputsMask = (int) new InputSelection.Builder().select(1).select(2).build().getInputMask();

        this.lastReadInputIndex = 1;
        this.isPrepared = false;
    }

    /**
     * Reads and processes at most one element from the input chosen by the current selection.
     *
     * @return {@code false} if no input could be selected any more or if both inputs are finished
     *     after this step; {@code true} otherwise
     * @throws Exception if selecting, polling or processing fails
     */
    @Override
    public boolean processInput() throws Exception {
        if (!isPrepared) {
            prepareForProcessing();
        }

        final int readingInputIndex = selectNextReadingInputIndex();
        if (readingInputIndex == -1) {
            return false;
        }
        lastReadInputIndex = readingInputIndex;

        final StreamTaskInput selectedInput = getInput(readingInputIndex);
        final StreamElement element = selectedInput.pollNextNullable();
        if (element == null) {
            // Nothing was returned by the poll: stop considering this input until it is
            // re-marked as available.
            setUnavailableInput(readingInputIndex);
        } else if (readingInputIndex == 0) {
            processElement1(element, selectedInput.getLastChannel());
        } else {
            processElement2(element, selectedInput.getLastChannel());
        }

        return !checkFinished();
    }

    /**
     * Closes both inputs. The second input is closed even if closing the first one failed.
     *
     * @throws IOException the first failure, combined with a later one via
     *     {@link ExceptionUtils#firstOrSuppressed}
     */
    @Override
    public void close() throws IOException {
        IOException failure = null;

        for (StreamTaskInput input : new StreamTaskInput[] {input1, input2}) {
            try {
                input.close();
            } catch (IOException e) {
                failure = ExceptionUtils.firstOrSuppressed(e, failure);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Determines the index of the input to read next, blocking until a selected input becomes
     * available if necessary.
     *
     * @return 0 or 1, or -1 if waiting reported that no input can become available any more
     */
    private int selectNextReadingInputIndex() throws InterruptedException, ExecutionException, IOException {
        int candidate;
        while ((candidate = inputSelection.fairSelectNextIndexOutOf2(availableInputsMask, lastReadInputIndex)) == -1) {
            if (!waitForAvailableInput(inputSelection)) {
                return -1;
            }
        }

        // A mask below 3 means at least one of the two availability bits is cleared. When both
        // inputs are selected, re-check the other input so that it gets a chance to be picked.
        if (availableInputsMask < 3 && inputSelection.isALLMaskOf2()) {
            checkAndSetAvailable(1 - candidate);
        }
        return candidate;
    }

    /**
     * Dispatches one element read from the first input.
     *
     * @param element the element to process
     * @param channel channel index passed on to the status/watermark valve
     * @throws UnsupportedOperationException if the element is of none of the four known kinds
     */
    private void processElement1(StreamElement element, int channel) throws Exception {
        if (element.isRecord()) {
            final StreamRecord<IN1> record = element.asRecord();
            synchronized (lock) {
                numRecordsIn.inc();
                streamOperator.setKeyContextElement1(record);
                streamOperator.processElement1(record);
                // The operator may change its selection after each record.
                inputSelection = inputSelector.nextSelection();
            }
        } else if (element.isWatermark()) {
            statusWatermarkValve1.inputWatermark(element.asWatermark(), channel);
        } else if (element.isStreamStatus()) {
            statusWatermarkValve1.inputStreamStatus(element.asStreamStatus(), channel);
        } else if (element.isLatencyMarker()) {
            synchronized (lock) {
                streamOperator.processLatencyMarker1(element.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement on input1");
        }
    }

    /**
     * Dispatches one element read from the second input.
     *
     * @param element the element to process
     * @param channel channel index passed on to the status/watermark valve
     * @throws UnsupportedOperationException if the element is of none of the four known kinds
     */
    private void processElement2(StreamElement element, int channel) throws Exception {
        if (element.isRecord()) {
            final StreamRecord<IN2> record = element.asRecord();
            synchronized (lock) {
                numRecordsIn.inc();
                streamOperator.setKeyContextElement2(record);
                streamOperator.processElement2(record);
                // The operator may change its selection after each record.
                inputSelection = inputSelector.nextSelection();
            }
        } else if (element.isWatermark()) {
            statusWatermarkValve2.inputWatermark(element.asWatermark(), channel);
        } else if (element.isStreamStatus()) {
            statusWatermarkValve2.inputStreamStatus(element.asStreamStatus(), channel);
        } else if (element.isLatencyMarker()) {
            synchronized (lock) {
                streamOperator.processLatencyMarker2(element.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement on input2");
        }
    }

    /**
     * One-time set-up done on the first {@link #processInput()} call: fetches the initial input
     * selection and resolves the records-in counter, falling back to a {@link SimpleCounter} if
     * the metric lookup throws.
     */
    private void prepareForProcessing() {
        inputSelection = inputSelector.nextSelection();

        Counter recordsInCounter;
        try {
            recordsInCounter = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            recordsInCounter = new SimpleCounter();
        }
        numRecordsIn = recordsInCounter;

        isPrepared = true;
    }

    /** Marks the given input as available if it is not finished and its availability future is done. */
    private void checkAndSetAvailable(int inputIndex) {
        final StreamTaskInput input = getInput(inputIndex);
        if (!input.isFinished() && input.isAvailable().isDone()) {
            setAvailableInput(inputIndex);
        }
    }

    /**
     * Blocks until an input allowed by the given selection is available.
     *
     * @return {@code false} only when both inputs are selected and both are finished
     * @throws IOException if a single input is selected but already finished
     */
    private boolean waitForAvailableInput(InputSelection selection) throws ExecutionException, InterruptedException, IOException {
        if (selection.isALLMaskOf2()) {
            return waitForAvailableEitherInput();
        }

        final boolean firstSelected = selection.getInputMask() == InputSelection.FIRST.getInputMask();
        waitForOneInput(firstSelected ? input1 : input2);
        return true;
    }

    /**
     * Blocks until at least one unfinished input reports availability and marks every input whose
     * future is done as available.
     *
     * @return {@code false} if both inputs are finished, {@code true} otherwise
     */
    private boolean waitForAvailableEitherInput() throws ExecutionException, InterruptedException {
        final CompletableFuture<?> future1 = input1.isFinished() ? UNAVAILABLE : input1.isAvailable();
        final CompletableFuture<?> future2 = input2.isFinished() ? UNAVAILABLE : input2.isAvailable();

        if (future1 == UNAVAILABLE && future2 == UNAVAILABLE) {
            return false;
        }

        // A finished input contributes the never-completed placeholder, so only the other
        // input can wake us up.
        CompletableFuture.anyOf(future1, future2).get();

        if (future1.isDone()) {
            setAvailableInput(input1.getInputIndex());
        }
        if (future2.isDone()) {
            setAvailableInput(input2.getInputIndex());
        }
        return true;
    }

    /**
     * Blocks until the given input is available and marks it as such.
     *
     * @throws IOException if the input is already finished
     */
    private void waitForOneInput(StreamTaskInput input) throws IOException, ExecutionException, InterruptedException {
        if (input.isFinished()) {
            throw new IOException("Could not read the finished input: input" + (input.getInputIndex() + 1) + ".");
        }

        input.isAvailable().get();
        setAvailableInput(input.getInputIndex());
    }

    /**
     * If the most recently selected input is finished, signals the end of that input to the
     * operator chain and refreshes the input selection, both under the lock.
     *
     * @return {@code true} if both inputs are finished
     */
    private boolean checkFinished() throws Exception {
        final int lastIndex = lastReadInputIndex;
        if (getInput(lastIndex).isFinished()) {
            synchronized (lock) {
                operatorChain.endInput(getInputId(lastIndex));
                inputSelection = inputSelector.nextSelection();
            }
        }

        return input1.isFinished() && input2.isFinished();
    }

    /** Sets the availability bit of the given input index. */
    private void setAvailableInput(int inputIndex) {
        availableInputsMask |= 1 << inputIndex;
    }

    /** Clears the availability bit of the given input index. */
    private void setUnavailableInput(int inputIndex) {
        availableInputsMask &= ~(1 << inputIndex);
    }

    /** Returns the first input for index 0 and the second input for any other index. */
    private StreamTaskInput getInput(int inputIndex) {
        if (inputIndex == 0) {
            return input1;
        }
        return input2;
    }

    /** Converts a zero-based input index into the one-based input id passed to the operator chain. */
    private int getInputId(int inputIndex) {
        return inputIndex + 1;
    }
}
