package org.apache.tez.dag.app.dag.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.Preconditions;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tez/dag/app/dag/impl/RootInputVertexManager.class */
public class RootInputVertexManager extends VertexManagerPlugin {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(RootInputVertexManager.class);
    /** Configuration key: boolean switch that enables slow-start scheduling. */
    public static final String TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START = "tez.root-input-vertex-manager.enable.slow-start";
    /** Default for {@link #TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START}. */
    public static final boolean TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START_DEFAULT = false;
    /** Configuration key: source-completion fraction at which scheduling starts. */
    public static final String TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION = "tez.root-input-vertex-manager.min-src-fraction";
    /** Default for {@link #TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION}. */
    public static final float TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT = 0.25f;
    /** Configuration key: source-completion fraction at which every task is scheduled. */
    public static final String TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION = "tez.root-input-vertex-manager.max-src-fraction";
    /** Default for {@link #TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION}. */
    public static final float TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT = 0.75f;
    /** Name of the root input that configured this vertex; assigned in onRootVertexInitialized. */
    private String configuredInputName;
    /** Sum of the task counts of all source vertices that have reported CONFIGURED. */
    int totalNumSourceTasks;
    /** Number of distinct source tasks whose completion has been recorded. */
    int numSourceTasksCompleted;
    /** Set to true at the end of onVertexStarted; no task is scheduled before that. */
    private AtomicBoolean onVertexStartedDone;
    /** Bookkeeping per source vertex name; only sources with more than zero tasks are tracked. */
    private final Map<String, SourceVertexInfo> srcVertexInfo;
    /** Latched to true once every tracked source vertex has been seen as configured. */
    boolean sourceVerticesScheduled;
    /** Tasks of this vertex that have not yet been handed to the context for scheduling. */
    List<PendingTaskInfo> pendingTasks;
    /** Size of {@link #pendingTasks} at the time the list was last rebuilt. */
    int totalTasksToSchedule;
    /** Whether slow start is enabled; read from the configuration in initialize(). */
    boolean slowStartEnabled;
    /** Lower bound of the slow-start ramp (0 when slow start is disabled). */
    float slowStartMinFraction;
    /** Upper bound of the slow-start ramp (0 when slow start is disabled). */
    float slowStartMaxFraction;

    /** Configuration decoded from the user payload in initialize(). */
    @VisibleForTesting
    Configuration conf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/RootInputVertexManager$PendingTaskInfo.class */
    /**
     * Immutable holder for the index of a task that is still waiting to be scheduled.
     */
    public static class PendingTaskInfo {
        /** Task index; used as the task index of the schedule request built for this entry. */
        private final int index;

        /**
         * @param i index of the pending task
         */
        public PendingTaskInfo(int i) {
            index = i;
        }

        /**
         * @return the task index supplied at construction
         */
        public int getIndex() {
            return index;
        }

        /**
         * @return a short debug representation of the form {@code [index=N]}
         */
        @Override
        public String toString() {
            return new StringBuilder("[index=").append(index).append(']').toString();
        }
    }

    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/RootInputVertexManager$RootInputVertexManagerConfigBuilder.class */
    /**
     * Fluent builder that collects RootInputVertexManager settings in a
     * {@link Configuration} and packages them into a {@link VertexManagerPluginDescriptor}.
     *
     * <p>When a non-null configuration is supplied, that same instance is used and
     * mutated by the setters; it is not copied.
     */
    public static final class RootInputVertexManagerConfigBuilder {
        private final Configuration conf;

        /**
         * @param configuration configuration to write the settings into, or {@code null}
         *                      to start from a new {@code Configuration(false)}
         */
        private RootInputVertexManagerConfigBuilder(@Nullable Configuration configuration) {
            this.conf = configuration != null ? configuration : new Configuration(false);
        }

        /**
         * Stores the slow-start switch.
         *
         * @param z whether slow start should be enabled
         * @return this builder
         */
        public RootInputVertexManagerConfigBuilder setSlowStart(boolean z) {
            conf.setBoolean(TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START, z);
            return this;
        }

        /**
         * Stores the minimum source-completion fraction.
         *
         * @param f value written under the min-src-fraction key
         * @return this builder
         */
        public RootInputVertexManagerConfigBuilder setSlowStartMinSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION, f);
            return this;
        }

        /**
         * Stores the maximum source-completion fraction.
         *
         * @param f value written under the max-src-fraction key
         * @return this builder
         */
        public RootInputVertexManagerConfigBuilder setSlowStartMaxSrcCompletionFraction(float f) {
            conf.setFloat(TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION, f);
            return this;
        }

        /**
         * Creates a descriptor for {@link RootInputVertexManager} whose user payload is
         * the serialized configuration held by this builder.
         *
         * @return the plugin descriptor
         * @throws TezUncheckedException wrapping any {@link IOException} raised while
         *                               building the descriptor
         */
        public VertexManagerPluginDescriptor build() {
            try {
                VertexManagerPluginDescriptor descriptor = VertexManagerPluginDescriptor.create(RootInputVertexManager.class.getName());
                UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
                return descriptor.setUserPayload(payload);
            } catch (IOException e) {
                throw new TezUncheckedException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/tez/dag/app/dag/impl/RootInputVertexManager$SourceVertexInfo.class */
    /**
     * Mutable bookkeeping record for one source vertex: its edge, whether it has been
     * configured, its task count and which of its tasks have finished.
     */
    public static class SourceVertexInfo {
        /** Edge property supplied at construction. */
        final EdgeProperty edgeProperty;
        /** Flipped to true by the owning manager when the vertex reports CONFIGURED. */
        boolean vertexIsConfigured;
        /** One bit per finished task id. */
        final BitSet finishedTaskSet;
        /** Task count of the source vertex; not set by the constructor. */
        int numTasks;

        /**
         * @param edgeProperty edge property to remember
         * @param i            accepted but not used by this constructor
         */
        SourceVertexInfo(EdgeProperty edgeProperty, int i) {
            this.edgeProperty = edgeProperty;
            this.finishedTaskSet = new BitSet();
        }

        /**
         * @return the current value of {@link #numTasks}
         */
        int getNumTasks() {
            return numTasks;
        }

        /**
         * @return how many distinct task ids are marked as finished
         */
        int getNumCompletedTasks() {
            return finishedTaskSet.cardinality();
        }
    }

    /**
     * Factory for the per-source-vertex bookkeeping record.
     *
     * @param edgeProperty edge property of the source vertex
     * @param i            forwarded unchanged to the {@link SourceVertexInfo} constructor
     * @return a newly created record
     */
    SourceVertexInfo createSourceVertexInfo(EdgeProperty edgeProperty, int i) {
        final SourceVertexInfo info = new SourceVertexInfo(edgeProperty, i);
        return info;
    }

    /**
     * Creates the manager with empty bookkeeping: no source tasks counted, no pending
     * tasks, slow start disabled and both slow-start fractions at zero.
     *
     * @param vertexManagerPluginContext context passed straight to the superclass
     */
    public RootInputVertexManager(VertexManagerPluginContext vertexManagerPluginContext) {
        super(vertexManagerPluginContext);
        this.totalNumSourceTasks = 0;
        this.numSourceTasksCompleted = 0;
        this.onVertexStartedDone = new AtomicBoolean(false);
        // Explicit type arguments instead of the raw type, matching the field declaration.
        this.srcVertexInfo = new ConcurrentHashMap<String, SourceVertexInfo>();
        this.sourceVerticesScheduled = false;
        this.pendingTasks = Lists.newLinkedList();
        this.totalTasksToSchedule = 0;
        this.slowStartEnabled = false;
        this.slowStartMinFraction = 0.0f;
        this.slowStartMaxFraction = 0.0f;
    }

    /**
     * Registers every source vertex that has at least one task, replays the source-task
     * completions passed in, then builds the pending-task list and attempts scheduling.
     *
     * @param list source task attempts reported as completed; may be {@code null}
     */
    @Override
    public void onVertexStarted(List<TaskAttemptIdentifier> list) {
        Map<String, EdgeProperty> inputEdges = getContext().getInputVertexEdgeProperties();
        for (Map.Entry<String, EdgeProperty> edge : inputEdges.entrySet()) {
            String srcVertexName = edge.getKey();
            int srcNumTasks = getContext().getVertexNumTasks(srcVertexName);
            if (srcNumTasks > 0) {
                LOG.info("Task count in " + srcVertexName + ": " + srcNumTasks);
                int ownNumTasks = getContext().getVertexNumTasks(getContext().getVertexName());
                this.srcVertexInfo.put(srcVertexName, createSourceVertexInfo(edge.getValue(), ownNumTasks));
                // Needed so that onVertexStateUpdated learns the source's final task count.
                getContext().registerForVertexStateUpdates(srcVertexName, EnumSet.of(VertexState.CONFIGURED));
            } else {
                LOG.info("Vertex: " + getContext().getVertexName() + "; Ignoring " + srcVertexName + " as it has " + srcNumTasks + " tasks");
            }
        }
        if (list != null) {
            for (TaskAttemptIdentifier completion : list) {
                onSourceTaskCompleted(completion);
            }
        }
        // Only from this point on may processPendingTasks() actually schedule anything.
        this.onVertexStartedDone.set(true);
        updatePendingTasks();
        processPendingTasks();
    }

    /**
     * Handles the CONFIGURED notification of a source vertex: marks it configured,
     * records its task count, adds that count to the running total and attempts
     * scheduling. Notifications for vertices that are not tracked are ignored.
     *
     * @param vertexStateUpdate the state change notification
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument},
     *                                  when the reported state is not CONFIGURED
     */
    @Override
    public void onVertexStateUpdated(VertexStateUpdate vertexStateUpdate) {
        String srcVertexName = vertexStateUpdate.getVertexName();
        Preconditions.checkArgument(vertexStateUpdate.getVertexState() == VertexState.CONFIGURED, "Received incorrect state notification : " + vertexStateUpdate.getVertexState() + " for vertex: " + srcVertexName + " in vertex: " + getContext().getVertexName());
        SourceVertexInfo info = this.srcVertexInfo.get(srcVertexName);
        if (info == null) {
            return;
        }
        // A source vertex must be reported as configured at most once.
        Preconditions.checkState(!info.vertexIsConfigured);
        info.vertexIsConfigured = true;
        info.numTasks = getContext().getVertexNumTasks(srcVertexName);
        this.totalNumSourceTasks += info.numTasks;
        // Label corrected from the misspelled "numjourceTasks".
        LOG.info("Received configured notification : {} for vertex: {} in vertex: {} numSourceTasks: {}", new Object[]{vertexStateUpdate.getVertexState(), srcVertexName, getContext().getVertexName(), this.totalNumSourceTasks});
        processPendingTasks();
    }

    /**
     * Records the completion of a source task, counting each task id only once per
     * source vertex, and then attempts scheduling.
     *
     * @param taskAttemptIdentifier the completed source task attempt
     */
    @Override
    public void onSourceTaskCompleted(TaskAttemptIdentifier taskAttemptIdentifier) {
        final String srcVertexName = taskAttemptIdentifier.getTaskIdentifier().getVertexIdentifier().getName();
        final int srcTaskId = taskAttemptIdentifier.getTaskIdentifier().getIdentifier();
        // NOTE(review): no null check here; a completion from an untracked vertex would
        // throw a NullPointerException on the next line. Confirm callers never send one.
        final SourceVertexInfo info = this.srcVertexInfo.get(srcVertexName);
        if (info.vertexIsConfigured) {
            // The task count is only known once the source vertex is configured.
            Preconditions.checkState(srcTaskId < info.numTasks, "Received completion for srcTaskId " + srcTaskId + " but Vertex: " + srcVertexName + " has only " + info.numTasks + " tasks");
        }
        boolean firstNotification = !info.finishedTaskSet.get(srcTaskId);
        if (firstNotification) {
            // Repeated notifications for the same task id must not be counted again.
            info.finishedTaskSet.set(srcTaskId);
            this.numSourceTasksCompleted++;
        }
        processPendingTasks();
    }

    /**
     * Decodes the configuration from the user payload, reads the slow-start settings,
     * validates them and builds the initial pending-task list.
     *
     * <p>When slow start is disabled both fractions are forced to zero. When it is
     * enabled and no maximum is configured, the maximum falls back to the larger of the
     * configured minimum and the declared default maximum.
     *
     * @throws RuntimeException         if the payload is missing, empty or cannot be decoded
     * @throws IllegalArgumentException if min &lt; 0, max &gt; 1 or max &lt; min
     */
    @Override
    public void initialize() {
        UserPayload userPayload = getContext().getUserPayload();
        if (userPayload == null || userPayload.getPayload() == null || userPayload.getPayload().limit() == 0) {
            throw new RuntimeException("Could not initialize RootInputVertexManager from provided user payload");
        }
        // Only the payload decoding can raise IOException, so only it is guarded.
        try {
            this.conf = TezUtils.createConfFromUserPayload(userPayload);
        } catch (IOException e) {
            throw new RuntimeException("Could not initialize RootInputVertexManager from provided user payload", e);
        }

        this.slowStartEnabled = this.conf.getBoolean(TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START, TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START_DEFAULT);
        if (this.slowStartEnabled) {
            this.slowStartMinFraction = this.conf.getFloat(TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION, TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION_DEFAULT);
            this.slowStartMaxFraction = this.conf.getFloat(TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION, Math.max(this.slowStartMinFraction, TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION_DEFAULT));
        } else {
            this.slowStartMinFraction = 0.0f;
            this.slowStartMaxFraction = 0.0f;
        }

        if (this.slowStartMinFraction < 0.0f || this.slowStartMaxFraction > 1.0f || this.slowStartMaxFraction < this.slowStartMinFraction) {
            throw new IllegalArgumentException("Invalid values for slowStartMinFraction/slowStartMaxFraction. Min cannot be < 0, max cannot be > 1, and max cannot be < min., configuredMin=" + this.slowStartMinFraction + ", configuredMax=" + this.slowStartMaxFraction);
        }
        updatePendingTasks();
    }

    /**
     * No-op: this manager takes no action on vertex manager events.
     *
     * @param vertexManagerEvent the received event; not read
     */
    public void onVertexManagerEventReceived(VertexManagerEvent vertexManagerEvent) {
    }

    /**
     * Processes the events produced for one root input.
     *
     * <ul>
     *   <li>{@link InputConfigureVertexTasksEvent}: reconfigures this vertex with the
     *       event's task count, location hint and input spec update (a default
     *       single-physical-input spec when the event carries none).</li>
     *   <li>{@link InputUpdatePayloadEvent}: replaces the user payload of the input
     *       descriptor.</li>
     *   <li>{@link InputDataInformationEvent}: routed 1:1 by setting the target index to
     *       the source index, then collected.</li>
     * </ul>
     * Other event types are skipped. The collected data-information events are handed to
     * the context at the end. Configure and payload events are rejected once a
     * data-information event has been seen.
     *
     * @param str             name of the root input
     * @param inputDescriptor descriptor of the root input; its payload may be replaced
     * @param list            events to process
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState},
     *                               when one of the ordering or single-input checks fails
     */
    @Override
    public void onRootVertexInitialized(String str, InputDescriptor inputDescriptor, List<Event> list) {
        List<InputDataInformationEvent> routedEvents = Lists.newLinkedList();
        boolean dataInformationEventSeen = false;
        for (Event event : list) {
            if (event instanceof InputConfigureVertexTasksEvent) {
                Preconditions.checkState(!dataInformationEventSeen);
                Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) == -1, "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism, VertexName: " + getContext().getVertexName());
                Preconditions.checkState(this.configuredInputName == null, "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager, VertexName: " + getContext().getVertexName() + ", ConfiguredInput: " + this.configuredInputName + ", CurrentInput: " + str);
                this.configuredInputName = str;
                InputConfigureVertexTasksEvent configureEvent = (InputConfigureVertexTasksEvent) event;
                InputSpecUpdate specUpdate = configureEvent.getInputSpecUpdate();
                Map<String, InputSpecUpdate> specUpdates = new HashMap<String, InputSpecUpdate>();
                specUpdates.put(str, specUpdate == null ? InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate() : specUpdate);
                getContext().reconfigureVertex(specUpdates, configureEvent.getLocationHint(), configureEvent.getNumTasks());
            }
            if (event instanceof InputUpdatePayloadEvent) {
                Preconditions.checkState(!dataInformationEventSeen);
                inputDescriptor.setUserPayload(UserPayload.create(((InputUpdatePayloadEvent) event).getUserPayload()));
            } else if (event instanceof InputDataInformationEvent) {
                dataInformationEventSeen = true;
                // The task count must be known by now (either preset or just reconfigured).
                Preconditions.checkState(getContext().getVertexNumTasks(getContext().getVertexName()) != 0);
                Preconditions.checkState(this.configuredInputName == null || this.configuredInputName.equals(str), "RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager, VertexName:" + getContext().getVertexName() + ", ConfiguredInput: " + this.configuredInputName + ", CurrentInput: " + str);
                this.configuredInputName = str;
                InputDataInformationEvent dataEvent = (InputDataInformationEvent) event;
                // 1:1 routing: split N goes to task N.
                dataEvent.setTargetIndex(dataEvent.getSourceIndex());
                routedEvents.add(dataEvent);
            }
        }
        getContext().addRootInputEvents(str, routedEvents);
    }

    /**
     * Checks whether every tracked source vertex has been configured. On success the
     * result is latched in {@link #sourceVerticesScheduled}.
     *
     * @return {@code true} when all tracked source vertices are configured, otherwise
     *         {@code false} (the first unconfigured vertex is logged at debug level)
     */
    private boolean canScheduleTasks() {
        for (Map.Entry<String, SourceVertexInfo> source : this.srcVertexInfo.entrySet()) {
            if (source.getValue().vertexIsConfigured) {
                continue;
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Waiting for vertex: " + source.getKey() + " in vertex: " + getContext().getVertexName());
            }
            return false;
        }
        this.sourceVerticesScheduled = true;
        return true;
    }

    /**
     * Tells whether scheduling may proceed: onVertexStarted must have finished and all
     * tracked source vertices must be configured.
     *
     * @return {@code true} when tasks may be scheduled now
     */
    private boolean preconditionsSatisfied() {
        if (!this.onVertexStartedDone.get()) {
            return false;
        }
        // Short-circuit keeps canScheduleTasks() from running once the latch is set.
        final boolean sourcesReady = this.sourceVerticesScheduled || canScheduleTasks();
        if (!sourcesReady && LOG.isDebugEnabled()) {
            LOG.debug("Defer scheduling tasks for vertex: {} as one task needs to be completed per source vertex", getContext().getVertexName());
        }
        return sourcesReady;
    }

    /**
     * Rebuilds {@link #pendingTasks} with one entry per task index of this vertex and
     * refreshes {@link #totalTasksToSchedule}. Nothing happens when the vertex's task
     * count is not positive or already equals the current list size.
     */
    void updatePendingTasks() {
        final int numTasks = getContext().getVertexNumTasks(getContext().getVertexName());
        final boolean rebuildNeeded = numTasks > 0 && numTasks != this.pendingTasks.size();
        if (!rebuildNeeded) {
            return;
        }
        this.pendingTasks.clear();
        int taskIndex = 0;
        while (taskIndex < numTasks) {
            this.pendingTasks.add(new PendingTaskInfo(taskIndex));
            taskIndex++;
        }
        this.totalTasksToSchedule = this.pendingTasks.size();
    }

    /**
     * Schedules pending tasks, but only when the scheduling preconditions hold.
     */
    private void processPendingTasks() {
        if (!preconditionsSatisfied()) {
            return;
        }
        schedulePendingTasks();
    }

    /**
     * Asks for the next batch of schedule requests and, when it is non-empty, hands it
     * to the context.
     */
    private void schedulePendingTasks() {
        final List<VertexManagerPluginContext.ScheduleTaskRequest> requests = getTasksToSchedule();
        if (requests != null && !requests.isEmpty()) {
            getContext().scheduleTasks(requests);
        }
    }

    /**
     * Computes the smallest completed-task fraction over all tracked source vertices
     * that have at least one task.
     *
     * @return a value in [0, 1]; {@code 1.0} when the completed count equals the total
     *         source task count or when no tracked source vertex has tasks
     * @throws IllegalStateException presumably, via {@code Preconditions.checkState},
     *                               when a tracked source vertex is not yet configured
     */
    float getMinSourceVertexCompletedTaskFraction() {
        float minFraction = 1.0f;
        if (this.numSourceTasksCompleted == this.totalNumSourceTasks) {
            return minFraction;
        }
        for (Map.Entry<String, SourceVertexInfo> source : this.srcVertexInfo.entrySet()) {
            SourceVertexInfo info = source.getValue();
            Preconditions.checkState(info.vertexIsConfigured, "Vertex: " + source.getKey());
            if (info.numTasks > 0) {
                // Cast before dividing: int / int would truncate every partial
                // completion to 0 and defeat the slow-start ramp.
                float completedFraction = (float) info.getNumCompletedTasks() / info.numTasks;
                if (completedFraction < minFraction) {
                    minFraction = completedFraction;
                }
            }
        }
        return minFraction;
    }

    /**
     * Removes up to the currently allowed number of tasks from the head of
     * {@link #pendingTasks} and turns each into a schedule request without a location
     * hint.
     *
     * @return the requests to schedule, or {@code null} when no task may be scheduled now
     */
    List<VertexManagerPluginContext.ScheduleTaskRequest> getTasksToSchedule() {
        final float minCompletedFraction = getMinSourceVertexCompletedTaskFraction();
        int remaining = getNumOfTasksToScheduleAndLog(minCompletedFraction);
        if (remaining <= 0) {
            return null;
        }
        List<VertexManagerPluginContext.ScheduleTaskRequest> requests = Lists.newArrayListWithCapacity(remaining);
        for (; remaining > 0 && !this.pendingTasks.isEmpty(); remaining--) {
            int taskIndex = this.pendingTasks.get(0).getIndex();
            requests.add(VertexManagerPluginContext.ScheduleTaskRequest.create(taskIndex, (TaskLocationHint) null));
            // Drop the head only after its request has been recorded.
            this.pendingTasks.remove(0);
        }
        return requests;
    }

    /**
     * Same result as {@link #getNumOfTasksToSchedule(float)}, with an info-level log
     * line whenever the result is positive.
     *
     * @param f minimum completed-task fraction across the source vertices
     * @return number of tasks that may be scheduled now (may be zero or negative)
     */
    int getNumOfTasksToScheduleAndLog(float f) {
        final int count = getNumOfTasksToSchedule(f);
        if (count <= 0) {
            return count;
        }
        Object[] logArgs = {count, getContext().getVertexName(), this.totalTasksToSchedule, this.numSourceTasksCompleted, this.totalNumSourceTasks, f, this.slowStartMinFraction, this.slowStartMaxFraction};
        LOG.info("Scheduling {} tasks for vertex: {} with totalTasks: {}. {} source tasks completed out of {}. MinSourceTaskCompletedFraction: {} min: {} max: {}", logArgs);
        return count;
    }

    /**
     * Computes how many additional tasks may be scheduled for the given source
     * completion fraction.
     *
     * <p>When all source tasks are complete, every pending task is released. Otherwise
     * the target share of tasks grows linearly from 0 at the min fraction to 1 at the
     * max fraction (a step at min when both are equal), is rounded up to whole tasks,
     * and the tasks no longer pending are subtracted.
     *
     * @param f minimum completed-task fraction across the source vertices
     * @return number of tasks to schedule now; may be zero or negative
     */
    int getNumOfTasksToSchedule(float f) {
        final int numPending = this.pendingTasks.size();
        if (this.numSourceTasksCompleted == this.totalNumSourceTasks) {
            LOG.info("All source tasks completed. Ramping up {} remaining tasks for vertex: {}", numPending, getContext().getVertexName());
            return numPending;
        }

        final float range = this.slowStartMaxFraction - this.slowStartMinFraction;
        float shareToSchedule;
        if (range > 0.0f) {
            shareToSchedule = (f - this.slowStartMinFraction) / range;
        } else {
            // min == max: release everything as soon as the min fraction is reached.
            shareToSchedule = f < this.slowStartMinFraction ? 0.0f : 1.0f;
        }
        final float clampedShare = Math.max(0.0f, Math.min(1.0f, shareToSchedule));

        // Rounding up lets a single task start before the max fraction is reached.
        final int targetScheduled = (int) Math.ceil(clampedShare * this.totalTasksToSchedule);
        final int alreadyScheduled = this.totalTasksToSchedule - numPending;
        return targetScheduled - alreadyScheduled;
    }

    /**
     * Entry point for building a RootInputVertexManager plugin descriptor.
     *
     * @param configuration configuration the builder should write into, or {@code null}
     *                      to let the builder create its own
     * @return a new config builder
     */
    public static RootInputVertexManagerConfigBuilder createConfigBuilder(@Nullable Configuration configuration) {
        final RootInputVertexManagerConfigBuilder builder = new RootInputVertexManagerConfigBuilder(configuration);
        return builder;
    }
}
