package cz.seznam.euphoria.flink.streaming;

import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.graph.DAG;
import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.operator.FlatMap;
import cz.seznam.euphoria.core.client.operator.Operator;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.Repartition;
import cz.seznam.euphoria.core.client.operator.Union;
import cz.seznam.euphoria.core.executor.FlowUnfolder;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.FlowTranslator;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.streaming.io.DataSinkWrapper;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/* loaded from: input_file:cz/seznam/euphoria/flink/streaming/StreamingFlowTranslator.class */
/**
 * Translates a {@link Flow} into transformations on a Flink
 * {@link StreamExecutionEnvironment}.
 *
 * <p>Each supported operator class is mapped to a {@link StreamingOperatorTranslator};
 * the registry is filled in the constructor and covers
 * {@link FlowUnfolder.InputOperator}, {@link FlatMap}, {@link Repartition},
 * {@link ReduceStateByKey} and {@link Union}.
 */
public class StreamingFlowTranslator extends FlowTranslator {
    /**
     * Registry of per-operator translators. An {@link IdentityHashMap} is used, so a
     * lookup only succeeds for the exact registered {@code Class} object (no subclass
     * matching).
     */
    @SuppressWarnings("rawtypes")
    private final Map<Class, StreamingOperatorTranslator> translators = new IdentityHashMap<>();
    private final Settings settings;
    private final StreamExecutionEnvironment env;
    private final FlinkAccumulatorFactory accumulatorFactory;
    private final Duration allowedLateness;
    private final Duration autoWatermarkInterval;

    /**
     * Creates the translator and registers the translators of all supported operators.
     *
     * @param settings settings handed over to the executor context; stored as given
     *     (not null-checked here)
     * @param streamExecutionEnvironment the Flink environment the flow is translated into
     * @param flinkAccumulatorFactory accumulator factory handed over to the executor context
     * @param duration the allowed lateness handed over to the executor context
     * @param duration2 the auto-watermark interval applied to the environment's config
     * @throws NullPointerException if any argument other than {@code settings} is null
     */
    public StreamingFlowTranslator(Settings settings, StreamExecutionEnvironment streamExecutionEnvironment, FlinkAccumulatorFactory flinkAccumulatorFactory, Duration duration, Duration duration2) {
        this.settings = settings;
        this.env = Objects.requireNonNull(streamExecutionEnvironment);
        this.accumulatorFactory = Objects.requireNonNull(flinkAccumulatorFactory);
        this.allowedLateness = Objects.requireNonNull(duration);
        this.autoWatermarkInterval = Objects.requireNonNull(duration2);

        // Supported operators; anything else is rejected in translateInto.
        this.translators.put(FlowUnfolder.InputOperator.class, new InputTranslator());
        this.translators.put(FlatMap.class, new FlatMapTranslator());
        this.translators.put(Repartition.class, new RepartitionTranslator());
        this.translators.put(ReduceStateByKey.class, new ReduceStateByKeyTranslator());
        this.translators.put(Union.class, new UnionTranslator());
    }

    /**
     * Returns one {@link FlowTranslator.TranslateAcceptor} per registered operator class.
     *
     * @return a freshly built collection of acceptors, one for each key of the
     *     translator registry
     */
    @Override // cz.seznam.euphoria.flink.FlowTranslator
    @SuppressWarnings({"rawtypes", "unchecked"})
    protected Collection<FlowTranslator.TranslateAcceptor> getAcceptors() {
        return this.translators.keySet().stream()
                .map(operatorClass -> new FlowTranslator.TranslateAcceptor(operatorClass))
                .collect(Collectors.toList());
    }

    /**
     * Translates the given flow into the Flink environment and wires up its sinks.
     *
     * <p>The environment is switched to ingestion time and its auto-watermark interval
     * is set. Every operator of the flow's DAG is then translated and its output
     * registered in the executor context. Finally, each leaf operator that has an
     * output sink gets that sink attached to its output stream, using the operator's
     * parallelism.
     *
     * @param flow the flow to translate
     * @return the output sinks of all leaf operators that declare one
     * @throws UnsupportedOperationException if the DAG contains an operator whose
     *     class has no registered translator
     * @throws NullPointerException if no output stream is available for a leaf
     *     operator that has a sink
     */
    @Override // cz.seznam.euphoria.flink.FlowTranslator
    @SuppressWarnings({"rawtypes", "unchecked"})
    public List<DataSink<?>> translateInto(Flow flow) {
        DAG<FlinkOperator<Operator<?, ?>>> dag = flowToDag(flow);

        this.env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        this.env.getConfig().setAutoWatermarkInterval(this.autoWatermarkInterval.toMillis());

        // The last argument tells the context whether we run in a local environment.
        StreamingExecutorContext context = new StreamingExecutorContext(this.env, dag, this.accumulatorFactory, this.settings, this.allowedLateness, this.env instanceof LocalStreamEnvironment);

        // Translate every operator and remember its output in the context.
        dag.traverse().map(node -> node.get()).forEach(operator -> {
            Class<?> operatorClass = operator.getOriginalOperator().getClass();
            StreamingOperatorTranslator translator = this.translators.get(operatorClass);
            if (translator == null) {
                // Report the class that was actually looked up, not the FlinkOperator wrapper.
                throw new UnsupportedOperationException("Operator " + operatorClass.getSimpleName() + " not supported");
            }
            context.setOutput(operator, translator.translate(operator, context));
        });

        // Attach a sink to every leaf that declares one and collect those sinks.
        List<DataSink<?>> sinks = new ArrayList<>();
        dag.getLeafs().stream()
                .map(node -> node.get())
                .filter(leaf -> leaf.output().getOutputSink() != null)
                .forEach(leaf -> {
                    DataSink sink = leaf.output().getOutputSink();
                    sinks.add(sink);
                    DataStream leafOutput = (DataStream) Objects.requireNonNull(context.getOutputStream(leaf));
                    leafOutput.addSink(new DataSinkWrapper(sink)).setParallelism(leaf.getParallelism());
                });
        return sinks;
    }
}
