package org.apache.nemo.compiler.backend.nemo;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.inject.Inject;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.executionproperty.MessageIdEdgeProperty;
import org.apache.nemo.common.ir.executionproperty.ExecutionPropertyMap;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.common.ir.vertex.utility.runtimepass.MessageAggregatorVertex;
import org.apache.nemo.compiler.backend.nemo.prophet.ParallelismProphet;
import org.apache.nemo.compiler.backend.nemo.prophet.Prophet;
import org.apache.nemo.compiler.backend.nemo.prophet.SkewProphet;
import org.apache.nemo.compiler.optimizer.NemoOptimizer;
import org.apache.nemo.compiler.optimizer.pass.runtime.Message;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
import org.apache.reef.tang.InjectionFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/backend/nemo/NemoPlanRewriter.class */
public final class NemoPlanRewriter implements PlanRewriter {
    private static final Logger LOG = LoggerFactory.getLogger(NemoPlanRewriter.class.getName());
    private static final String DATA_NOT_AUGMENTED = "NONE";
    private final NemoOptimizer nemoOptimizer;
    private final NemoBackend nemoBackend;
    private final Map<Integer, Map<Object, Long>> messageIdToAggregatedData = new HashMap();
    private CountDownLatch readyToRewriteLatch = new CountDownLatch(1);
    private final InjectionFuture<SimulationScheduler> simulationSchedulerInjectionFuture;
    private final PhysicalPlanGenerator physicalPlanGenerator;
    private IRDAG currentIRDAG;
    private PhysicalPlan currentPhysicalPlan;

    @Inject
    public NemoPlanRewriter(NemoOptimizer nemoOptimizer, NemoBackend nemoBackend, InjectionFuture<SimulationScheduler> injectionFuture, PhysicalPlanGenerator physicalPlanGenerator) {
        this.nemoOptimizer = nemoOptimizer;
        this.nemoBackend = nemoBackend;
        this.simulationSchedulerInjectionFuture = injectionFuture;
        this.physicalPlanGenerator = physicalPlanGenerator;
    }

    public void setCurrentIRDAG(IRDAG irdag) {
        this.currentIRDAG = irdag;
    }

    public void setCurrentPhysicalPlan(PhysicalPlan physicalPlan) {
        this.currentPhysicalPlan = physicalPlan;
    }

    public PhysicalPlan rewrite(int i) {
        try {
            this.readyToRewriteLatch.await();
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for the rewrite latch: {}", e);
            Thread.currentThread().interrupt();
        }
        if (this.currentIRDAG == null) {
            throw new IllegalStateException();
        }
        Map<Object, Long> remove = this.messageIdToAggregatedData.remove(Integer.valueOf(i));
        if (remove == null) {
            throw new IllegalStateException();
        }
        Set set = (Set) this.currentIRDAG.getVertices().stream().flatMap(iRVertex -> {
            return this.currentIRDAG.getIncomingEdgesOf(iRVertex).stream();
        }).filter(iREdge -> {
            return iREdge.getPropertyValue(MessageIdEdgeProperty.class).isPresent() && ((HashSet) iREdge.getPropertyValue(MessageIdEdgeProperty.class).get()).contains(Integer.valueOf(i)) && !(iREdge.getDst() instanceof MessageAggregatorVertex);
        }).collect(Collectors.toSet());
        if (set.isEmpty()) {
            throw new IllegalArgumentException(String.valueOf(i));
        }
        IRDAG optimizeAtRunTime = this.nemoOptimizer.optimizeAtRunTime(this.currentIRDAG, new Message(i, set, remove));
        setCurrentIRDAG(optimizeAtRunTime);
        PhysicalPlan compile = this.nemoBackend.compile(optimizeAtRunTime);
        List topologicalSort = this.currentPhysicalPlan.getStageDAG().getTopologicalSort();
        List topologicalSort2 = compile.getStageDAG().getTopologicalSort();
        IntStream.range(0, topologicalSort.size()).forEachOrdered(i2 -> {
            ExecutionPropertyMap executionProperties = ((Stage) topologicalSort2.get(i2)).getExecutionProperties();
            ((Stage) topologicalSort.get(i2)).setExecutionProperties(executionProperties);
            executionProperties.get(ParallelismProperty.class).ifPresent(num -> {
                ((Stage) topologicalSort.get(i2)).getTaskIndices().clear();
                ((Stage) topologicalSort.get(i2)).getTaskIndices().addAll((Collection) IntStream.range(0, num.intValue()).boxed().collect(Collectors.toList()));
                IntStream.range(((Stage) topologicalSort.get(i2)).getVertexIdToReadables().size(), num.intValue()).forEach(i2 -> {
                    ((Stage) topologicalSort.get(i2)).getVertexIdToReadables().add(new HashMap());
                });
            });
        });
        return this.currentPhysicalPlan;
    }

    public void accumulate(int i, Set<StageEdge> set, Object obj) {
        List list = (List) obj;
        Prophet skewProphet = (list.isEmpty() || !((ControlMessage.RunTimePassMessageEntry) list.get(0)).getKey().equals(DATA_NOT_AUGMENTED)) ? new SkewProphet(list) : new ParallelismProphet(this.currentIRDAG, this.currentPhysicalPlan, (SimulationScheduler) this.simulationSchedulerInjectionFuture.get(), this.physicalPlanGenerator, set);
        this.messageIdToAggregatedData.putIfAbsent(Integer.valueOf(i), new HashMap());
        this.messageIdToAggregatedData.get(Integer.valueOf(i)).putAll(skewProphet.calculate());
        this.readyToRewriteLatch.countDown();
    }
}
