package cz.seznam.euphoria.flink;

import cz.seznam.euphoria.core.client.accumulators.AccumulatorProvider;
import cz.seznam.euphoria.core.client.accumulators.VoidAccumulatorProvider;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.executor.Executor;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.batch.BatchFlowTranslator;
import cz.seznam.euphoria.flink.streaming.StreamingFlowTranslator;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/seznam/euphoria/flink/FlinkExecutor.class */
public class FlinkExecutor implements Executor {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkExecutor.class);
    private final boolean localEnv;
    private boolean dumpExecPlan;
    private Optional<AbstractStateBackend> stateBackend;
    private Duration autoWatermarkInterval;
    private Duration allowedLateness;
    private Duration latencyTracking;
    private FlinkAccumulatorFactory accumulatorFactory;
    private final Set<Class<?>> registeredClasses;

    @Nullable
    private Duration checkpointInterval;
    private final ExecutorService submitExecutor;

    public FlinkExecutor() {
        this(false);
    }

    public FlinkExecutor(boolean z) {
        this.stateBackend = Optional.empty();
        this.autoWatermarkInterval = Duration.ofMillis(200L);
        this.allowedLateness = Duration.ofMillis(0L);
        this.latencyTracking = Duration.ofSeconds(2L);
        this.accumulatorFactory = new FlinkAccumulatorFactory.Adapter(VoidAccumulatorProvider.getFactory());
        this.registeredClasses = new HashSet();
        this.submitExecutor = Executors.newCachedThreadPool();
        this.localEnv = z;
    }

    public FlinkExecutor setDumpExecutionPlan(boolean z) {
        this.dumpExecPlan = z;
        return this;
    }

    public CompletableFuture<Executor.Result> submit(Flow flow) {
        return CompletableFuture.supplyAsync(() -> {
            return execute(flow);
        }, this.submitExecutor);
    }

    public void shutdown() {
        LOG.info("Shutting down flink executor.");
        this.submitExecutor.shutdownNow();
    }

    public void setAccumulatorProvider(AccumulatorProvider.Factory factory) {
        this.accumulatorFactory = new FlinkAccumulatorFactory.Adapter((AccumulatorProvider.Factory) Objects.requireNonNull(factory));
    }

    public void setAccumulatorProvider(FlinkAccumulatorFactory flinkAccumulatorFactory) {
        this.accumulatorFactory = (FlinkAccumulatorFactory) Objects.requireNonNull(flinkAccumulatorFactory);
    }

    /* JADX WARN: Incorrect condition in loop: B:21:0x0150 */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private cz.seznam.euphoria.core.executor.Executor.Result execute(cz.seznam.euphoria.core.client.flow.Flow r8) {
        /*
            Method dump skipped, instructions count: 444
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: cz.seznam.euphoria.flink.FlinkExecutor.execute(cz.seznam.euphoria.core.client.flow.Flow):cz.seznam.euphoria.core.executor.Executor$Result");
    }

    protected FlowTranslator createBatchTranslator(Settings settings, ExecutionEnvironment executionEnvironment, FlinkAccumulatorFactory flinkAccumulatorFactory) {
        return new BatchFlowTranslator(settings, executionEnvironment.getBatchEnv(), flinkAccumulatorFactory);
    }

    protected FlowTranslator createStreamTranslator(Settings settings, ExecutionEnvironment executionEnvironment, FlinkAccumulatorFactory flinkAccumulatorFactory, Duration duration, Duration duration2) {
        return new StreamingFlowTranslator(settings, executionEnvironment.getStreamEnv(), flinkAccumulatorFactory, duration, duration2);
    }

    public FlinkExecutor setStateBackend(AbstractStateBackend abstractStateBackend) {
        this.stateBackend = Optional.of(abstractStateBackend);
        return this;
    }

    public FlinkExecutor setAutoWatermarkInterval(Duration duration) {
        this.autoWatermarkInterval = (Duration) Objects.requireNonNull(duration);
        return this;
    }

    public FlinkExecutor setAllowedLateness(Duration duration) {
        this.allowedLateness = (Duration) Objects.requireNonNull(duration);
        return this;
    }

    public FlinkExecutor setLatencyTrackingInterval(Duration duration) {
        this.latencyTracking = (Duration) Objects.requireNonNull(duration);
        return this;
    }

    public FlinkExecutor registerClass(Class<?> cls) {
        this.registeredClasses.add(cls);
        return this;
    }

    public FlinkExecutor setCheckpointInterval(@Nonnull Duration duration) {
        this.checkpointInterval = (Duration) Objects.requireNonNull(duration);
        return this;
    }
}
