package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.class */
public class CheckpointBarrierAligner extends CheckpointBarrierHandler {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) CheckpointBarrierAligner.class);
    private final boolean[] blockedChannels;
    private final int totalNumberOfInputChannels;
    private final String taskName;
    private long currentCheckpointId;
    private int numBarriersReceived;
    private int numClosedChannels;
    private long startOfAlignmentTimestamp;
    private long latestAlignmentDurationNanos;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointBarrierAligner(int i, String str, @Nullable AbstractInvokable abstractInvokable) {
        super(abstractInvokable);
        this.currentCheckpointId = -1L;
        this.totalNumberOfInputChannels = i;
        this.taskName = str;
        this.blockedChannels = new boolean[i];
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void releaseBlocksAndResetBarriers() throws IOException {
        LOG.debug("{}: End of stream alignment, feeding buffered data back.", this.taskName);
        for (int i = 0; i < this.blockedChannels.length; i++) {
            this.blockedChannels[i] = false;
        }
        this.numBarriersReceived = 0;
        if (this.startOfAlignmentTimestamp > 0) {
            this.latestAlignmentDurationNanos = System.nanoTime() - this.startOfAlignmentTimestamp;
            this.startOfAlignmentTimestamp = 0L;
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean isBlocked(int i) {
        return this.blockedChannels[i];
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean processBarrier(CheckpointBarrier checkpointBarrier, int i, long j) throws Exception {
        long id = checkpointBarrier.getId();
        if (this.totalNumberOfInputChannels == 1) {
            if (id <= this.currentCheckpointId) {
                return false;
            }
            this.currentCheckpointId = id;
            notifyCheckpoint(checkpointBarrier, j, this.latestAlignmentDurationNanos);
            return false;
        }
        boolean z = false;
        if (this.numBarriersReceived > 0) {
            if (id == this.currentCheckpointId) {
                onBarrier(i);
            } else {
                if (id <= this.currentCheckpointId) {
                    return false;
                }
                LOG.warn("{}: Received checkpoint barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", this.taskName, Long.valueOf(id), Long.valueOf(this.currentCheckpointId));
                notifyAbort(this.currentCheckpointId, new CheckpointException("Barrier id: " + id, CheckpointFailureReason.CHECKPOINT_DECLINED_SUBSUMED));
                releaseBlocksAndResetBarriers();
                z = true;
                beginNewAlignment(id, i);
            }
        } else {
            if (id <= this.currentCheckpointId) {
                return false;
            }
            beginNewAlignment(id, i);
        }
        if (this.numBarriersReceived + this.numClosedChannels != this.totalNumberOfInputChannels) {
            return z;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received all barriers, triggering checkpoint {} at {}.", this.taskName, Long.valueOf(checkpointBarrier.getId()), Long.valueOf(checkpointBarrier.getTimestamp()));
        }
        releaseBlocksAndResetBarriers();
        notifyCheckpoint(checkpointBarrier, j, this.latestAlignmentDurationNanos);
        return true;
    }

    protected void beginNewAlignment(long j, int i) throws IOException {
        this.currentCheckpointId = j;
        onBarrier(i);
        this.startOfAlignmentTimestamp = System.nanoTime();
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Starting stream alignment for checkpoint {}.", this.taskName, Long.valueOf(j));
        }
    }

    protected void onBarrier(int i) throws IOException {
        if (this.blockedChannels[i]) {
            throw new IOException("Stream corrupt: Repeated barrier for same checkpoint on input " + i);
        }
        this.blockedChannels[i] = true;
        this.numBarriersReceived++;
        if (LOG.isDebugEnabled()) {
            LOG.debug("{}: Received barrier from channel {}.", this.taskName, Integer.valueOf(i));
        }
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean processCancellationBarrier(CancelCheckpointMarker cancelCheckpointMarker) throws Exception {
        long checkpointId = cancelCheckpointMarker.getCheckpointId();
        if (this.totalNumberOfInputChannels == 1) {
            if (checkpointId <= this.currentCheckpointId) {
                return false;
            }
            this.currentCheckpointId = checkpointId;
            notifyAbortOnCancellationBarrier(checkpointId);
            return false;
        }
        if (this.numBarriersReceived <= 0) {
            if (checkpointId <= this.currentCheckpointId) {
                return false;
            }
            this.currentCheckpointId = checkpointId;
            this.startOfAlignmentTimestamp = 0L;
            this.latestAlignmentDurationNanos = 0L;
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Checkpoint {} canceled, skipping alignment.", this.taskName, Long.valueOf(checkpointId));
            }
            notifyAbortOnCancellationBarrier(checkpointId);
            return false;
        }
        if (checkpointId == this.currentCheckpointId) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("{}: Checkpoint {} canceled, aborting alignment.", this.taskName, Long.valueOf(checkpointId));
            }
            releaseBlocksAndResetBarriers();
            notifyAbortOnCancellationBarrier(checkpointId);
            return true;
        }
        if (checkpointId <= this.currentCheckpointId) {
            return false;
        }
        LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. Skipping current checkpoint.", this.taskName, Long.valueOf(checkpointId), Long.valueOf(this.currentCheckpointId));
        releaseBlocksAndResetBarriers();
        this.currentCheckpointId = checkpointId;
        this.startOfAlignmentTimestamp = 0L;
        this.latestAlignmentDurationNanos = 0L;
        notifyAbortOnCancellationBarrier(checkpointId);
        return true;
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public boolean processEndOfPartition() throws Exception {
        this.numClosedChannels++;
        if (this.numBarriersReceived <= 0) {
            return false;
        }
        notifyAbort(this.currentCheckpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM));
        releaseBlocksAndResetBarriers();
        return true;
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public long getLatestCheckpointId() {
        return this.currentCheckpointId;
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public long getAlignmentDurationNanos() {
        return this.startOfAlignmentTimestamp <= 0 ? this.latestAlignmentDurationNanos : System.nanoTime() - this.startOfAlignmentTimestamp;
    }

    public String toString() {
        return String.format("%s: last checkpoint: %d, current barriers: %d, closed channels: %d", this.taskName, Long.valueOf(this.currentCheckpointId), Integer.valueOf(this.numBarriersReceived), Integer.valueOf(this.numClosedChannels));
    }

    @Override // org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler
    public void checkpointSizeLimitExceeded(long j) throws Exception {
        releaseBlocksAndResetBarriers();
        notifyAbort(this.currentCheckpointId, new CheckpointException("Max buffered bytes: " + j, CheckpointFailureReason.CHECKPOINT_DECLINED_ALIGNMENT_LIMIT_EXCEEDED));
    }
}
