package cz.seznam.euphoria.flink;

import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.PartitioningAware;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import java.util.HashMap;
import java.util.List;
import java.util.stream.Collectors;

/* loaded from: input_file:cz/seznam/euphoria/flink/FlowOptimizer.class */
public class FlowOptimizer {
    private int maxParallelism = Integer.MAX_VALUE;

    public int getMaxParallelism() {
        return this.maxParallelism;
    }

    public void setMaxParallelism(int i) {
        this.maxParallelism = i;
    }

    public DAG<FlinkOperator<Operator<?, ?>>> optimize(DAG<Operator<?, ?>> dag) {
        return setParallelism(convert(dag));
    }

    private DAG<FlinkOperator<Operator<?, ?>>> convert(DAG<Operator<?, ?>> dag) {
        DAG<FlinkOperator<Operator<?, ?>>> of = DAG.of(new FlinkOperator[0]);
        HashMap hashMap = new HashMap();
        dag.traverse().forEach(node -> {
            Operator operator = (Operator) node.get();
            FlinkOperator flinkOperator = new FlinkOperator(operator);
            hashMap.put(operator, flinkOperator);
            of.add(flinkOperator, (List) node.getParents().stream().map(node -> {
                return (FlinkOperator) hashMap.get(node.get());
            }).collect(Collectors.toList()));
        });
        return of;
    }

    private DAG<FlinkOperator<Operator<?, ?>>> setParallelism(DAG<FlinkOperator<Operator<?, ?>>> dag) {
        dag.traverse().forEach(node -> {
            FlinkOperator flinkOperator = (FlinkOperator) node.get();
            PartitioningAware originalOperator = flinkOperator.getOriginalOperator();
            if (originalOperator instanceof FlowUnfolder.InputOperator) {
                flinkOperator.setParallelism(Math.min(this.maxParallelism, originalOperator.output().getSource().getPartitions().size()));
            } else if (!(originalOperator instanceof PartitioningAware)) {
                flinkOperator.setParallelism(node.getParents().stream().mapToInt(node -> {
                    return ((FlinkOperator) node.get()).getParallelism();
                }).max().getAsInt());
            } else {
                flinkOperator.setParallelism(Math.min(this.maxParallelism, originalOperator.getPartitioning().getNumPartitions()));
            }
        });
        return dag;
    }
}
