package org.apache.kylin.streaming.jobs.impl;

import java.nio.file.Paths;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.persistence.ImageDesc;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.MetadataStore;
import org.apache.kylin.common.persistence.transaction.UnitOfWorkParams;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.io.ByteSource;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.jar.JarInfoManager;
import org.apache.kylin.metadata.jar.JarTypeEnum;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.metadata.streaming.DataParserManager;
import org.apache.kylin.streaming.app.StreamingEntry;
import org.apache.kylin.streaming.app.StreamingMergeEntry;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.apache.kylin.streaming.event.StreamingJobMetaCleanEvent;
import org.apache.kylin.streaming.jobs.AbstractSparkJobLauncher;
import org.apache.kylin.streaming.jobs.StreamingJobUtils;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/impl/StreamingJobLauncher.class */
/**
 * Spark job launcher for the streaming jobs of a model.
 *
 * <p>Two job types are supported: {@link JobTypeEnum#STREAMING_BUILD}, whose main class is
 * {@code StreamingConstants.SPARK_STREAMING_ENTRY}, and {@link JobTypeEnum#STREAMING_MERGE}, whose main class is
 * {@code StreamingConstants.SPARK_STREAMING_MERGE_ENTRY}. What {@link #launch()} does depends on the environment:
 * <ul>
 *   <li>in a unit-test environment it only logs;</li>
 *   <li>outside local mode it submits the application through {@link SparkLauncher} ({@link #startYarnJob()});</li>
 *   <li>in local mode it calls the entry point's {@code main} in-process.</li>
 * </ul>
 */
public class StreamingJobLauncher extends AbstractSparkJobLauncher {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingJobLauncher.class);

    /** JVM system property naming the Kerberos krb5 configuration file. */
    private static final String KRB5CONF_PROPS = "java.security.krb5.conf";
    /** JVM system property naming the JAAS login configuration file. */
    private static final String JAASCONF_PROPS = "java.security.auth.login.config";
    /** Relative path prefix used for the krb5/JAAS files in the executor and YARN AM java options. */
    private static final String HADOOP_CONF_PATH = "./__spark_conf__/__hadoop_conf__/";

    /** Parameters of the streaming job, read from {@code strmJob} in {@link #init}. */
    private Map<String, String> jobParams;
    /** Main class of the Spark application, chosen by job type. */
    private String mainClazz;
    /** Arguments passed to {@link #mainClazz}; the last one is always the metadata URL. */
    private String[] appArgs;
    /** Wall-clock millis captured in {@link #init}; used in log/meta paths and to age out old meta dumps. */
    private Long currentTimestamp;
    /** Metadata URL that the launched application reads from. */
    private StorageURL distMetaStorageUrl;
    /** Never assigned in this class. NOTE(review): possibly set reflectively (e.g. by tests) - confirm before removing. */
    private boolean isYarnCluster = false;
    /** Set to {@code true} once {@link #init} has completed. */
    private boolean initialized = false;

    /**
     * Prepares the launcher for one streaming job.
     *
     * <p>This method:
     * <ol>
     *   <li>loads the job parameters;</li>
     *   <li>derives the streaming-specific {@link KylinConfig};</li>
     *   <li>decides which metadata URL the application will use;</li>
     *   <li>chooses the main class and arguments for {@code jobTypeEnum}.</li>
     * </ol>
     *
     * @param project     project name
     * @param modelId     model id (also used as the dataflow id)
     * @param jobTypeEnum {@link JobTypeEnum#STREAMING_BUILD} or {@link JobTypeEnum#STREAMING_MERGE}
     * @throws IllegalArgumentException if {@code jobTypeEnum} is any other type
     * @throws IllegalStateException    if the job metadata has to be dumped and the dump fails
     */
    @Override // org.apache.kylin.streaming.jobs.AbstractSparkJobLauncher, org.apache.kylin.streaming.jobs.SparkJobLauncher
    public void init(String project, String modelId, JobTypeEnum jobTypeEnum) {
        super.init(project, modelId, jobTypeEnum);
        this.jobParams = this.strmJob.getParams();
        this.currentTimestamp = System.currentTimeMillis();
        this.config = StreamingJobUtils.getStreamingKylinConfig(this.config, this.jobParams, modelId, project);
        initStorageUrl();
        switch (jobTypeEnum) {
            case STREAMING_BUILD:
                this.mainClazz = StreamingConstants.SPARK_STREAMING_ENTRY;
                this.appArgs = new String[] { project, modelId,
                        this.jobParams.getOrDefault(StreamingConstants.STREAMING_DURATION,
                                StreamingConstants.STREAMING_DURATION_DEFAULT),
                        this.jobParams.getOrDefault(StreamingConstants.STREAMING_WATERMARK,
                                StreamingConstants.STREAMING_WATERMARK_DEFAULT),
                        this.distMetaStorageUrl.toString() };
                StreamingJobUtils.createExecutorJaas();
                break;
            case STREAMING_MERGE:
                this.mainClazz = StreamingConstants.SPARK_STREAMING_MERGE_ENTRY;
                this.appArgs = new String[] { project, modelId,
                        this.jobParams.getOrDefault(StreamingConstants.STREAMING_SEGMENT_MAX_SIZE,
                                StreamingConstants.STREAMING_SEGMENT_MAX_SIZE_DEFAULT),
                        this.jobParams.getOrDefault(StreamingConstants.STREAMING_SEGMENT_MERGE_THRESHOLD,
                                StreamingConstants.STREAMING_SEGMENT_MERGE_THRESHOLD_DEFAULT),
                        this.distMetaStorageUrl.toString() };
                break;
            default:
                throw new IllegalArgumentException("The streaming job Type " + jobTypeEnum.name() + " is not supported...");
        }
        this.initialized = true;
    }

    /** Looks up the registered data parser with the given name in this project. */
    private DataParserInfo getDataParser(String parserName) {
        return DataParserManager.getInstance(this.config, this.project).getDataParserInfo(parserName);
    }

    /** Returns the parser name configured on the Kafka config of the model's root fact table. */
    private String getParserName() {
        return NDataModelManager.getInstance(this.config, this.project).getDataModelDesc(this.modelId)
                .getRootFactTable().getTableDesc().getKafkaConfig().getParserName();
    }

    /** Returns the path of the custom-parser jar that provides {@code dataParserInfo}. */
    private String getParserJarPath(DataParserInfo dataParserInfo) {
        return JarInfoManager.getInstance(this.config, this.project)
                .getJarInfo(JarTypeEnum.STREAMING_CUSTOM_PARSER, dataParserInfo.getJarName()).getJarPath();
    }

    /** Returns the driver log path {@code <jobsBase>/<project>/<jobId>/<ts>/driver.<ts>.log}. */
    private String getDriverHDFSLogPath() {
        return String.format(Locale.ROOT, "%s/%s/%s/%s/driver.%s.log", this.config.getStreamingBaseJobsLocation(),
                this.project, this.jobId, this.currentTimestamp, this.currentTimestamp);
    }

    /** Returns the directory holding this model's metadata dumps: {@code <jobsBase>/<project>/<modelId>/meta}. */
    private String getJobTmpMetaStoreUrlPath() {
        return String.format(Locale.ROOT, "%s/%s/%s/meta", this.config.getStreamingBaseJobsLocation(), this.project,
                this.modelId);
    }

    /**
     * Builds the storage URL for a fresh metadata dump taken at {@link #currentTimestamp}.
     *
     * <p>The dump is written to {@code <metaDir>/meta_<ts>} with the {@code zip} and {@code snapshot} flags set.
     */
    private StorageURL getJobTmpHdfsMetaStorageUrl() {
        Map<String, String> params = new HashMap<>();
        params.put("path", String.format(Locale.ROOT, "%s/meta_%d", getJobTmpMetaStoreUrlPath(), this.currentTimestamp));
        params.put("zip", "true");
        params.put("snapshot", "true");
        return new StorageURL(this.config.getMetadataUrlPrefix(), StreamingConstants.STREAMING_META_URL_DEFAULT, params);
    }

    /**
     * Returns the metadata resource paths copied into a dump.
     *
     * <p>The set contains the dataflow's pre-calculation resources, {@code /_image}, and this job's
     * {@code /<project>/streaming/<jobId>} resource.
     */
    protected Set<String> getMetadataDumpList() {
        Set<String> resources = NDataflowManager.getInstance(this.config, this.project).getDataflow(this.modelId)
                .collectPrecalculationResource();
        resources.add("/_image");
        resources.add(String.format(Locale.ROOT, "/%s%s/%s", this.project, "/streaming", this.jobId));
        return resources;
    }

    /**
     * Finds a metadata dump that is still recent enough to reuse, and schedules expired dumps for cleanup.
     *
     * <p>The dump directory is created if missing. Dumps whose modification time is more than
     * {@code getStreamingJobMetaRetainedTime()} older than {@link #currentTimestamp} are passed synchronously to a
     * {@link StreamingJobMetaCleanEvent}.
     *
     * @return the path of the most recently modified remaining dump, or {@code null} if there is none
     */
    // NOTE(review): raw types are a decompilation artifact; they should be List<FileStatus>
    // (org.apache.hadoop.fs) - confirm against HadoopUtil.getFileStatusPathsFromHDFSDir.
    @Nullable
    private String getAvailableLatestDumpPath() {
        String jobTmpMetaStoreUrlPath = getJobTmpMetaStoreUrlPath();
        HadoopUtil.mkdirIfNotExist(jobTmpMetaStoreUrlPath);
        List fileStatusPathsFromHDFSDir = HadoopUtil.getFileStatusPathsFromHDFSDir(jobTmpMetaStoreUrlPath, false);
        long streamingJobMetaRetainedTime = this.config.getStreamingJobMetaRetainedTime();
        // TRUE -> expired dumps, FALSE -> dumps still within the retention window.
        Map map = (Map) fileStatusPathsFromHDFSDir.stream().collect(Collectors.groupingBy(fileStatus -> {
            return Boolean.valueOf(this.currentTimestamp.longValue() - fileStatus.getModificationTime() > streamingJobMetaRetainedTime);
        }));
        if (map.containsKey(Boolean.TRUE)) {
            List list = (List) ((List) map.get(Boolean.TRUE)).stream().map((v0) -> {
                return v0.getPath();
            }).collect(Collectors.toList());
            EventBusFactory.getInstance().postSync(new StreamingJobMetaCleanEvent(list));
            log.info("delete by {} streaming meta path size:{}", this.jobId, Integer.valueOf(list.size()));
        }
        if (map.containsKey(Boolean.FALSE)) {
            return (String) ((List) map.get(Boolean.FALSE)).stream().max(Comparator.comparingLong((v0) -> {
                return v0.getModificationTime();
            })).map(fileStatus2 -> {
                return fileStatus2.getPath().toString();
            }).orElse(null);
        }
        return null;
    }

    /**
     * Decides which metadata URL the launched application reads and stores it in {@link #distMetaStorageUrl}.
     *
     * <p>The main metadata URL is used when the job does not run on the cluster, or when the job sets a
     * non-default {@code STREAMING_META_URL}. Otherwise a dump under {@link #getJobTmpMetaStoreUrlPath()} is used:
     * a still-retained existing dump if one exists, or a freshly written dump.
     *
     * @throws IllegalStateException if a fresh dump is needed and writing it fails
     */
    private void initStorageUrl() {
        boolean useMainMetaStore = !StreamingUtils.isJobOnCluster(this.config)
                || !StringUtils.equals(StreamingConstants.STREAMING_META_URL_DEFAULT, this.jobParams.getOrDefault(
                        StreamingConstants.STREAMING_META_URL, StreamingConstants.STREAMING_META_URL_DEFAULT));
        if (useMainMetaStore) {
            this.distMetaStorageUrl = this.config.getMetadataUrl();
            return;
        }
        StorageURL dumpUrl = getJobTmpHdfsMetaStorageUrl();
        Preconditions.checkState(StringUtils.isNotEmpty(dumpUrl.toString()), "Missing metaUrl!");
        String latestDumpPath = getAvailableLatestDumpPath();
        if (StringUtils.isNotEmpty(latestDumpPath)) {
            // Reuse the retained dump: keep the dump URL's parameters and point "path" at the existing dump.
            Map<String, String> params = new HashMap<>(dumpUrl.getAllParameters());
            params.put("path", latestDumpPath);
            this.distMetaStorageUrl = new StorageURL(dumpUrl.getIdentifier(), dumpUrl.getScheme(), params);
            return;
        }
        dumpMetadata(dumpUrl);
    }

    /**
     * Copies {@link #getMetadataDumpList()} from the main metadata store into a new store at {@code dumpUrl}.
     *
     * <p>The copy runs inside a read-only unit of work on the project. {@code /_image} in the dump is overwritten
     * with the source store's audit-log max id. On success, {@link #distMetaStorageUrl} is set to {@code dumpUrl}.
     *
     * <p>The temporary store is always closed, even on failure. A failure is rethrown: otherwise
     * {@link #distMetaStorageUrl} would stay {@code null} and {@link #init} would fail later with a bare NPE.
     */
    private void dumpMetadata(StorageURL dumpUrl) {
        KylinConfig dumpConfig = KylinConfig.createKylinConfig(this.config);
        dumpConfig.setMetadataUrl(dumpUrl.toString());
        try (ResourceStore dumpStore = ResourceStore.getKylinMetaStore(dumpConfig)) {
            MetadataStore dumpMetadataStore = dumpStore.getMetadataStore();
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(UnitOfWorkParams.builder().readonly(true)
                    .unitName(this.project).processor(() -> {
                        ResourceStore sourceStore = ResourceStore.getKylinMetaStore(this.config);
                        for (String resPath : getMetadataDumpList()) {
                            sourceStore.copy(resPath, dumpStore);
                        }
                        dumpStore.putResourceWithoutCheck("/_image",
                                ByteSource.wrap(JsonUtil.writeValueAsBytes(
                                        new ImageDesc(Long.valueOf(sourceStore.getAuditLogStore().getMaxId())))),
                                System.currentTimeMillis(), -1L);
                        return null;
                    }).build());
            dumpMetadataStore.dump(dumpStore);
            this.distMetaStorageUrl = dumpUrl;
            log.debug("dump meta success.{}", dumpUrl);
        } catch (Exception e) {
            log.error("dump meta error,{}", this.jobId, e);
            throw new IllegalStateException("Failed to dump streaming job metadata for job " + this.jobId, e);
        }
    }

    /**
     * Appends {@code -Dlog4j.configurationFile=...} to {@code sb}.
     *
     * <p>For executors, and whenever {@link #isYarnCluster} is set or the Spark master starts with {@code k8s}, the
     * bare file name of {@code propertiesFile} is used. Otherwise the value is a {@code file:} URI of the local path.
     */
    private void generateLog4jConfiguration(boolean isExecutor, StringBuilder sb, String propertiesFile) {
        String location = "file:" + propertiesFile;
        if (isExecutor || this.isYarnCluster || this.config.getSparkMaster().startsWith("k8s")) {
            location = Paths.get(propertiesFile).getFileName().toString();
        }
        sb.append(javaPropertyFormatter("log4j.configurationFile", location));
    }

    /**
     * Builds {@code spark.driver.extraJavaOptions}.
     *
     * <p>Starts from the configured value and adds krb5/JAAS settings when applicable, the REST server IP, the
     * HDFS working dir, the driver log path, the time zone, and the driver log4j configuration.
     *
     * @throws NullPointerException if the driver options are not configured
     */
    private String wrapDriverJavaOptions(Map<String, String> sparkConf) {
        String driverOpts = sparkConf.get(StreamingConstants.SPARK_DRIVER_OPTS);
        Preconditions.checkNotNull(driverOpts, "spark.driver.extraJavaOptions is empty");
        StringBuilder sb = new StringBuilder(driverOpts);
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        rewriteKrb5Conf(sb, driverOpts, kapConfig.getKerberosKrb5ConfPath());
        rewriteKafkaJaasConf(sb, driverOpts, kapConfig.getKafkaJaasConfPath());
        sb.append(javaPropertyFormatter(StreamingConstants.REST_SERVER_IP, AddressUtil.getLocalHostExactAddress()));
        sb.append(javaPropertyFormatter("kylin.hdfs.working.dir", this.config.getHdfsWorkingDirectory()));
        sb.append(javaPropertyFormatter("spark.driver.log4j.appender.hdfs.File", getDriverHDFSLogPath()));
        sb.append(javaPropertyFormatter("user.timezone", this.config.getTimeZone()));
        generateLog4jConfiguration(false, sb, this.config.getLogSparkStreamingDriverPropertiesFile());
        return sb.toString();
    }

    /**
     * Builds {@code spark.executor.extraJavaOptions}.
     *
     * <p>Starts from the configured value and adds krb5/JAAS settings (with paths under
     * {@link #HADOOP_CONF_PATH}), job identity, time zone, the optional mount dir, and the executor log4j
     * configuration.
     *
     * @throws NullPointerException if the executor options are not configured
     */
    private String wrapExecutorJavaOptions(Map<String, String> sparkConf) {
        String executorOpts = sparkConf.get(StreamingConstants.SPARK_EXECUTOR_OPTS);
        Preconditions.checkNotNull(executorOpts, "spark.executor.extraJavaOptions is empty");
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        StringBuilder sb = new StringBuilder(executorOpts);
        rewriteKrb5Conf(sb, executorOpts, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
        rewriteKafkaJaasConf(sb, executorOpts, HADOOP_CONF_PATH + StreamingJobUtils.getExecutorJaasName());
        sb.append(javaPropertyFormatter("kap.spark.identifier", this.jobId));
        sb.append(javaPropertyFormatter("kap.spark.jobTimeStamp", this.currentTimestamp.toString()));
        sb.append(javaPropertyFormatter("kap.spark.project", this.project));
        sb.append(javaPropertyFormatter("user.timezone", this.config.getTimeZone()));
        if (StringUtils.isNotBlank(this.config.getMountSparkLogDir())) {
            sb.append(javaPropertyFormatter("job.mountDir", this.config.getMountSparkLogDir()));
        }
        generateLog4jConfiguration(true, sb, this.config.getLogSparkStreamingExecutorPropertiesFile());
        return sb.toString();
    }

    /**
     * Builds the YARN AM java options from the configured value (empty if unset).
     *
     * <p>Adds the krb5 setting under {@link #HADOOP_CONF_PATH} when applicable.
     */
    private String wrapYarnAmJavaOptions(Map<String, String> sparkConf) {
        String amOpts = sparkConf.getOrDefault(StreamingConstants.SPARK_YARN_AM_OPTS, "");
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        StringBuilder sb = new StringBuilder(amOpts);
        rewriteKrb5Conf(sb, amOpts, HADOOP_CONF_PATH + kapConfig.getKerberosKrb5Conf());
        return sb.toString();
    }

    /**
     * Appends the JAAS config property pointing at {@code jaasPath}.
     *
     * <p>This happens only when Kafka JAAS is enabled, this is a build job, and {@code originalOpts} does not
     * already set the property.
     */
    private void rewriteKafkaJaasConf(StringBuilder sb, String originalOpts, String jaasPath) {
        if (KapConfig.getInstanceFromEnv().isKafkaJaasEnabled() && this.jobType.equals(JobTypeEnum.STREAMING_BUILD)
                && !originalOpts.contains(JAASCONF_PROPS)) {
            sb.append(javaPropertyFormatter(JAASCONF_PROPS, jaasPath));
        }
    }

    /**
     * Appends the krb5 config property pointing at {@code krb5Path}.
     *
     * <p>This happens only when Kerberos is enabled and {@code originalOpts} does not already set the property.
     */
    private void rewriteKrb5Conf(StringBuilder sb, String originalOpts, String krb5Path) {
        if (!KapConfig.getInstanceFromEnv().isKerberosEnabled() || originalOpts.contains(KRB5CONF_PROPS)) {
            return;
        }
        sb.append(javaPropertyFormatter(KRB5CONF_PROPS, krb5Path));
    }

    /**
     * For build jobs whose model uses a parser other than the built-in {@code TimedJsonStreamParser}, adds that
     * parser's jar to the application.
     */
    private void addParserJar(SparkLauncher sparkLauncher) {
        // Only build jobs parse Kafka records, so skip the model metadata lookup for other job types.
        if (!this.jobType.equals(JobTypeEnum.STREAMING_BUILD)) {
            return;
        }
        String parserName = getParserName();
        if (StringUtils.equals("org.apache.kylin.parser.TimedJsonStreamParser", parserName)) {
            return;
        }
        String parserJarPath = getParserJarPath(getDataParser(parserName));
        sparkLauncher.addJar(parserJarPath);
        log.info("streaming job {} use parser {} jar path {}", this.jobId, parserName, parserJarPath);
    }

    /**
     * Configures {@link #launcher} from the streaming Spark config and starts the application.
     *
     * <p>The configuration covers Kerberos, the JAAS keytab, the parser jar, resources, class paths, and java
     * options. The resulting handle is stored in {@link #handler}. Explicit settings are applied after the generic
     * config, so they take precedence.
     */
    public void startYarnJob() throws Exception {
        Map<String, String> sparkConf = getStreamingSparkConfig(this.config);
        sparkConf.forEach((key, value) -> this.launcher.setConf(key, value));
        SparkLauncher sparkLauncher = this.launcher.setAppName(this.jobId).setSparkHome(KylinConfig.getSparkHome());

        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        if (kapConfig.isKerberosEnabled()) {
            sparkLauncher.setConf(StreamingConstants.SPARK_KERBEROS_KEYTAB, kapConfig.getKerberosKeytabPath());
            sparkLauncher.setConf(StreamingConstants.SPARK_KERBEROS_PRINCIPAL, kapConfig.getKerberosPrincipal());
        }
        if (kapConfig.isKafkaJaasEnabled() && this.jobType.equals(JobTypeEnum.STREAMING_BUILD)) {
            String jaasKeyTabAbsPath = StreamingJobUtils.getJaasKeyTabAbsPath();
            if (StringUtils.isNotEmpty(jaasKeyTabAbsPath)) {
                sparkLauncher.addFile(jaasKeyTabAbsPath);
            }
        }
        addParserJar(sparkLauncher);

        String executorInstances = sparkConf.getOrDefault(StreamingConstants.SPARK_EXECUTOR_INSTANCES, "2");
        String executorCores = sparkConf.getOrDefault(StreamingConstants.SPARK_EXECUTOR_CORES, "2");
        sparkLauncher.setMaster(sparkConf.getOrDefault(StreamingConstants.SPARK_MASTER,
                StreamingConstants.SPARK_MASTER_DEFAULT));
        sparkLauncher.setConf(StreamingConstants.SPARK_DRIVER_MEM, sparkConf.getOrDefault(
                StreamingConstants.SPARK_DRIVER_MEM, StreamingConstants.SPARK_DRIVER_MEM_DEFAULT));
        sparkLauncher.setConf(StreamingConstants.SPARK_EXECUTOR_INSTANCES, executorInstances);
        sparkLauncher.setConf(StreamingConstants.SPARK_EXECUTOR_CORES, executorCores);
        sparkLauncher.setConf(StreamingConstants.SPARK_CORES_MAX, calcMaxCores(executorInstances, executorCores));
        sparkLauncher.setConf(StreamingConstants.SPARK_EXECUTOR_MEM,
                sparkConf.getOrDefault(StreamingConstants.SPARK_EXECUTOR_MEM, "1g"));
        sparkLauncher.setConf(StreamingConstants.SPARK_SHUFFLE_PARTITIONS, sparkConf.getOrDefault(
                StreamingConstants.SPARK_SHUFFLE_PARTITIONS, StreamingConstants.SPARK_SHUFFLE_PARTITIONS_DEFAULT));
        sparkLauncher.setConf(StreamingConstants.SPARK_YARN_DIST_JARS, this.kylinJobJar);
        sparkLauncher.setConf(StreamingConstants.SPARK_YARN_TIMELINE_SERVICE, "false");
        sparkLauncher.setConf("spark.driver.extraClassPath", this.kylinJobJar);
        // Executors see the job jar by its bare file name.
        sparkLauncher.setConf("spark.executor.extraClassPath", Paths.get(this.kylinJobJar).getFileName().toString());
        sparkLauncher.setConf(StreamingConstants.SPARK_DRIVER_OPTS, wrapDriverJavaOptions(sparkConf));
        sparkLauncher.setConf(StreamingConstants.SPARK_EXECUTOR_OPTS, wrapExecutorJavaOptions(sparkConf));
        sparkLauncher.setConf(StreamingConstants.SPARK_YARN_AM_OPTS, wrapYarnAmJavaOptions(sparkConf));
        sparkLauncher.addJar(this.config.getKylinExtJarsPath());
        sparkLauncher.addFile(this.config.getLogSparkStreamingExecutorPropertiesFile());
        sparkLauncher.setAppResource(this.kylinJobJar);
        sparkLauncher.setMainClass(this.mainClazz);
        sparkLauncher.addAppArgs(this.appArgs);
        this.handler = sparkLauncher.startApplication(new SparkAppHandle.Listener[] { this.listener });
    }

    /**
     * Launches the job according to the environment (see the class documentation).
     *
     * <p>On failure the job state is set to {@link JobStatusEnum#LAUNCHING_ERROR} and a {@link KylinException} is
     * thrown.
     */
    @Override // org.apache.kylin.streaming.jobs.AbstractSparkJobLauncher, org.apache.kylin.streaming.jobs.SparkJobLauncher
    public void launch() {
        try {
            if (this.config.isUTEnv()) {
                log.info("{} -- {} {} streaming job starts to launch", this.project, this.modelId, this.jobType.name());
            } else if (!StreamingUtils.isLocalMode()) {
                startYarnJob();
            } else if (JobTypeEnum.STREAMING_BUILD == this.jobType) {
                log.info("Starting streaming build job in local mode...");
                StreamingEntry.main(this.appArgs);
            } else if (JobTypeEnum.STREAMING_MERGE == this.jobType) {
                log.info("Starting streaming merge job in local mode...");
                StreamingMergeEntry.main(this.appArgs);
            }
            log.info("Streaming job create success on model {}", this.modelId);
        } catch (Exception e) {
            log.error("launch streaming application failed: {}", e.getMessage(), e);
            MetaInfoUpdater.updateJobState(this.project, this.jobId, JobStatusEnum.LAUNCHING_ERROR);
            // NOTE(review): the cause is not attached; prefer a KylinException constructor taking the Throwable if one exists.
            throw new KylinException(ServerErrorCode.JOB_START_FAILURE, e.getMessage());
        }
    }

    /** Marks this model's job of the current type for graceful shutdown. */
    @Override // org.apache.kylin.streaming.jobs.AbstractSparkJobLauncher, org.apache.kylin.streaming.jobs.SparkJobLauncher
    public void stop() {
        MetaInfoUpdater.markGracefulShutdown(this.project, StreamingUtils.getJobId(this.modelId, this.jobType.name()));
    }

    /**
     * Returns executor instances times executor cores, as a decimal string.
     *
     * @throws NumberFormatException if either value is not an integer
     */
    private String calcMaxCores(String executorInstances, String executorCores) {
        return String.valueOf(Integer.parseInt(executorInstances) * Integer.parseInt(executorCores));
    }

    /** Returns whether {@link #init} has completed. */
    @Generated
    public boolean isInitialized() {
        return this.initialized;
    }
}
