package org.apache.flink.runtime.scheduler;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.BiFunction;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.ExecutionGraphException;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
import org.apache.flink.runtime.io.network.partition.PartitionTracker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.FunctionUtils;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/LegacyScheduler.class */
public class LegacyScheduler implements SchedulerNG {
    private final Logger log;
    private final JobGraph jobGraph;
    private final ExecutionGraph executionGraph;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final Executor ioExecutor;
    private final Configuration jobMasterConfiguration;
    private final SlotProvider slotProvider;
    private final ScheduledExecutorService futureExecutor;
    private final ClassLoader userCodeLoader;
    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
    private final Time rpcTimeout;
    private final RestartStrategy restartStrategy;
    private final BlobWriter blobWriter;
    private final Time slotRequestTimeout;
    private ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("LegacyScheduler is not initialized with proper main thread executor. Call to LegacyScheduler.setMainThreadExecutor(...) required.");

    public LegacyScheduler(Logger logger, JobGraph jobGraph, BackPressureStatsTracker backPressureStatsTracker, Executor executor, Configuration configuration, SlotProvider slotProvider, ScheduledExecutorService scheduledExecutorService, ClassLoader classLoader, CheckpointRecoveryFactory checkpointRecoveryFactory, Time time, RestartStrategyFactory restartStrategyFactory, BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Time time2, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws Exception {
        this.log = (Logger) Preconditions.checkNotNull(logger);
        this.jobGraph = (JobGraph) Preconditions.checkNotNull(jobGraph);
        this.backPressureStatsTracker = (BackPressureStatsTracker) Preconditions.checkNotNull(backPressureStatsTracker);
        this.ioExecutor = (Executor) Preconditions.checkNotNull(executor);
        this.jobMasterConfiguration = (Configuration) Preconditions.checkNotNull(configuration);
        this.slotProvider = (SlotProvider) Preconditions.checkNotNull(slotProvider);
        this.futureExecutor = (ScheduledExecutorService) Preconditions.checkNotNull(scheduledExecutorService);
        this.userCodeLoader = (ClassLoader) Preconditions.checkNotNull(classLoader);
        this.checkpointRecoveryFactory = (CheckpointRecoveryFactory) Preconditions.checkNotNull(checkpointRecoveryFactory);
        this.rpcTimeout = (Time) Preconditions.checkNotNull(time);
        this.restartStrategy = RestartStrategyResolving.resolve(jobGraph.getSerializedExecutionConfig().deserializeValue(classLoader).getRestartStrategy(), restartStrategyFactory, jobGraph.isCheckpointingEnabled());
        logger.info("Using restart strategy {} for {} ({}).", this.restartStrategy, jobGraph.getName(), jobGraph.getJobID());
        this.blobWriter = (BlobWriter) Preconditions.checkNotNull(blobWriter);
        this.slotRequestTimeout = (Time) Preconditions.checkNotNull(time2);
        this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, (ShuffleMaster) Preconditions.checkNotNull(shuffleMaster), (PartitionTracker) Preconditions.checkNotNull(partitionTracker));
    }

    private ExecutionGraph createAndRestoreExecutionGraph(JobManagerJobMetricGroup jobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws Exception {
        ExecutionGraph createExecutionGraph = createExecutionGraph(jobManagerJobMetricGroup, shuffleMaster, partitionTracker);
        CheckpointCoordinator checkpointCoordinator = createExecutionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator != null && !checkpointCoordinator.restoreLatestCheckpointedState(createExecutionGraph.getAllVertices(), false, false)) {
            tryRestoreExecutionGraphFromSavepoint(createExecutionGraph, this.jobGraph.getSavepointRestoreSettings());
        }
        return createExecutionGraph;
    }

    private ExecutionGraph createExecutionGraph(JobManagerJobMetricGroup jobManagerJobMetricGroup, ShuffleMaster<?> shuffleMaster, PartitionTracker partitionTracker) throws JobExecutionException, JobException {
        return ExecutionGraphBuilder.buildGraph(null, this.jobGraph, this.jobMasterConfiguration, this.futureExecutor, this.ioExecutor, this.slotProvider, this.userCodeLoader, this.checkpointRecoveryFactory, this.rpcTimeout, this.restartStrategy, jobManagerJobMetricGroup, this.blobWriter, this.slotRequestTimeout, this.log, shuffleMaster, partitionTracker);
    }

    private void tryRestoreExecutionGraphFromSavepoint(ExecutionGraph executionGraph, SavepointRestoreSettings savepointRestoreSettings) throws Exception {
        CheckpointCoordinator checkpointCoordinator;
        if (!savepointRestoreSettings.restoreSavepoint() || (checkpointCoordinator = executionGraph.getCheckpointCoordinator()) == null) {
            return;
        }
        checkpointCoordinator.restoreSavepoint(savepointRestoreSettings.getRestorePath(), savepointRestoreSettings.allowNonRestoredState(), executionGraph.getAllVertices(), this.userCodeLoader);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void setMainThreadExecutor(ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.mainThreadExecutor = (ComponentMainThreadExecutor) Preconditions.checkNotNull(componentMainThreadExecutor);
        this.executionGraph.start(componentMainThreadExecutor);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void registerJobStatusListener(JobStatusListener jobStatusListener) {
        this.executionGraph.registerJobStatusListener(jobStatusListener);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void startScheduling() {
        this.mainThreadExecutor.assertRunningInMainThread();
        try {
            this.executionGraph.scheduleForExecution();
        } catch (Throwable th) {
            this.executionGraph.failGlobal(th);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void suspend(Throwable th) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.suspend(th);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void cancel() {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.cancel();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<Void> getTerminationFuture() {
        return this.executionGraph.getTerminationFuture().thenApply(FunctionUtils.nullFn());
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public boolean updateTaskExecutionState(TaskExecutionState taskExecutionState) {
        this.mainThreadExecutor.assertRunningInMainThread();
        return this.executionGraph.updateState(taskExecutionState);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public SerializedInputSplit requestNextInputSplit(JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID) throws IOException {
        this.mainThreadExecutor.assertRunningInMainThread();
        Execution execution = this.executionGraph.getRegisteredExecutions().get(executionAttemptID);
        if (execution == null) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Can not find Execution for attempt {}.", executionAttemptID);
            }
            throw new IllegalArgumentException("Can not find Execution for attempt " + executionAttemptID);
        }
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null) {
            throw new IllegalArgumentException("Cannot find execution vertex for vertex ID " + jobVertexID);
        }
        if (jobVertex.getSplitAssigner() == null) {
            throw new IllegalStateException("No InputSplitAssigner for vertex ID " + jobVertexID);
        }
        InputSplit nextInputSplit = execution.getNextInputSplit();
        if (this.log.isDebugEnabled()) {
            this.log.debug("Send next input split {}.", nextInputSplit);
        }
        try {
            return new SerializedInputSplit(InstantiationUtil.serializeObject(nextInputSplit));
        } catch (Exception e) {
            IOException iOException = new IOException("Could not serialize the next input split of class " + nextInputSplit.getClass() + ".", e);
            jobVertex.fail(iOException);
            throw iOException;
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateDataSetID, ResultPartitionID resultPartitionID) throws PartitionProducerDisposedException {
        this.mainThreadExecutor.assertRunningInMainThread();
        Execution execution = this.executionGraph.getRegisteredExecutions().get(resultPartitionID.getProducerId());
        if (execution != null) {
            return execution.getState();
        }
        IntermediateResult intermediateResult = this.executionGraph.getAllIntermediateResults().get(intermediateDataSetID);
        if (intermediateResult == null) {
            throw new IllegalArgumentException("Intermediate data set with ID " + intermediateDataSetID + " not found.");
        }
        Execution currentExecutionAttempt = intermediateResult.getPartitionById(resultPartitionID.getPartitionId()).getProducer().getCurrentExecutionAttempt();
        if (currentExecutionAttempt.getAttemptId().equals(resultPartitionID.getProducerId())) {
            return currentExecutionAttempt.getState();
        }
        throw new PartitionProducerDisposedException(resultPartitionID);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void scheduleOrUpdateConsumers(ResultPartitionID resultPartitionID) {
        this.mainThreadExecutor.assertRunningInMainThread();
        try {
            this.executionGraph.scheduleOrUpdateConsumers(resultPartitionID);
        } catch (ExecutionGraphException e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public ArchivedExecutionGraph requestJob() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return ArchivedExecutionGraph.createFrom(this.executionGraph);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public JobStatus requestJobStatus() {
        return this.executionGraph.getState();
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public JobDetails requestJobDetails() {
        this.mainThreadExecutor.assertRunningInMainThread();
        return WebMonitorUtils.createDetailsForJob(this.executionGraph);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public KvStateLocation requestKvStateLocation(JobID jobID, String str) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (!this.jobGraph.getJobID().equals(jobID)) {
            if (this.log.isDebugEnabled()) {
                this.log.debug("Request of key-value state location for unknown job {} received.", jobID);
            }
            throw new FlinkJobNotFoundException(jobID);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Lookup key-value state for job {} with registration name {}.", this.jobGraph.getJobID(), str);
        }
        KvStateLocation kvStateLocation = this.executionGraph.getKvStateLocationRegistry().getKvStateLocation(str);
        if (kvStateLocation != null) {
            return kvStateLocation;
        }
        throw new UnknownKvStateLocation(str);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void notifyKvStateRegistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str, KvStateID kvStateID, InetSocketAddress inetSocketAddress) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (!this.jobGraph.getJobID().equals(jobID)) {
            throw new FlinkJobNotFoundException(jobID);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state registered for job {} under name {}.", this.jobGraph.getJobID(), str);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(jobVertexID, keyGroupRange, str, kvStateID, inetSocketAddress);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void notifyKvStateUnregistered(JobID jobID, JobVertexID jobVertexID, KeyGroupRange keyGroupRange, String str) throws FlinkJobNotFoundException {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (!this.jobGraph.getJobID().equals(jobID)) {
            throw new FlinkJobNotFoundException(jobID);
        }
        if (this.log.isDebugEnabled()) {
            this.log.debug("Key value state unregistered for job {} under name {}.", this.jobGraph.getJobID(), str);
        }
        try {
            this.executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(jobVertexID, keyGroupRange, str);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        this.mainThreadExecutor.assertRunningInMainThread();
        this.executionGraph.updateAccumulators(accumulatorSnapshot);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public Optional<OperatorBackPressureStats> requestOperatorBackPressureStats(JobVertexID jobVertexID) throws FlinkException {
        ExecutionJobVertex jobVertex = this.executionGraph.getJobVertex(jobVertexID);
        if (jobVertex == null) {
            throw new FlinkException("JobVertexID not found " + jobVertexID);
        }
        return this.backPressureStatsTracker.getOperatorBackPressureStats(jobVertex);
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<String> triggerSavepoint(String str, boolean z) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            throw new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID()));
        }
        if (str == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
            this.log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", this.jobGraph.getJobID());
            throw new IllegalStateException("No savepoint directory configured. You can either specify a directory while cancelling via -s :targetDirectory or configure a cluster-wide default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'.");
        }
        if (z) {
            checkpointCoordinator.stopCheckpointScheduler();
        }
        return checkpointCoordinator.triggerSavepoint(System.currentTimeMillis(), str).thenApply((v0) -> {
            return v0.getExternalPointer();
        }).handleAsync((BiFunction<? super U, Throwable, ? extends U>) (str2, th) -> {
            if (th != null) {
                if (z) {
                    startCheckpointScheduler(checkpointCoordinator);
                }
                throw new CompletionException(th);
            }
            if (z) {
                this.log.info("Savepoint stored in {}. Now cancelling {}.", str2, this.jobGraph.getJobID());
                cancel();
            }
            return str2;
        }, (Executor) this.mainThreadExecutor);
    }

    private void startCheckpointScheduler(CheckpointCoordinator checkpointCoordinator) {
        this.mainThreadExecutor.assertRunningInMainThread();
        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            try {
                checkpointCoordinator.startCheckpointScheduler();
            } catch (IllegalStateException e) {
            }
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long j, CheckpointMetrics checkpointMetrics, TaskStateSnapshot taskStateSnapshot) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(jobID, executionAttemptID, j, checkpointMetrics, taskStateSnapshot);
        String retrieveTaskManagerLocation = retrieveTaskManagerLocation(executionAttemptID);
        if (checkpointCoordinator != null) {
            this.ioExecutor.execute(() -> {
                try {
                    checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint, retrieveTaskManagerLocation);
                } catch (Throwable th) {
                    this.log.warn("Error while processing checkpoint acknowledgement message", th);
                }
            });
        } else if (this.executionGraph.getState() == JobStatus.RUNNING) {
            this.log.error("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        } else {
            this.log.debug("Received AcknowledgeCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        String retrieveTaskManagerLocation = retrieveTaskManagerLocation(declineCheckpoint.getTaskExecutionId());
        if (checkpointCoordinator != null) {
            this.ioExecutor.execute(() -> {
                try {
                    checkpointCoordinator.receiveDeclineMessage(declineCheckpoint, retrieveTaskManagerLocation);
                } catch (Exception e) {
                    this.log.error("Error in CheckpointCoordinator while processing {}", declineCheckpoint, e);
                }
            });
        } else if (this.executionGraph.getState() == JobStatus.RUNNING) {
            this.log.error("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        } else {
            this.log.debug("Received DeclineCheckpoint message for job {} with no CheckpointCoordinator", this.jobGraph.getJobID());
        }
    }

    @Override // org.apache.flink.runtime.scheduler.SchedulerNG
    public CompletableFuture<String> stopWithSavepoint(String str, boolean z) {
        this.mainThreadExecutor.assertRunningInMainThread();
        CheckpointCoordinator checkpointCoordinator = this.executionGraph.getCheckpointCoordinator();
        if (checkpointCoordinator == null) {
            return FutureUtils.completedExceptionally(new IllegalStateException(String.format("Job %s is not a streaming job.", this.jobGraph.getJobID())));
        }
        if (str == null && !checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
            this.log.info("Trying to cancel job {} with savepoint, but no savepoint directory configured.", this.jobGraph.getJobID());
            return FutureUtils.completedExceptionally(new IllegalStateException("No savepoint directory configured. You can either specify a directory while cancelling via -s :targetDirectory or configure a cluster-wide default via key '" + CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
        }
        checkpointCoordinator.stopCheckpointScheduler();
        CompletableFuture<U> thenApply = checkpointCoordinator.triggerSynchronousSavepoint(System.currentTimeMillis(), z, str).thenApply((v0) -> {
            return v0.getExternalPointer();
        });
        CompletableFuture<U> handle = this.executionGraph.getTerminationFuture().handle((jobStatus, th) -> {
            if (th != null) {
                this.log.info("Failed during stopping job {} with a savepoint. Reason: {}", this.jobGraph.getJobID(), th.getMessage());
                throw new CompletionException(th);
            }
            if (jobStatus == JobStatus.FINISHED) {
                return jobStatus;
            }
            this.log.info("Failed during stopping job {} with a savepoint. Reason: Reached state {} instead of FINISHED.", this.jobGraph.getJobID(), jobStatus);
            throw new CompletionException(new FlinkException("Reached state " + jobStatus + " instead of FINISHED."));
        });
        return thenApply.thenCompose((Function<? super U, ? extends CompletionStage<U>>) str2 -> {
            return handle.thenApply(jobStatus2 -> {
                return str2;
            });
        });
    }

    private String retrieveTaskManagerLocation(ExecutionAttemptID executionAttemptID) {
        return (String) Optional.ofNullable(this.executionGraph.getRegisteredExecutions().get(executionAttemptID)).map((v0) -> {
            return v0.getAssignedResourceLocation();
        }).map((v0) -> {
            return v0.toString();
        }).orElse("Unknown location");
    }
}
