package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.metrics.WatermarkGauge;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Input processor for a single-input stream task.
 *
 * <p>Each call to {@link #processInput()} polls at most one {@link StreamElement} from the
 * underlying {@link StreamTaskInput} and dispatches it:
 *
 * <ul>
 *   <li>records and latency markers are handed to the {@link OneInputStreamOperator} while
 *       holding {@code lock};
 *   <li>watermarks and stream statuses are fed into a {@link StatusWatermarkValve}; whatever the
 *       valve emits is forwarded by {@link ForwardingValveOutputHandler}.
 * </ul>
 *
 * @param <IN> type of the records consumed by the operator
 */
@Internal
public final class StreamOneInputProcessor<IN> implements StreamInputProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(StreamOneInputProcessor.class);

    private final StreamTaskInput input;
    private final Object lock;
    private final OperatorChain<?, ?> operatorChain;
    private final StatusWatermarkValve statusWatermarkValve;
    private final StreamStatusMaintainer streamStatusMaintainer;
    private final OneInputStreamOperator<IN, ?> streamOperator;
    private final WatermarkGauge watermarkGauge;

    /** Lazily resolved on the first {@link #processInput()} call; see {@link #initializeNumRecordsIn()}. */
    private Counter numRecordsIn;

    /**
     * Receives the output of the {@link StatusWatermarkValve} and forwards it while holding the
     * lock: watermarks go to the gauge and the operator, stream statuses go to the
     * {@link StreamStatusMaintainer} of the enclosing processor.
     */
    private class ForwardingValveOutputHandler implements StatusWatermarkValve.ValveOutputHandler {
        private final OneInputStreamOperator<IN, ?> operator;
        private final Object lock;

        private ForwardingValveOutputHandler(OneInputStreamOperator<IN, ?> operator, Object lock) {
            this.operator = Preconditions.checkNotNull(operator);
            this.lock = Preconditions.checkNotNull(lock);
        }

        /**
         * Records the watermark timestamp in the gauge and passes the watermark to the operator.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleWatermark(Watermark watermark) {
            try {
                synchronized (this.lock) {
                    StreamOneInputProcessor.this.watermarkGauge.setCurrentWatermark(watermark.getTimestamp());
                    this.operator.processWatermark(watermark);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output watermark: ", e);
            }
        }

        /**
         * Toggles the stream status on the enclosing processor's {@link StreamStatusMaintainer}.
         *
         * @throws RuntimeException wrapping any exception raised while doing so
         */
        @Override
        public void handleStreamStatus(StreamStatus streamStatus) {
            try {
                synchronized (this.lock) {
                    StreamOneInputProcessor.this.streamStatusMaintainer.toggleStreamStatus(streamStatus);
                }
            } catch (Exception e) {
                throw new RuntimeException("Exception occurred while processing valve output stream status: ", e);
            }
        }
    }

    /**
     * Builds the processor: combines the given gates via {@code InputGateUtil.createInputGate},
     * wraps the result in a checkpointed gate, creates the network input on top of it and
     * registers the {@code "checkpointAlignmentTime"} gauge.
     *
     * @param inputGateArr gates to read from; combined into a single gate
     * @param typeSerializer serializer handed to the network input
     * @param streamTask passed through to {@code InputProcessorUtil.createCheckpointedInputGate}
     * @param checkpointingMode passed through to {@code InputProcessorUtil.createCheckpointedInputGate}
     * @param obj lock object guarding all calls into the operator; must not be null
     * @param iOManager I/O manager used by the checkpointed gate and the network input
     * @param configuration passed through to {@code InputProcessorUtil.createCheckpointedInputGate}
     * @param streamStatusMaintainer receives stream status changes emitted by the valve; must not be null
     * @param oneInputStreamOperator operator that processes the incoming elements; must not be null
     * @param taskIOMetricGroup metric group on which the alignment-time gauge is registered
     * @param watermarkGauge gauge updated with the timestamp of every forwarded watermark
     * @param str passed through to {@code InputProcessorUtil.createCheckpointedInputGate}
     *     (presumably the task name - confirm against that helper)
     * @param operatorChain chain notified via {@code endInput} once the input is finished; must not be null
     * @throws IOException declared by the constructor; which of the helpers invoked here raises
     *     it is not visible in this class
     */
    public StreamOneInputProcessor(InputGate[] inputGateArr, TypeSerializer<IN> typeSerializer, StreamTask<?, ?> streamTask, CheckpointingMode checkpointingMode, Object obj, IOManager iOManager, Configuration configuration, StreamStatusMaintainer streamStatusMaintainer, OneInputStreamOperator<IN, ?> oneInputStreamOperator, TaskIOMetricGroup taskIOMetricGroup, WatermarkGauge watermarkGauge, String str, OperatorChain<?, ?> operatorChain) throws IOException {
        InputGate inputGate = InputGateUtil.createInputGate(inputGateArr);
        CheckpointedInputGate checkpointedInputGate = InputProcessorUtil.createCheckpointedInputGate(
                streamTask, checkpointingMode, iOManager, inputGate, configuration, str);
        this.input = new StreamTaskNetworkInput(checkpointedInputGate, typeSerializer, iOManager, 0);

        this.lock = Preconditions.checkNotNull(obj);
        this.streamStatusMaintainer = Preconditions.checkNotNull(streamStatusMaintainer);
        this.streamOperator = Preconditions.checkNotNull(oneInputStreamOperator);

        this.statusWatermarkValve = new StatusWatermarkValve(
                inputGate.getNumberOfInputChannels(),
                new ForwardingValveOutputHandler(oneInputStreamOperator, obj));

        this.watermarkGauge = watermarkGauge;
        // Evaluating the bound method reference null-checks its receiver, so no separate
        // check on checkpointedInputGate is needed here.
        taskIOMetricGroup.gauge("checkpointAlignmentTime", checkpointedInputGate::getAlignmentDurationNanos);

        this.operatorChain = Preconditions.checkNotNull(operatorChain);
    }

    /**
     * Polls one element from the input and processes it.
     *
     * <p>If no element is returned by the poll, this calls {@code get()} on the result of
     * {@code input.isAvailable()} (presumably blocking until the input has data or is finished)
     * and then reports whether the input is still unfinished.
     *
     * @return {@code true} if an element was processed, or if none was polled and the input is
     *     not finished; {@code false} once the input is finished
     * @throws Exception if polling, waiting, or the operator fails
     */
    @Override
    public boolean processInput() throws Exception {
        initializeNumRecordsIn();

        StreamElement element = this.input.pollNextNullable();
        if (element == null) {
            this.input.isAvailable().get();
            return !checkFinished();
        }

        int channel = this.input.getLastChannel();
        // -1 is presumably the input's "no channel" sentinel; a polled element must have one.
        Preconditions.checkState(channel != -1);

        processElement(element, channel);
        return true;
    }

    /**
     * Checks whether the input is finished and, if so, signals end of input for input id 1 to
     * the operator chain while holding the lock.
     *
     * @return whether the input reported itself as finished
     */
    private boolean checkFinished() throws Exception {
        boolean finished = this.input.isFinished();
        if (finished) {
            synchronized (this.lock) {
                this.operatorChain.endInput(1);
            }
        }
        return finished;
    }

    /**
     * Dispatches a single element according to its kind.
     *
     * @param streamElement element polled from the input
     * @param i index of the channel the element was read from
     * @throws UnsupportedOperationException if the element is none of record, watermark, stream
     *     status or latency marker
     */
    private void processElement(StreamElement streamElement, int i) throws Exception {
        if (streamElement.isRecord()) {
            StreamRecord<?> record = streamElement.asRecord();
            synchronized (this.lock) {
                this.numRecordsIn.inc();
                this.streamOperator.setKeyContextElement1(record);
                this.streamOperator.processElement(record);
            }
        } else if (streamElement.isWatermark()) {
            // The valve decides if and when a watermark is forwarded; locking happens in the handler.
            this.statusWatermarkValve.inputWatermark(streamElement.asWatermark(), i);
        } else if (streamElement.isStreamStatus()) {
            this.statusWatermarkValve.inputStreamStatus(streamElement.asStreamStatus(), i);
        } else if (streamElement.isLatencyMarker()) {
            synchronized (this.lock) {
                this.streamOperator.processLatencyMarker(streamElement.asLatencyMarker());
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }

    /**
     * Resolves the records-in counter from the operator's metric group on first use. If that
     * lookup throws, a warning is logged and a standalone {@link SimpleCounter} is used instead
     * so that processing can continue.
     */
    private void initializeNumRecordsIn() {
        if (this.numRecordsIn != null) {
            return;
        }
        try {
            this.numRecordsIn = ((OperatorMetricGroup) this.streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            this.numRecordsIn = new SimpleCounter();
        }
    }

    /** Closes the underlying input. */
    @Override
    public void close() throws IOException {
        this.input.close();
    }
}
