package org.apache.flink.runtime.executiongraph.failover;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.GlobalModVersionMismatch;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/failover/FailoverRegion.class */
public class FailoverRegion {
    private static final AtomicReferenceFieldUpdater<FailoverRegion, JobStatus> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(FailoverRegion.class, JobStatus.class, "state");
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) FailoverRegion.class);
    private final ExecutionGraph executionGraph;
    private final List<ExecutionVertex> connectedExecutionVertexes;
    private final Map<JobVertexID, ExecutionJobVertex> tasks;
    private final AbstractID id = new AbstractID();
    private volatile JobStatus state = JobStatus.RUNNING;

    public FailoverRegion(ExecutionGraph executionGraph, List<ExecutionVertex> list, Map<JobVertexID, ExecutionJobVertex> map) {
        this.executionGraph = (ExecutionGraph) Preconditions.checkNotNull(executionGraph);
        this.connectedExecutionVertexes = (List) Preconditions.checkNotNull(list);
        this.tasks = (Map) Preconditions.checkNotNull(map);
        LOG.debug("Created failover region {} with vertices: {}", this.id, list);
    }

    public void onExecutionFail(Execution execution, Throwable th) {
        if (this.executionGraph.getRestartStrategy().canRestart()) {
            cancel(execution.getGlobalModVersion());
        } else {
            this.executionGraph.failGlobal(th);
        }
    }

    private void allVerticesInTerminalState(long j) {
        JobStatus jobStatus;
        do {
            jobStatus = this.state;
            if (!jobStatus.equals(JobStatus.CANCELLING)) {
                LOG.info("FailoverRegion {} is {} when allVerticesInTerminalState.", this.id, this.state);
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.CANCELED));
        reset(j);
    }

    public JobStatus getState() {
        return this.state;
    }

    private void failover(long j) {
        if (!this.executionGraph.getRestartStrategy().canRestart()) {
            this.executionGraph.failGlobal(new FlinkException("RestartStrategy validate fail"));
            return;
        }
        JobStatus jobStatus = this.state;
        if (jobStatus.equals(JobStatus.RUNNING)) {
            cancel(j);
        } else if (jobStatus.equals(JobStatus.CANCELED)) {
            reset(j);
        } else {
            LOG.info("FailoverRegion {} is {} when notified to failover.", this.id, this.state);
        }
    }

    private void cancel(long j) {
        JobStatus jobStatus;
        this.executionGraph.getJobMasterMainThreadExecutor().assertRunningInMainThread();
        do {
            jobStatus = this.state;
            if (!jobStatus.equals(JobStatus.RUNNING)) {
                LOG.info("FailoverRegion {} is {} when cancel.", this.id, this.state);
                return;
            }
        } while (!transitionState(jobStatus, JobStatus.CANCELLING));
        createTerminationFutureOverAllConnectedVertexes().thenAccept(r7 -> {
            allVerticesInTerminalState(j);
        });
    }

    @VisibleForTesting
    protected CompletableFuture<Void> createTerminationFutureOverAllConnectedVertexes() {
        ArrayList arrayList = new ArrayList(this.connectedExecutionVertexes.size());
        Iterator<ExecutionVertex> it = this.connectedExecutionVertexes.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().cancel());
        }
        return FutureUtils.waitForAll(arrayList);
    }

    private void reset(long j) {
        try {
            HashSet hashSet = new HashSet();
            long currentTimeMillis = System.currentTimeMillis();
            for (ExecutionVertex executionVertex : this.connectedExecutionVertexes) {
                CoLocationGroup coLocationGroup = executionVertex.getJobVertex().getCoLocationGroup();
                if (coLocationGroup != null && !hashSet.contains(coLocationGroup)) {
                    coLocationGroup.resetConstraints();
                    hashSet.add(coLocationGroup);
                }
                executionVertex.resetForNewExecution(currentTimeMillis, j);
            }
            if (transitionState(JobStatus.CANCELED, JobStatus.CREATED)) {
                restart(j);
            } else {
                LOG.info("FailoverRegion {} switched from CANCELLING to CREATED fail, will fail this region again.", this.id);
                failover(j);
            }
        } catch (GlobalModVersionMismatch e) {
            this.state = JobStatus.RUNNING;
        } catch (Throwable th) {
            LOG.info("FailoverRegion {} reset fail, will failover again.", this.id);
            failover(j);
        }
    }

    private void restart(long j) {
        try {
            if (transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
                if (this.executionGraph.getCheckpointCoordinator() != null) {
                    this.executionGraph.getCheckpointCoordinator().abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION));
                    this.executionGraph.getCheckpointCoordinator().restoreLatestCheckpointedState(this.tasks, false, true);
                }
                HashSet hashSet = new HashSet(this.connectedExecutionVertexes.size());
                Iterator<ExecutionVertex> it = this.connectedExecutionVertexes.iterator();
                while (it.hasNext()) {
                    AllocationID latestPriorAllocation = it.next().getLatestPriorAllocation();
                    if (latestPriorAllocation != null) {
                        hashSet.add(latestPriorAllocation);
                    }
                }
                Iterator<ExecutionVertex> it2 = this.connectedExecutionVertexes.iterator();
                while (it2.hasNext()) {
                    try {
                        it2.next().scheduleForExecution(this.executionGraph.getSlotProviderStrategy(), LocationPreferenceConstraint.ANY, hashSet);
                    } catch (Throwable th) {
                        failover(j);
                    }
                }
            } else {
                LOG.info("FailoverRegion {} switched from CREATED to RUNNING fail, will fail this region again.", this.id);
                failover(j);
            }
        } catch (Exception e) {
            LOG.info("FailoverRegion {} restart failed, failover again.", this.id, e);
            failover(j);
        }
    }

    private boolean transitionState(JobStatus jobStatus, JobStatus jobStatus2) {
        if (!STATE_UPDATER.compareAndSet(this, jobStatus, jobStatus2)) {
            return false;
        }
        LOG.info("FailoverRegion {} switched from state {} to {}.", this.id, jobStatus, jobStatus2);
        return true;
    }
}
