package org.apache.kylin.rest.monitor;

import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.Generated;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.metrics.MetricsCategory;
import org.apache.kylin.common.metrics.MetricsGroup;
import org.apache.kylin.common.metrics.MetricsName;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/rest/monitor/SparkContextCanary.class */
public class SparkContextCanary {
    private static final Logger logger = LoggerFactory.getLogger(SparkContextCanary.class);
    private static final int THRESHOLD_TO_RESTART_SPARK = KapConfig.getInstanceFromEnv().getThresholdToRestartSpark();
    private static final int PERIOD_MINUTES = KapConfig.getInstanceFromEnv().getSparkCanaryPeriodMinutes();
    private static final String WORKING_DIR = KapConfig.getInstanceFromEnv().getWriteHdfsWorkingDirectory();
    private static final String CHECK_TYPE = KapConfig.getInstanceFromEnv().getSparkCanaryType();
    private volatile int errorAccumulated = 0;
    private volatile long lastResponseTime = -1;
    private volatile boolean sparkRestarting = false;

    private SparkContextCanary() {
    }

    public static SparkContextCanary getInstance() {
        return (SparkContextCanary) Singletons.getInstance(SparkContextCanary.class, cls -> {
            return new SparkContextCanary();
        });
    }

    public void init(ScheduledExecutorService scheduledExecutorService) {
        logger.info("Start monitoring Spark");
        scheduledExecutorService.scheduleWithFixedDelay(this::monitor, PERIOD_MINUTES, PERIOD_MINUTES, TimeUnit.MINUTES);
    }

    public boolean isError() {
        return this.errorAccumulated >= THRESHOLD_TO_RESTART_SPARK;
    }

    void monitor() {
        try {
            long currentTimeMillis = System.currentTimeMillis();
            if (SparderEnv.isSparkAvailable()) {
                Future<Boolean> check = check();
                try {
                    try {
                        try {
                            long currentTimeMillis2 = System.currentTimeMillis();
                            check.get(KapConfig.getInstanceFromEnv().getSparkCanaryErrorResponseMs(), TimeUnit.MILLISECONDS);
                            logger.info("SparkContextCanary checkWriteFile returned successfully, takes {} ms.", Long.valueOf(System.currentTimeMillis() - currentTimeMillis2));
                            this.errorAccumulated = 0;
                        } catch (TimeoutException e) {
                            this.errorAccumulated++;
                            check.cancel(true);
                            logger.error("SparkContextCanary write file timeout.", e);
                        }
                    } catch (Exception e2) {
                        this.errorAccumulated++;
                        logger.error("SparkContextCanary write file occurs exception.", e2);
                    }
                } catch (InterruptedException e3) {
                    this.errorAccumulated++;
                    logger.error("Thread is interrupted.", e3);
                    Thread.currentThread().interrupt();
                } catch (ExecutionException e4) {
                    this.errorAccumulated = Math.max(this.errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
                    logger.error("SparkContextCanary numberCount occurs exception, need to restart immediately.", e4);
                }
            } else {
                logger.info("Spark is unavailable, need to restart immediately.");
                this.errorAccumulated = Math.max(this.errorAccumulated + 1, THRESHOLD_TO_RESTART_SPARK);
            }
            this.lastResponseTime = System.currentTimeMillis() - currentTimeMillis;
            logger.debug("Spark context errorAccumulated:{}", Integer.valueOf(this.errorAccumulated));
            if (isError()) {
                this.sparkRestarting = true;
                try {
                    logger.warn("Repairing spark context");
                    SparderEnv.restartSpark();
                    MetricsGroup.hostTagCounterInc(MetricsName.SPARDER_RESTART, MetricsCategory.GLOBAL, "global");
                } catch (Throwable th) {
                    logger.error("Restart spark context failed.", th);
                }
                this.sparkRestarting = false;
            }
        } catch (Throwable th2) {
            logger.error("Error when monitoring Spark.", th2);
            Thread.currentThread().interrupt();
        }
    }

    private Future<Boolean> check() {
        return Executors.newSingleThreadExecutor().submit(() -> {
            SparkSession sparkSession = SparderEnv.getSparkSession();
            JavaSparkContext fromSparkContext = JavaSparkContext.fromSparkContext(SparderEnv.getSparkSession().sparkContext());
            fromSparkContext.setLocalProperty("spark.scheduler.pool", "vip_tasks");
            fromSparkContext.setJobDescription("Canary check by " + CHECK_TYPE);
            String str = CHECK_TYPE;
            boolean z = -1;
            switch (str.hashCode()) {
                case 3143036:
                    if (str.equals("file")) {
                        z = false;
                        break;
                    }
                    break;
                case 94851343:
                    if (str.equals("count")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    ArrayList arrayList = new ArrayList();
                    for (int i = 0; i < 100; i++) {
                        arrayList.add(RowFactory.create(new Object[]{Integer.valueOf(i)}));
                    }
                    sparkSession.createDataFrame(fromSparkContext.parallelize(arrayList), new StructType(new StructField[]{DataTypes.createStructField("col", DataTypes.IntegerType, true)})).write().mode(SaveMode.Overwrite).parquet(WORKING_DIR + "/_health/" + sparkSession.sparkContext().applicationId());
                    break;
                case true:
                    ArrayList arrayList2 = new ArrayList();
                    for (int i2 = 0; i2 < 100; i2++) {
                        arrayList2.add(Integer.valueOf(i2));
                    }
                    fromSparkContext.parallelize(arrayList2).count();
                    break;
            }
            fromSparkContext.setJobDescription((String) null);
            return true;
        });
    }

    @Generated
    public int getErrorAccumulated() {
        return this.errorAccumulated;
    }

    @Generated
    public long getLastResponseTime() {
        return this.lastResponseTime;
    }

    @Generated
    public boolean isSparkRestarting() {
        return this.sparkRestarting;
    }
}
