package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.Locality;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSelectionStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.class */
/**
 * {@link Scheduler} implementation that serves {@link LogicalSlot} requests from a {@link SlotPool}.
 *
 * <p>A request whose {@link ScheduledUnit} has no slot sharing group is served with a
 * {@link SingleLogicalSlot} wrapping one physical slot. A request with a slot sharing group is
 * served from the {@link SlotSharingManager} kept for that group, optionally honouring a
 * {@link CoLocationConstraint}.
 *
 * <p>{@link #allocateSlot}, {@link #allocateBatchSlot} and {@link #cancelSlotRequest} assert that
 * they run in the main thread of the {@link ComponentMainThreadExecutor} handed to
 * {@link #start(ComponentMainThreadExecutor)}.
 */
public class SchedulerImpl implements Scheduler {
    private static final Logger log = LoggerFactory.getLogger(SchedulerImpl.class);

    /** Initial capacity of the map holding one {@link SlotSharingManager} per slot sharing group. */
    private static final int DEFAULT_SLOT_SHARING_MANAGERS_MAP_SIZE = 128;

    /** Strategy used to pick the best candidate among the offered slots for a slot profile. */
    @Nonnull
    private final SlotSelectionStrategy slotSelectionStrategy;

    /** Pool from which physical slots are taken, requested and to which they are released. */
    @Nonnull
    private final SlotPool slotPool;

    /** Main thread executor; a placeholder until {@link #start(ComponentMainThreadExecutor)} is called. */
    @Nonnull
    private ComponentMainThreadExecutor componentMainThreadExecutor;

    /** Slot sharing managers, created lazily per slot sharing group. */
    @Nonnull
    private final Map<SlotSharingGroupId, SlotSharingManager> slotSharingManagers;

    /**
     * Creates a scheduler with an empty, pre-sized map of slot sharing managers.
     *
     * @param slotSelectionStrategy strategy used to select slots for a slot profile
     * @param slotPool pool providing the physical slots
     */
    public SchedulerImpl(@Nonnull SlotSelectionStrategy slotSelectionStrategy, @Nonnull SlotPool slotPool) {
        this(slotSelectionStrategy, slotPool, new HashMap<>(DEFAULT_SLOT_SHARING_MANAGERS_MAP_SIZE));
    }

    /**
     * Creates a scheduler that uses the given map to store its slot sharing managers.
     *
     * @param slotSelectionStrategy strategy used to select slots for a slot profile
     * @param slotPool pool providing the physical slots
     * @param map map (stored by reference, not copied) from slot sharing group to its manager
     */
    @VisibleForTesting
    public SchedulerImpl(@Nonnull SlotSelectionStrategy slotSelectionStrategy, @Nonnull SlotPool slotPool, @Nonnull Map<SlotSharingGroupId, SlotSharingManager> map) {
        this.slotSelectionStrategy = slotSelectionStrategy;
        this.slotSharingManagers = map;
        this.slotPool = slotPool;
        // Placeholder until start(...) installs the real executor.
        this.componentMainThreadExecutor = new ComponentMainThreadExecutor.DummyComponentMainThreadExecutor("Scheduler is not initialized with proper main thread executor. Call to Scheduler.start(...) required.");
    }

    /**
     * Installs the main thread executor used for the main-thread assertions of this scheduler.
     *
     * @param componentMainThreadExecutor executor replacing the placeholder set by the constructor
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.Scheduler
    public void start(@Nonnull ComponentMainThreadExecutor componentMainThreadExecutor) {
        this.componentMainThreadExecutor = componentMainThreadExecutor;
    }

    /**
     * Allocates a logical slot for the given task, using {@code time} as timeout when a new slot
     * has to be requested from the slot pool.
     *
     * @param slotRequestId id identifying this request
     * @param scheduledUnit task to allocate the slot for
     * @param slotProfile profile the slot has to match
     * @param z if {@code true}, a new slot may be requested from the pool when no available slot
     *     matches; if {@code false}, the request fails in that case
     * @param time timeout passed on to the slot pool for a newly requested slot
     * @return future completed with the logical slot or with the allocation failure
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotProvider
    public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z, Time time) {
        return allocateSlotInternal(slotRequestId, scheduledUnit, slotProfile, z, time);
    }

    /**
     * Allocates a logical slot like {@link #allocateSlot}, but without a timeout: a newly requested
     * slot is asked for via {@code SlotPool#requestNewAllocatedBatchSlot}.
     *
     * @param slotRequestId id identifying this request
     * @param scheduledUnit task to allocate the slot for
     * @param slotProfile profile the slot has to match
     * @param z if {@code true}, a new slot may be requested from the pool when no available slot
     *     matches; if {@code false}, the request fails in that case
     * @return future completed with the logical slot or with the allocation failure
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotProvider
    public CompletableFuture<LogicalSlot> allocateBatchSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z) {
        return allocateSlotInternal(slotRequestId, scheduledUnit, slotProfile, z, null);
    }

    /**
     * Common entry point of both public allocation methods: logs the request, asserts the main
     * thread and starts the (possibly retried) allocation.
     *
     * @param allocationTimeout timeout for new slot requests, or {@code null} for batch requests
     * @return future that is completed once the allocation finally succeeds or fails
     */
    @Nonnull
    private CompletableFuture<LogicalSlot> allocateSlotInternal(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) {
        log.debug("Received slot request [{}] for task: {}", slotRequestId, scheduledUnit.getTaskToExecute());
        this.componentMainThreadExecutor.assertRunningInMainThread();
        CompletableFuture<LogicalSlot> resultFuture = new CompletableFuture<>();
        internalAllocateSlot(resultFuture, slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
        return resultFuture;
    }

    /**
     * Performs one allocation attempt and forwards its outcome to {@code completableFuture}.
     *
     * <p>If the attempt fails with a {@link SharedSlotOversubscribedException} (anywhere in the
     * cause chain) that reports {@code canRetry()}, the allocation is attempted again with the same
     * arguments. Any other failure cancels the slot request and fails {@code completableFuture}.
     *
     * @param completableFuture future to complete with the final outcome
     */
    private void internalAllocateSlot(CompletableFuture<LogicalSlot> completableFuture, SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
        CompletableFuture<LogicalSlot> attempt;
        if (scheduledUnit.getSlotSharingGroupId() == null) {
            attempt = allocateSingleSlot(slotRequestId, slotProfile, allowQueuedScheduling, allocationTimeout);
        } else {
            attempt = allocateSharedSlot(slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
        }

        attempt.whenComplete((logicalSlot, failure) -> {
            if (failure == null) {
                completableFuture.complete(logicalSlot);
                return;
            }
            Optional<SharedSlotOversubscribedException> oversubscribed = ExceptionUtils.findThrowable(failure, SharedSlotOversubscribedException.class);
            if (oversubscribed.isPresent() && oversubscribed.get().canRetry()) {
                // The shared slot was oversubscribed but a retry is permitted: try again.
                internalAllocateSlot(completableFuture, slotRequestId, scheduledUnit, slotProfile, allowQueuedScheduling, allocationTimeout);
            } else {
                cancelSlotRequest(slotRequestId, scheduledUnit.getSlotSharingGroupId(), failure);
                completableFuture.completeExceptionally(failure);
            }
        });
    }

    /**
     * Cancels the given slot request. With a slot sharing group the corresponding task slot of that
     * group is released; otherwise the slot is released directly at the slot pool.
     *
     * @param slotRequestId id of the request to cancel
     * @param slotSharingGroupId slot sharing group of the request, or {@code null} if there is none
     * @param th cause passed on to the release call
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.SlotProvider
    public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
        this.componentMainThreadExecutor.assertRunningInMainThread();
        if (slotSharingGroupId == null) {
            this.slotPool.releaseSlot(slotRequestId, th);
        } else {
            releaseSharedSlot(slotRequestId, slotSharingGroupId, th);
        }
    }

    /**
     * Returns a logical slot by cancelling its slot request with a {@link FlinkException} as cause.
     *
     * @param logicalSlot slot to return
     */
    @Override // org.apache.flink.runtime.jobmaster.SlotOwner
    public void returnLogicalSlot(LogicalSlot logicalSlot) {
        FlinkException cause = new FlinkException("Slot is being returned to the SlotPool.");
        cancelSlotRequest(logicalSlot.getSlotRequestId(), logicalSlot.getSlotSharingGroupId(), cause);
    }

    /**
     * Allocates a slot that is not shared with other tasks.
     *
     * <p>An already available slot is preferred. If none matches, a new slot is requested from the
     * pool when {@code allowQueuedScheduling} is set; otherwise the returned future fails with a
     * {@link NoResourceAvailableException}.
     */
    private CompletableFuture<LogicalSlot> allocateSingleSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) {
        Optional<SlotAndLocality> availableSlot = tryAllocateFromAvailable(slotRequestId, slotProfile);
        if (availableSlot.isPresent()) {
            try {
                return CompletableFuture.completedFuture(completeAllocationByAssigningPayload(slotRequestId, availableSlot.get()));
            } catch (FlinkException e) {
                return FutureUtils.completedExceptionally(e);
            }
        }

        if (!allowQueuedScheduling) {
            return FutureUtils.completedExceptionally(new NoResourceAvailableException("Could not allocate a simple slot for " + slotRequestId + '.'));
        }

        return requestNewAllocatedSlot(slotRequestId, slotProfile, allocationTimeout).thenApply(physicalSlot -> {
            try {
                return completeAllocationByAssigningPayload(slotRequestId, new SlotAndLocality(physicalSlot, Locality.UNKNOWN));
            } catch (FlinkException e) {
                // Checked exceptions cannot leave the lambda; surface the failure through the future.
                throw new CompletionException(e);
            }
        });
    }

    /**
     * Requests a new physical slot from the slot pool: a batch slot when no timeout is given, a
     * regular slot with the given timeout otherwise.
     */
    @Nonnull
    private CompletableFuture<PhysicalSlot> requestNewAllocatedSlot(SlotRequestId slotRequestId, SlotProfile slotProfile, @Nullable Time allocationTimeout) {
        if (allocationTimeout == null) {
            return this.slotPool.requestNewAllocatedBatchSlot(slotRequestId, slotProfile.getResourceProfile());
        }
        return this.slotPool.requestNewAllocatedSlot(slotRequestId, slotProfile.getResourceProfile(), allocationTimeout);
    }

    /**
     * Wraps the physical slot into a {@link SingleLogicalSlot} and assigns it as the slot's payload.
     *
     * @return the created logical slot
     * @throws FlinkException if the payload could not be assigned; the slot has then already been
     *     released at the slot pool with this exception as cause
     */
    @Nonnull
    private LogicalSlot completeAllocationByAssigningPayload(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotAndLocality slotAndLocality) throws FlinkException {
        PhysicalSlot physicalSlot = slotAndLocality.getSlot();
        SingleLogicalSlot logicalSlot = new SingleLogicalSlot(slotRequestId, physicalSlot, null, slotAndLocality.getLocality(), this);
        if (!physicalSlot.tryAssignPayload(logicalSlot)) {
            FlinkException failure = new FlinkException("Could not assign payload to allocated slot " + physicalSlot.getAllocationId() + '.');
            this.slotPool.releaseSlot(slotRequestId, failure);
            throw failure;
        }
        return logicalSlot;
    }

    /**
     * Lets the selection strategy pick the best currently available slot of the pool for the
     * profile and allocates it under the given request id.
     *
     * @return the allocated slot with its locality, or empty if the strategy selected nothing or
     *     the pool did not hand out the selected slot
     */
    private Optional<SlotAndLocality> tryAllocateFromAvailable(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotProfile slotProfile) {
        Collection<SlotSelectionStrategy.SlotInfoAndResources> candidates = this.slotPool.getAvailableSlotsInformation()
                .stream()
                .map(SlotSelectionStrategy.SlotInfoAndResources::new)
                .collect(Collectors.toList());

        return this.slotSelectionStrategy.selectBestSlotForProfile(candidates, slotProfile)
                .flatMap(selected -> this.slotPool
                        .allocateAvailableSlot(slotRequestId, selected.getSlotInfo().getAllocationId())
                        .map(physicalSlot -> new SlotAndLocality(physicalSlot, selected.getLocality())));
    }

    /**
     * Allocates a task slot inside the slot sharing group of the scheduled unit, creating the
     * group's {@link SlotSharingManager} on first use.
     *
     * @return future of the logical slot, or an exceptionally completed future if no resource is
     *     available
     */
    private CompletableFuture<LogicalSlot> allocateSharedSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) {
        SlotSharingManager sharingManager = this.slotSharingManagers.computeIfAbsent(
                scheduledUnit.getSlotSharingGroupId(),
                groupId -> new SlotSharingManager(groupId, this.slotPool, this));

        try {
            SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality;
            if (scheduledUnit.getCoLocationConstraint() != null) {
                multiTaskSlotLocality = allocateCoLocatedMultiTaskSlot(scheduledUnit.getCoLocationConstraint(), sharingManager, slotProfile, allowQueuedScheduling, allocationTimeout);
            } else {
                multiTaskSlotLocality = allocateMultiTaskSlot(scheduledUnit.getJobVertexId(), sharingManager, slotProfile, allowQueuedScheduling, allocationTimeout);
            }

            // Sanity check: the chosen multi task slot must not already contain this job vertex.
            Preconditions.checkState(!multiTaskSlotLocality.getMultiTaskSlot().contains(scheduledUnit.getJobVertexId()));

            return multiTaskSlotLocality.getMultiTaskSlot()
                    .allocateSingleTaskSlot(slotRequestId, slotProfile.getResourceProfile(), scheduledUnit.getJobVertexId(), multiTaskSlotLocality.getLocality())
                    .getLogicalSlotFuture();
        } catch (NoResourceAvailableException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

    /**
     * Finds or creates the multi task slot that holds all tasks of a co-location constraint.
     *
     * <p>If the constraint already references an existing multi task slot, that slot is reused when
     * it may have enough resources. Otherwise a parent multi task slot is allocated (restricted to
     * the constraint's location if one is assigned), a nested multi task slot is created for the
     * constraint, and the constraint's location is locked once the slot context is available.
     *
     * @throws NoResourceAvailableException if the existing slot lacks resources, if an assigned
     *     location cannot be matched locally, or if the parent slot cannot be allocated
     */
    private SlotSharingManager.MultiTaskSlotLocality allocateCoLocatedMultiTaskSlot(CoLocationConstraint coLocationConstraint, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) throws NoResourceAvailableException {
        SlotRequestId existingRequestId = coLocationConstraint.getSlotRequestId();
        if (existingRequestId != null) {
            SlotSharingManager.TaskSlot existingSlot = slotSharingManager.getTaskSlot(existingRequestId);
            if (existingSlot != null) {
                Preconditions.checkState(existingSlot instanceof SlotSharingManager.MultiTaskSlot);
                SlotSharingManager.MultiTaskSlot existingMultiTaskSlot = (SlotSharingManager.MultiTaskSlot) existingSlot;
                if (!existingMultiTaskSlot.mayHaveEnoughResourcesToFulfill(slotProfile.getResourceProfile())) {
                    throw new NoResourceAvailableException("Not enough resources in the slot for all co-located tasks.");
                }
                return SlotSharingManager.MultiTaskSlotLocality.of(existingMultiTaskSlot, Locality.LOCAL);
            }
            // The referenced task slot is unknown to the manager: forget the stale request id.
            coLocationConstraint.setSlotRequestId(null);
        }

        // With an assigned location, only that location is acceptable as preferred location.
        SlotProfile effectiveProfile = slotProfile;
        if (coLocationConstraint.isAssigned()) {
            effectiveProfile = new SlotProfile(slotProfile.getResourceProfile(), Collections.singleton(coLocationConstraint.getLocation()), slotProfile.getPreferredAllocations());
        }

        SlotSharingManager.MultiTaskSlotLocality parentSlotLocality = allocateMultiTaskSlot(coLocationConstraint.getGroupId(), slotSharingManager, effectiveProfile, allowQueuedScheduling, allocationTimeout);
        if (coLocationConstraint.isAssigned() && parentSlotLocality.getLocality() != Locality.LOCAL) {
            parentSlotLocality.getMultiTaskSlot().release(new FlinkException("Multi task slot is not local and, thus, does not fulfill the co-location constraint."));
            throw new NoResourceAvailableException("Could not allocate a local multi task slot for the co location constraint " + coLocationConstraint + '.');
        }

        SlotRequestId coLocationRequestId = new SlotRequestId();
        SlotSharingManager.MultiTaskSlot coLocationSlot = parentSlotLocality.getMultiTaskSlot().allocateMultiTaskSlot(coLocationRequestId, coLocationConstraint.getGroupId());
        coLocationConstraint.setSlotRequestId(coLocationRequestId);

        coLocationSlot.getSlotContextFuture().whenComplete((slotContext, failure) -> {
            if (failure != null) {
                log.debug("Failed to lock colocation constraint {} because the slot allocation for slot request {} failed.", coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), failure);
            } else if (Objects.equals(coLocationConstraint.getSlotRequestId(), coLocationRequestId)) {
                // Only lock if the constraint still points at the request created above.
                coLocationConstraint.lockLocation(slotContext.getTaskManagerLocation());
            } else {
                log.debug("Failed to lock colocation constraint {} because assigned slot request {} differs from fulfilled slot request {}.", coLocationConstraint.getGroupId(), coLocationConstraint.getSlotRequestId(), coLocationRequestId);
            }
        });

        return SlotSharingManager.MultiTaskSlotLocality.of(coLocationSlot, parentSlotLocality.getLocality());
    }

    /**
     * Finds or creates a root multi task slot of the sharing group for the given group id.
     *
     * <p>Candidates are considered in this order:
     * <ol>
     *   <li>a resolved root slot of the sharing manager selected with {@link Locality#LOCAL};</li>
     *   <li>an available slot from the pool, if it is local or no resolved root slot was selected;</li>
     *   <li>the non-local resolved root slot selected in step 1 (a pool slot taken in step 2 is
     *       released again);</li>
     *   <li>if {@code allowQueuedScheduling}: the unresolved root slot of the sharing manager, or a
     *       new root slot backed by a newly requested physical slot.</li>
     * </ol>
     *
     * @param abstractID group id used to look up root slots in the sharing manager
     * @throws NoResourceAvailableException if no slot was found and {@code allowQueuedScheduling}
     *     is {@code false}
     */
    private SlotSharingManager.MultiTaskSlotLocality allocateMultiTaskSlot(AbstractID abstractID, SlotSharingManager slotSharingManager, SlotProfile slotProfile, boolean allowQueuedScheduling, @Nullable Time allocationTimeout) throws NoResourceAvailableException {
        SlotSelectionStrategy.SlotInfoAndLocality bestResolvedRootSlotInfo = this.slotSelectionStrategy
                .selectBestSlotForProfile(slotSharingManager.listResolvedRootSlotInfo(abstractID), slotProfile)
                .orElse(null);

        SlotSharingManager.MultiTaskSlotLocality resolvedRootSlotLocality = null;
        if (bestResolvedRootSlotInfo != null) {
            resolvedRootSlotLocality = new SlotSharingManager.MultiTaskSlotLocality(
                    slotSharingManager.getResolvedRootSlot(bestResolvedRootSlotInfo.getSlotInfo()),
                    bestResolvedRootSlotInfo.getLocality());
        }

        if (resolvedRootSlotLocality != null && resolvedRootSlotLocality.getLocality() == Locality.LOCAL) {
            return resolvedRootSlotLocality;
        }

        SlotRequestId allocatedSlotRequestId = new SlotRequestId();
        SlotRequestId multiTaskSlotRequestId = new SlotRequestId();

        Optional<SlotAndLocality> poolSlot = tryAllocateFromAvailable(allocatedSlotRequestId, slotProfile);
        if (poolSlot.isPresent()) {
            SlotAndLocality poolSlotAndLocality = poolSlot.get();
            if (poolSlotAndLocality.getLocality() == Locality.LOCAL || bestResolvedRootSlotInfo == null) {
                PhysicalSlot allocatedSlot = poolSlotAndLocality.getSlot();
                SlotSharingManager.MultiTaskSlot rootSlot = slotSharingManager.createRootSlot(multiTaskSlotRequestId, CompletableFuture.completedFuture(poolSlotAndLocality.getSlot()), allocatedSlotRequestId);
                if (allocatedSlot.tryAssignPayload(rootSlot)) {
                    return SlotSharingManager.MultiTaskSlotLocality.of(rootSlot, poolSlotAndLocality.getLocality());
                }
                rootSlot.release(new FlinkException("Could not assign payload to allocated slot " + allocatedSlot.getAllocationId() + '.'));
            }
        }

        if (resolvedRootSlotLocality != null) {
            // Prefer the existing root slot of the sharing group; hand an unused pool slot back.
            if (poolSlot.isPresent()) {
                this.slotPool.releaseSlot(allocatedSlotRequestId, new FlinkException("Locality constraint is not better fulfilled by allocated slot."));
            }
            return resolvedRootSlotLocality;
        }

        if (!allowQueuedScheduling) {
            throw new NoResourceAvailableException("Could not allocate a shared slot for " + abstractID + '.');
        }

        SlotSharingManager.MultiTaskSlot unresolvedRootSlot = slotSharingManager.getUnresolvedRootSlot(abstractID);
        if (unresolvedRootSlot == null) {
            CompletableFuture<PhysicalSlot> newSlotFuture = requestNewAllocatedSlot(allocatedSlotRequestId, slotProfile, allocationTimeout);
            unresolvedRootSlot = slotSharingManager.createRootSlot(multiTaskSlotRequestId, newSlotFuture, allocatedSlotRequestId);

            newSlotFuture.whenComplete((physicalSlot, failure) -> {
                SlotSharingManager.TaskSlot taskSlot = slotSharingManager.getTaskSlot(multiTaskSlotRequestId);
                if (taskSlot == null) {
                    // The root slot is no longer registered: give the physical slot back to the pool.
                    this.slotPool.releaseSlot(allocatedSlotRequestId, new FlinkException("Could not find task slot with " + multiTaskSlotRequestId + '.'));
                } else if (!(taskSlot instanceof SlotSharingManager.MultiTaskSlot) || failure != null) {
                    taskSlot.release(failure);
                } else if (!physicalSlot.tryAssignPayload((SlotSharingManager.MultiTaskSlot) taskSlot)) {
                    taskSlot.release(new FlinkException("Could not assign payload to allocated slot " + physicalSlot.getAllocationId() + '.'));
                }
            });
        }

        return SlotSharingManager.MultiTaskSlotLocality.of(unresolvedRootSlot, Locality.UNKNOWN);
    }

    /**
     * Releases the task slot of the given request inside its slot sharing group. An unknown group
     * or an unknown slot is only logged at debug level and otherwise ignored.
     */
    private void releaseSharedSlot(@Nonnull SlotRequestId slotRequestId, @Nonnull SlotSharingGroupId slotSharingGroupId, Throwable th) {
        SlotSharingManager sharingManager = this.slotSharingManagers.get(slotSharingGroupId);
        if (sharingManager == null) {
            log.debug("Could not find slot sharing group {}. Ignoring release slot request.", slotSharingGroupId);
            return;
        }

        SlotSharingManager.TaskSlot taskSlot = sharingManager.getTaskSlot(slotRequestId);
        if (taskSlot == null) {
            log.debug("Could not find slot [{}] in slot sharing group {}. Ignoring release slot request.", slotRequestId, slotSharingGroupId);
            return;
        }
        taskSlot.release(th);
    }

    /**
     * Reports whether the configured slot selection strategy is a
     * {@link PreviousAllocationSlotSelectionStrategy}.
     *
     * @return {@code true} if the strategy is a {@code PreviousAllocationSlotSelectionStrategy}
     */
    @Override // org.apache.flink.runtime.jobmaster.slotpool.Scheduler
    public boolean requiresPreviousExecutionGraphAllocations() {
        return this.slotSelectionStrategy instanceof PreviousAllocationSlotSelectionStrategy;
    }
}
