package org.apache.nemo.compiler.backend.nemo.prophet;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.dag.DAGBuilder;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.edge.IREdge;
import org.apache.nemo.common.ir.edge.executionproperty.PartitionerProperty;
import org.apache.nemo.common.ir.vertex.executionproperty.ParallelismProperty;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PhysicalPlanGenerator;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.nemo.runtime.master.scheduler.SimulationScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/compiler/backend/nemo/prophet/ParallelismProphet.class */
public final class ParallelismProphet implements Prophet<String, Long> {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelismProphet.class.getName());
    private final SimulationScheduler simulationScheduler;
    private final PhysicalPlanGenerator physicalPlanGenerator;
    private final IRDAG currentIRDAG;
    private final PhysicalPlan currentPhysicalPlan;
    private final Set<StageEdge> edgesToOptimize;
    private final int partitionerProperty;

    public ParallelismProphet(IRDAG irdag, PhysicalPlan physicalPlan, SimulationScheduler simulationScheduler, PhysicalPlanGenerator physicalPlanGenerator, Set<StageEdge> set) {
        this.currentIRDAG = irdag;
        this.currentPhysicalPlan = physicalPlan;
        this.simulationScheduler = simulationScheduler;
        this.physicalPlanGenerator = physicalPlanGenerator;
        this.edgesToOptimize = set;
        this.partitionerProperty = calculatePartitionerProperty(set);
    }

    @Override // org.apache.nemo.compiler.backend.nemo.prophet.Prophet
    public Map<String, Long> calculate() {
        HashMap hashMap = new HashMap();
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 7; i++) {
            arrayList.add(makePhysicalPlanForSimulation((int) (this.partitionerProperty / Math.pow(2.0d, i)), this.edgesToOptimize, this.currentIRDAG));
        }
        hashMap.put("opt.parallelism", Long.valueOf(((Integer) ((Pair) Collections.min((List) arrayList.stream().map(this::launchSimulationForPlan).filter(pair -> {
            return ((double) ((Long) pair.right()).longValue()) > 0.5d;
        }).collect(Collectors.toList()), Comparator.comparing(pair2 -> {
            return (Long) pair2.right();
        }))).left()).longValue()));
        return hashMap;
    }

    private synchronized Pair<Integer, Long> launchSimulationForPlan(PhysicalPlan physicalPlan) {
        this.simulationScheduler.schedulePlan(physicalPlan, 1);
        MetricStore collectMetricStore = this.simulationScheduler.collectMetricStore();
        ArrayList arrayList = new ArrayList();
        collectMetricStore.getMetricMap(JobMetric.class).values().forEach(obj -> {
            arrayList.add(Pair.of(Integer.valueOf(Integer.parseInt(((JobMetric) obj).getId().split("-")[1])), ((JobMetric) obj).getJobDuration()));
        });
        return (Pair) Collections.min(arrayList, Comparator.comparing((v0) -> {
            return v0.right();
        }));
    }

    private int calculatePartitionerProperty(Set<StageEdge> set) {
        return ((Integer) ((Pair) set.iterator().next().getPropertyValue(PartitionerProperty.class).get()).right()).intValue();
    }

    private PhysicalPlan makePhysicalPlanForSimulation(int i, Set<StageEdge> set, IRDAG irdag) {
        Set set2 = (Set) set.stream().map(stageEdge -> {
            return stageEdge.getDst().getIRDAG().getVertices();
        }).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toSet());
        DAGBuilder dAGBuilder = new DAGBuilder();
        irdag.topologicalDo(iRVertex -> {
            if (set2.contains(iRVertex)) {
                iRVertex.setProperty(ParallelismProperty.of(Integer.valueOf(i)));
                dAGBuilder.addVertex(iRVertex);
                for (IREdge iREdge : irdag.getIncomingEdgesOf(iRVertex)) {
                    if (set2.contains(iREdge.getSrc())) {
                        dAGBuilder.connectVertices(iREdge);
                    }
                }
            }
        });
        return new PhysicalPlan(this.currentPhysicalPlan.getPlanId().concat("-" + i), this.physicalPlanGenerator.stagePartitionIrDAG(new IRDAG(dAGBuilder.buildWithoutSourceSinkCheck())));
    }
}
