package cz.seznam.euphoria.flink;

import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.GlobalWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.TimeInterval;
import cz.seznam.euphoria.core.client.dataset.windowing.TimeSliding;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.util.Either;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.client.util.Triple;
import cz.seznam.euphoria.flink.batch.BatchElement;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:cz/seznam/euphoria/flink/ExecutionEnvironment.class */
/**
 * Thin wrapper around exactly one of Flink's batch or streaming execution
 * environments.
 *
 * <p>The constructor creates a single underlying Flink environment, chosen by
 * the requested {@link Mode}, and registers a fixed set of Euphoria types plus
 * any caller-supplied classes with it. The remaining methods delegate to
 * whichever environment was created.
 */
public class ExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutionEnvironment.class);

    /** Batch environment; {@code null} when this instance wraps a streaming environment. */
    private final org.apache.flink.api.java.ExecutionEnvironment batchEnv;

    /** Streaming environment; {@code null} when this instance wraps a batch environment. */
    private final StreamExecutionEnvironment streamEnv;

    /** Selects which kind of Flink environment is created. */
    public enum Mode {
        BATCH,
        STREAMING
    }

    /**
     * Creates the underlying Flink environment and registers types with it.
     *
     * @param mode {@link Mode#BATCH} creates a batch environment; any other
     *     value (including {@code null}) creates a streaming environment
     * @param z when {@code true} the environment is obtained through
     *     {@code createLocalEnvironment()}, otherwise through
     *     {@code getExecutionEnvironment()}
     * @param set additional classes to register with the environment on top of
     *     the built-in ones; the given set itself is not modified
     */
    public ExecutionEnvironment(Mode mode, boolean z, Set<Class<?>> set) {
        Set<Class<?>> classesToRegister = getClassesToRegister(set);
        if (mode == Mode.BATCH) {
            org.apache.flink.api.java.ExecutionEnvironment env = z
                    ? org.apache.flink.api.java.ExecutionEnvironment.createLocalEnvironment()
                    : org.apache.flink.api.java.ExecutionEnvironment.getExecutionEnvironment();
            classesToRegister.forEach(env::registerType);
            this.batchEnv = env;
            this.streamEnv = null;
        } else {
            StreamExecutionEnvironment env = z
                    ? StreamExecutionEnvironment.createLocalEnvironment()
                    : StreamExecutionEnvironment.getExecutionEnvironment();
            classesToRegister.forEach(env::registerType);
            this.batchEnv = null;
            this.streamEnv = env;
        }
        LOG.info("Registered classes {} within flink's runtime", classesToRegister);
    }

    /**
     * Returns the execution config of the wrapped environment.
     *
     * @return the config of the batch environment if present, otherwise that
     *     of the streaming environment
     * @throws IllegalStateException if neither environment is set
     */
    public ExecutionConfig getExecutionConfig() {
        if (this.batchEnv != null) {
            return this.batchEnv.getConfig();
        }
        if (this.streamEnv != null) {
            return this.streamEnv.getConfig();
        }
        throw new IllegalStateException("No execution environment initialized");
    }

    /**
     * Calls {@code execute()} on the wrapped environment.
     *
     * @throws IllegalStateException if neither environment is set
     * @throws Exception whatever the underlying {@code execute()} call throws
     */
    public void execute() throws Exception {
        if (this.batchEnv != null) {
            this.batchEnv.execute();
            return;
        }
        if (this.streamEnv != null) {
            this.streamEnv.execute();
            return;
        }
        throw new IllegalStateException("No execution environment initialized");
    }

    /**
     * Returns the execution plan reported by the wrapped environment.
     *
     * @return the result of {@code getExecutionPlan()} on the batch environment
     *     if present, otherwise on the streaming environment
     * @throws IllegalStateException if neither environment is set
     * @throws Exception whatever the underlying {@code getExecutionPlan()} call throws
     */
    public String dumpExecutionPlan() throws Exception {
        if (this.batchEnv != null) {
            return this.batchEnv.getExecutionPlan();
        }
        if (this.streamEnv != null) {
            return this.streamEnv.getExecutionPlan();
        }
        // Same failure mode as getExecutionConfig()/execute() instead of a bare NPE.
        throw new IllegalStateException("No execution environment initialized");
    }

    /**
     * Returns the wrapped batch environment.
     *
     * @return the batch environment, never {@code null}
     * @throws IllegalStateException if this instance does not wrap a batch environment
     */
    public org.apache.flink.api.java.ExecutionEnvironment getBatchEnv() {
        if (this.batchEnv == null) {
            throw new IllegalStateException("Batch environment not initialized");
        }
        return this.batchEnv;
    }

    /**
     * Returns the wrapped streaming environment.
     *
     * @return the streaming environment, never {@code null}
     * @throws IllegalStateException if this instance does not wrap a streaming environment
     */
    public StreamExecutionEnvironment getStreamEnv() {
        if (this.streamEnv == null) {
            throw new IllegalStateException("Stream environment not initialized");
        }
        return this.streamEnv;
    }

    /**
     * Picks the execution mode for a flow from the boundedness of its sources.
     *
     * @param flow the flow whose sources are inspected
     * @return {@link Mode#STREAMING} as soon as one source reports
     *     {@code isBounded() == false}; {@link Mode#BATCH} otherwise, which
     *     includes a flow without any sources
     */
    public static Mode determineMode(Flow flow) {
        for (Dataset<?> source : flow.sources()) {
            if (!source.isBounded()) {
                return Mode.STREAMING;
            }
        }
        return Mode.BATCH;
    }

    /**
     * Builds the full set of classes to register: a copy of the caller-supplied
     * classes extended with the built-in Euphoria types listed below.
     *
     * @param set caller-supplied classes; copied, not modified
     * @return a new mutable set containing {@code set} plus the built-in types
     */
    private static Set<Class<?>> getClassesToRegister(Set<Class<?>> set) {
        Set<Class<?>> classes = Sets.newHashSet(set);
        classes.add(GlobalWindowing.Window.class);
        classes.add(TimeInterval.class);
        classes.add(TimeSliding.SlidingWindowSet.class);
        classes.add(Either.class);
        classes.add(Pair.class);
        classes.add(Triple.class);
        classes.add(StreamingElement.class);
        classes.add(BatchElement.class);
        classes.add(KeyedMultiWindowedElement.class);
        return classes;
    }
}
