package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.slotpool.Scheduler;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/SchedulingUtils.class */
/**
 * Static helpers that schedule a set of {@link ExecutionVertex} instances belonging to an
 * {@link ExecutionGraph}, either lazily or eagerly depending on the {@link ScheduleMode}.
 */
public class SchedulingUtils {

    /**
     * Dispatches to {@link #scheduleLazy} or {@link #scheduleEager} based on the schedule mode.
     *
     * @param scheduleMode {@code LAZY_FROM_SOURCES} and
     *     {@code LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST} select lazy scheduling,
     *     {@code EAGER} selects eager scheduling
     * @param iterable the vertices to schedule
     * @param executionGraph the graph the vertices are scheduled for
     * @return the future produced by the selected scheduling method
     * @throws IllegalStateException if {@code scheduleMode} is none of the modes listed above
     */
    public static CompletableFuture<Void> schedule(ScheduleMode scheduleMode, Iterable<ExecutionVertex> iterable, ExecutionGraph executionGraph) {
        switch (scheduleMode) {
            case LAZY_FROM_SOURCES:
            case LAZY_FROM_SOURCES_WITH_BATCH_SLOT_REQUEST:
                return scheduleLazy(iterable, executionGraph);
            case EAGER:
                return scheduleEager(iterable, executionGraph);
            default:
                throw new IllegalStateException(String.format("Schedule mode %s is invalid.", scheduleMode));
        }
    }

    /**
     * Schedules every given vertex that is either an input vertex or whose input dependency
     * constraints are currently satisfied; all other vertices are skipped.
     *
     * <p>Asserts that it is running in the job master main thread.
     *
     * @param iterable the candidate vertices
     * @param executionGraph the graph supplying the slot provider strategy
     * @return the result of {@code FutureUtils.waitForAll} over the per-vertex scheduling futures
     */
    public static CompletableFuture<Void> scheduleLazy(Iterable<ExecutionVertex> iterable, ExecutionGraph executionGraph) {
        executionGraph.assertRunningInJobMasterMainThread();

        SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        Set<AllocationID> priorAllocationIds =
                computePriorAllocationIdsIfRequiredByScheduling(iterable, slotProviderStrategy.asSlotProvider());

        List<CompletableFuture<?>> schedulingFutures = new ArrayList<>();
        for (ExecutionVertex vertex : iterable) {
            // Input vertices are always eligible; the constraint check is only evaluated for the rest.
            boolean eligible = vertex.getJobVertex().getJobVertex().isInputVertex()
                    || vertex.checkInputDependencyConstraints();
            if (eligible) {
                schedulingFutures.add(
                        vertex.scheduleForExecution(slotProviderStrategy, LocationPreferenceConstraint.ANY, priorAllocationIds));
            }
        }
        return FutureUtils.waitForAll(schedulingFutures);
    }

    /**
     * Requests resources for the current execution attempt of every given vertex and, once all
     * requests have been combined successfully, deploys each resulting {@link Execution}.
     *
     * <p>Asserts that it is running in the job master main thread. Failures are always surfaced
     * wrapped in a {@link CompletionException}; a {@link TimeoutException} is first replaced by a
     * {@link NoResourceAvailableException} carrying a detailed per-execution status message.
     *
     * @param iterable the vertices to schedule
     * @param executionGraph the graph supplying state, slot provider strategy and allocation timeout
     * @return a future for the allocate-then-deploy pipeline
     * @throws IllegalStateException if the graph is not in state {@code RUNNING}
     *     (raised through {@code Preconditions.checkState})
     */
    public static CompletableFuture<Void> scheduleEager(Iterable<ExecutionVertex> iterable, ExecutionGraph executionGraph) {
        executionGraph.assertRunningInJobMasterMainThread();
        Preconditions.checkState(executionGraph.getState() == JobStatus.RUNNING, "job is not running currently");

        List<CompletableFuture<Execution>> allocationFutures = new ArrayList<>();
        SlotProviderStrategy slotProviderStrategy = executionGraph.getSlotProviderStrategy();
        Set<AllocationID> priorAllocationIds = Collections.unmodifiableSet(
                computePriorAllocationIdsIfRequiredByScheduling(iterable, slotProviderStrategy.asSlotProvider()));

        for (ExecutionVertex vertex : iterable) {
            allocationFutures.add(
                    vertex.getCurrentExecutionAttempt()
                            .allocateResourcesForExecution(slotProviderStrategy, LocationPreferenceConstraint.ALL, priorAllocationIds));
        }

        FutureUtils.ConjunctFuture<Collection<Execution>> allAllocations = FutureUtils.combineAll(allocationFutures);

        return allAllocations
                .thenAccept(executions -> {
                    for (Execution execution : executions) {
                        try {
                            execution.deploy();
                        } catch (Throwable deployFailure) {
                            throw new CompletionException(
                                    new FlinkException(String.format("Could not deploy execution %s.", execution), deployFailure));
                        }
                    }
                })
                .exceptionally(failure -> {
                    Throwable cause = ExceptionUtils.stripCompletionException(failure);
                    if (cause instanceof TimeoutException) {
                        // Replace the bare timeout with a message describing how far allocation got.
                        String message = "Could not allocate all requires slots within timeout of "
                                + executionGraph.getAllocationTimeout()
                                + ". Slots required: " + allAllocations.getNumFuturesTotal()
                                + ", slots allocated: " + allAllocations.getNumFuturesCompleted()
                                + ", previous allocation IDs: " + priorAllocationIds
                                + ", execution status: " + describeAllocationFutures(allocationFutures);
                        throw new CompletionException(new NoResourceAvailableException(message));
                    }
                    throw new CompletionException(cause);
                });
    }

    /**
     * Renders a comma-separated, per-future status list ("completed", "incomplete" or
     * "completed exceptionally") for use in the allocation timeout message.
     */
    private static String describeAllocationFutures(List<CompletableFuture<Execution>> allocationFutures) {
        StringBuilder status = new StringBuilder();
        int count = allocationFutures.size();
        for (int i = 0; i < count; i++) {
            CompletableFuture<Execution> future = allocationFutures.get(i);
            try {
                // getNow(null) yields null when the future has not completed yet.
                Execution execution = future.getNow(null);
                if (execution != null) {
                    status.append("completed: ").append(execution);
                } else {
                    status.append("incomplete: ").append(future);
                }
            } catch (CompletionException e) {
                status.append("completed exceptionally: ").append(e).append("/").append(future);
            }
            if (i < count - 1) {
                status.append(", ");
            }
        }
        return status.toString();
    }

    /**
     * Returns the prior allocation ids of the given vertices when the slot provider is a
     * {@link Scheduler} that reports requiring previous execution graph allocations;
     * otherwise returns an empty set.
     */
    private static Set<AllocationID> computePriorAllocationIdsIfRequiredByScheduling(Iterable<ExecutionVertex> vertices, SlotProvider slotProvider) {
        boolean required = slotProvider instanceof Scheduler
                && ((Scheduler) slotProvider).requiresPreviousExecutionGraphAllocations();
        if (required) {
            return computePriorAllocationIds(vertices);
        }
        return Collections.emptySet();
    }

    /**
     * Collects the non-null latest prior allocation id of every given vertex into a new set.
     */
    private static Set<AllocationID> computePriorAllocationIds(Iterable<ExecutionVertex> vertices) {
        Set<AllocationID> priorAllocationIds = new HashSet<>();
        for (ExecutionVertex vertex : vertices) {
            AllocationID latestPriorAllocation = vertex.getLatestPriorAllocation();
            if (latestPriorAllocation != null) {
                priorAllocationIds.add(latestPriorAllocation);
            }
        }
        return priorAllocationIds;
    }
}
