package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.UnaryPredicate;
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.Repartition;
import cz.seznam.euphoria.core.client.operator.Sort;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.FlowOptimizer;
import cz.seznam.euphoria.flink.FlowTranslator;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.batch.io.DataSinkWrapper;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.LocatableInputSplit;

/* loaded from: input_file:cz/seznam/euphoria/flink/batch/BatchFlowTranslator.class */
public class BatchFlowTranslator extends FlowTranslator {
    public static final SplitAssignerFactory DEFAULT_SPLIT_ASSIGNER_FACTORY = (locatableInputSplitArr, num) -> {
        return new LocatableInputSplitAssigner(locatableInputSplitArr);
    };
    private final Map<Class, Translation> translations;
    private final Settings settings;
    private final ExecutionEnvironment env;
    private final FlinkAccumulatorFactory accumulatorFactory;

    /* loaded from: input_file:cz/seznam/euphoria/flink/batch/BatchFlowTranslator$SplitAssignerFactory.class */
    public interface SplitAssignerFactory extends BiFunction<LocatableInputSplit[], Integer, InputSplitAssigner>, Serializable {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/batch/BatchFlowTranslator$Translation.class */
    public static class Translation<O extends Operator<?, ?>> {
        final BatchOperatorTranslator<O> translator;
        final UnaryPredicate<O> accept;

        private Translation(BatchOperatorTranslator<O> batchOperatorTranslator, UnaryPredicate<O> unaryPredicate) {
            this.translator = (BatchOperatorTranslator) Objects.requireNonNull(batchOperatorTranslator);
            this.accept = unaryPredicate;
        }

        static <O extends Operator<?, ?>> void set(Map<Class, Translation> map, Class<O> cls, BatchOperatorTranslator<O> batchOperatorTranslator) {
            set(map, cls, batchOperatorTranslator, null);
        }

        static <O extends Operator<?, ?>> void set(Map<Class, Translation> map, Class<O> cls, BatchOperatorTranslator<O> batchOperatorTranslator, UnaryPredicate<O> unaryPredicate) {
            map.put(cls, new Translation(batchOperatorTranslator, unaryPredicate));
        }
    }

    public BatchFlowTranslator(Settings settings, ExecutionEnvironment executionEnvironment, FlinkAccumulatorFactory flinkAccumulatorFactory) {
        this(settings, executionEnvironment, flinkAccumulatorFactory, DEFAULT_SPLIT_ASSIGNER_FACTORY);
    }

    public BatchFlowTranslator(Settings settings, ExecutionEnvironment executionEnvironment, FlinkAccumulatorFactory flinkAccumulatorFactory, SplitAssignerFactory splitAssignerFactory) {
        this.translations = new IdentityHashMap();
        this.settings = (Settings) Objects.requireNonNull(settings);
        this.env = (ExecutionEnvironment) Objects.requireNonNull(executionEnvironment);
        this.accumulatorFactory = (FlinkAccumulatorFactory) Objects.requireNonNull(flinkAccumulatorFactory);
        Translation.set(this.translations, FlowUnfolder.InputOperator.class, new InputTranslator(splitAssignerFactory));
        Translation.set(this.translations, FlatMap.class, new FlatMapTranslator());
        Translation.set(this.translations, Repartition.class, new RepartitionTranslator());
        Translation.set(this.translations, ReduceStateByKey.class, new ReduceStateByKeyTranslator());
        Translation.set(this.translations, Union.class, new UnionTranslator());
        Translation.set(this.translations, ReduceByKey.class, new ReduceByKeyTranslator(), ReduceByKeyTranslator::wantTranslate);
        Translation.set(this.translations, Sort.class, new SortTranslator(), SortTranslator::wantTranslate);
    }

    @Override // cz.seznam.euphoria.flink.FlowTranslator
    protected Collection<FlowTranslator.TranslateAcceptor> getAcceptors() {
        return (Collection) this.translations.entrySet().stream().map(entry -> {
            return new FlowTranslator.TranslateAcceptor((Class) entry.getKey(), ((Translation) entry.getValue()).accept);
        }).collect(Collectors.toList());
    }

    @Override // cz.seznam.euphoria.flink.FlowTranslator
    protected FlowOptimizer createOptimizer() {
        FlowOptimizer flowOptimizer = new FlowOptimizer();
        flowOptimizer.setMaxParallelism(this.env.getParallelism());
        return flowOptimizer;
    }

    @Override // cz.seznam.euphoria.flink.FlowTranslator
    public List<DataSink<?>> translateInto(Flow flow) {
        DAG<FlinkOperator<Operator<?, ?>>> flowToDag = flowToDag(flow);
        BatchExecutorContext batchExecutorContext = new BatchExecutorContext(this.env, flowToDag, this.accumulatorFactory, this.settings);
        flowToDag.traverse().map((v0) -> {
            return v0.get();
        }).forEach(flinkOperator -> {
            Operator originalOperator = flinkOperator.getOriginalOperator();
            Translation translation = this.translations.get(originalOperator.getClass());
            if (translation == null) {
                throw new UnsupportedOperationException("Operator " + flinkOperator.getClass().getSimpleName() + " not supported");
            }
            Preconditions.checkState(translation.accept == 0 || Boolean.TRUE.equals(translation.accept.apply(originalOperator)));
            batchExecutorContext.setOutput(flinkOperator, (DataSet) translation.translator.translate(flinkOperator, batchExecutorContext));
        });
        ArrayList arrayList = new ArrayList();
        flowToDag.getLeafs().stream().map((v0) -> {
            return v0.get();
        }).filter(flinkOperator2 -> {
            return flinkOperator2.output().getOutputSink() != null;
        }).forEach(flinkOperator3 -> {
            DataSink outputSink = flinkOperator3.output().getOutputSink();
            arrayList.add(outputSink);
            ((DataSet) Objects.requireNonNull(batchExecutorContext.getOutputStream(flinkOperator3))).output(new DataSinkWrapper(outputSink));
        });
        return arrayList;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 666050004:
                if (implMethodName.equals("lambda$static$368573a3$1")) {
                    z = false;
                    break;
                }
                break;
            case 720818462:
                if (implMethodName.equals("wantTranslate")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/flink/batch/BatchFlowTranslator$SplitAssignerFactory") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/BatchFlowTranslator") && serializedLambda.getImplMethodSignature().equals("([Lorg/apache/flink/core/io/LocatableInputSplit;Ljava/lang/Integer;)Lorg/apache/flink/core/io/InputSplitAssigner;")) {
                    return (locatableInputSplitArr, num) -> {
                        return new LocatableInputSplitAssigner(locatableInputSplitArr);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/operator/ReduceByKey;)Z")) {
                    return ReduceByKeyTranslator::wantTranslate;
                }
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/UnaryPredicate") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/SortTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/operator/Sort;)Z")) {
                    return SortTranslator::wantTranslate;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
