package org.apache.seatunnel.core.starter.spark.execution;

import java.net.URL;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigValue;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.class */
/**
 * Process-wide holder for the Spark objects a job needs: the {@link SparkConf}, the
 * {@link SparkSession} and the {@link StreamingContext}.
 *
 * <p>The class has a private constructor and is obtained through {@link #getInstance(Config)},
 * which creates the single instance lazily. Hive support is switched on automatically when any
 * configured source or sink plugin name contains {@code "hive"}.
 */
public class SparkRuntimeEnvironment implements RuntimeEnvironment {
    private static final Logger log = LoggerFactory.getLogger(SparkRuntimeEnvironment.class);

    /** Batch duration, in seconds, used when {@code spark.stream.batchDuration} is not set. */
    private static final long DEFAULT_SPARK_STREAMING_DURATION = 5L;

    /** Key under which each source/sink plugin block declares its plugin name. */
    private static final String PLUGIN_NAME_KEY = "plugin_name";

    /** Lazily created singleton; volatile so the unsynchronized null check sees a complete write. */
    private static volatile SparkRuntimeEnvironment INSTANCE = null;

    private SparkConf sparkConf;
    private SparkSession sparkSession;
    private StreamingContext streamingContext;
    private Config config;
    private JobMode jobMode;
    private boolean enableHive = false;
    private String jobName = Constants.LOGO;

    /**
     * Decides whether Hive support is needed and then runs {@code initialize(config)}.
     *
     * <p>NOTE(review): {@code initialize} is not defined in this file; presumably it is inherited
     * from {@link RuntimeEnvironment}. The Hive flag is set first so that it is already in place
     * if initialization ends up calling {@link #prepare()} - confirm against the interface.
     *
     * @param config the full job configuration; must contain the source and sink plugin lists
     */
    private SparkRuntimeEnvironment(Config config) {
        setEnableHive(checkIsContainHive(config));
        initialize(config);
    }

    /**
     * Turns Hive support on or off for the session built by {@link #prepare()}.
     *
     * @param z {@code true} to build the session with Hive support
     */
    public void setEnableHive(boolean z) {
        this.enableHive = z;
    }

    /**
     * Stores the configuration later read by {@link #prepare()}.
     *
     * @param config configuration whose entries are copied into the Spark configuration
     * @return this environment, for chaining
     */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public RuntimeEnvironment setConfig(Config config) {
        this.config = config;
        return this;
    }

    /**
     * Stores the job mode.
     *
     * @param jobMode the mode to remember
     * @return this environment, for chaining
     */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public RuntimeEnvironment setJobMode(JobMode jobMode) {
        this.jobMode = jobMode;
        return this;
    }

    /** @return the job mode last passed to {@link #setJobMode(JobMode)}, or {@code null} */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public JobMode getJobMode() {
        return this.jobMode;
    }

    /** @return the configuration last passed to {@link #setConfig(Config)}, or {@code null} */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public Config getConfig() {
        return this.config;
    }

    /**
     * Performs no validation for the Spark environment.
     *
     * @return always a successful result
     */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public CheckResult checkConfig() {
        return CheckResult.success();
    }

    /**
     * Logs the plugin locations; nothing else is done with them here.
     *
     * @param list plugin jar locations
     */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public void registerPlugin(List<URL> list) {
        log.info("register plugins :{}", list);
    }

    /**
     * Builds the Spark configuration, session and streaming context from the stored config.
     *
     * <p>If the config defines {@code job.name} it replaces the default job name. Hive support is
     * enabled on the session builder when the Hive flag is set.
     *
     * @return this environment
     */
    @Override // org.apache.seatunnel.core.starter.execution.RuntimeEnvironment
    public SparkRuntimeEnvironment prepare() {
        if (this.config.hasPath("job.name")) {
            this.jobName = this.config.getString("job.name");
        }
        this.sparkConf = createSparkConf();
        SparkSession.Builder sessionBuilder = SparkSession.builder().config(this.sparkConf);
        if (this.enableHive) {
            sessionBuilder.enableHiveSupport();
        }
        this.sparkSession = sessionBuilder.getOrCreate();
        createStreamingContext();
        return this;
    }

    /** @return the session created by {@link #prepare()}, or {@code null} before that */
    public SparkSession getSparkSession() {
        return this.sparkSession;
    }

    /** @return the streaming context created by {@link #prepare()}, or {@code null} before that */
    public StreamingContext getStreamingContext() {
        return this.streamingContext;
    }

    /** @return the Spark configuration created by {@link #prepare()}, or {@code null} before that */
    public SparkConf getSparkConf() {
        return this.sparkConf;
    }

    /**
     * Copies every entry of the stored config into a new {@link SparkConf} (values converted with
     * {@link String#valueOf(Object)}) and sets the application name to the job name.
     */
    private SparkConf createSparkConf() {
        SparkConf conf = new SparkConf();
        this.config.entrySet().forEach(
                entry -> conf.set(entry.getKey(), String.valueOf(entry.getValue().unwrapped())));
        conf.setAppName(this.jobName);
        return conf;
    }

    /**
     * Creates the streaming context on first use, with a batch duration in seconds taken from
     * {@code spark.stream.batchDuration} or {@link #DEFAULT_SPARK_STREAMING_DURATION}.
     */
    private void createStreamingContext() {
        long batchSeconds = this.sparkSession.sparkContext().getConf()
                .getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
        if (this.streamingContext == null) {
            this.streamingContext =
                    new StreamingContext(this.sparkSession.sparkContext(), Seconds.apply(batchSeconds));
        }
    }

    /**
     * Reports whether any source or sink plugin name contains {@code "hive"}, ignoring case.
     *
     * <p>Sources are scanned first; the sink list is only read when no source matched.
     *
     * @param config the full job configuration holding the source and sink plugin lists
     * @return {@code true} if at least one plugin name contains {@code "hive"}
     */
    protected boolean checkIsContainHive(Config config) {
        return containsHivePlugin(config.getConfigList(PluginType.SOURCE.getType()))
                || containsHivePlugin(config.getConfigList(PluginType.SINK.getType()));
    }

    /** Returns whether any plugin block in the list has a name containing {@code "hive"}. */
    private static boolean containsHivePlugin(List<? extends Config> pluginConfigs) {
        for (Config pluginConfig : pluginConfigs) {
            // Locale.ROOT keeps the match independent of the JVM default locale; with a Turkish
            // default locale "HIVE".toLowerCase() yields a dotless i and would not match "hive".
            String pluginName = pluginConfig.getString(PLUGIN_NAME_KEY).toLowerCase(Locale.ROOT);
            if (pluginName.contains("hive")) {
                return true;
            }
        }
        return false;
    }

    /**
     * Returns the single environment instance, creating it from {@code config} on the first call.
     *
     * <p>Once an instance exists, {@code config} is ignored and the existing instance is returned.
     * Creation is guarded by a lock on the class so that only one instance is ever constructed.
     *
     * @param config the job configuration used only when the instance has not been created yet
     * @return the shared instance
     */
    public static SparkRuntimeEnvironment getInstance(Config config) {
        if (INSTANCE == null) {
            synchronized (SparkRuntimeEnvironment.class) {
                if (INSTANCE == null) {
                    INSTANCE = new SparkRuntimeEnvironment(config);
                }
            }
        }
        return INSTANCE;
    }
}
