package org.apache.kylin.streaming.app;

import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kylin.cluster.IClusterManager;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.persistence.metadata.JdbcPartialAuditLogStore;
import org.apache.kylin.common.util.AddressUtil;
import org.apache.kylin.common.util.Application;
import org.apache.kylin.common.util.TimeZoneUtils;
import org.apache.kylin.common.util.Unsafe;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.apache.kylin.streaming.jobs.GracefulStopInterface;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.request.StreamingJobUpdateRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.KylinSession;
import org.apache.spark.sql.KylinSession$;
import org.apache.spark.sql.SparderEnv;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSessionExtensions;
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
import org.apache.spark.sql.catalyst.rules.Rule;
import org.apache.spark.sql.execution.datasource.AlignmentTableStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/kylin/streaming/app/StreamingApplication.class */
/**
 * Base class for streaming Spark applications.
 *
 * <p>{@link #execute(String[])} drives the lifecycle: it parses the arguments, prepares the
 * Kylin configuration and the Spark session, reports the application to the Kylin REST
 * endpoint, starts a daemon thread that watches the job execution id, and finally delegates
 * to {@link #doExecute()}.
 */
public abstract class StreamingApplication implements Application, GracefulStopInterface {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingApplication.class);

    /** Spark session of this job; assigned by {@link #getOrCreateSparkSession} or {@link #setSparkSession}. */
    protected SparkSession ss;

    // The five fields below are never assigned in this class; presumably subclasses
    // populate them in parseParams(String[]) -- confirm against the implementations.
    protected String project;
    protected String dataflowId;
    protected String distMetaUrl;
    protected JobTypeEnum jobType;
    protected String jobId;

    /** Execution id returned by {@link #reportApplicationInfo()}; compared by the check thread. */
    protected Integer jobExecId;

    /** Configuration obtained from the environment when the instance is created. */
    protected final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();

    /**
     * Lazily initialised holder for the result of {@link #initMetaPathSet()}. The holder object
     * itself is stored as a sentinel when the initialiser returned {@code null}.
     */
    private final AtomicReference<Object> metaResPathSet = new AtomicReference<>();

    /**
     * Points {@link #kylinConfig} at {@link #distMetaUrl}. When the URL uses the default
     * streaming metadata scheme, a partial JDBC audit log store is additionally attached to
     * the metadata store and the store is caught up.
     *
     * @throws Exception if creating the audit log store or catching up fails
     */
    private void prepareKylinConfig() throws Exception {
        String scheme = StorageURL.valueOf(this.distMetaUrl).getScheme();
        if (!StreamingConstants.STREAMING_META_URL_DEFAULT.equals(scheme)) {
            this.kylinConfig.setMetadataUrl(this.distMetaUrl);
            return;
        }
        // The audit log store is created BEFORE the metadata URL is switched; keep this order
        // (presumably it has to bind to the originally configured metadata -- confirm).
        // The filter accepts this dataflow's detail resources plus the paths of getMetaResPathSet().
        JdbcPartialAuditLogStore auditLogStore = new JdbcPartialAuditLogStore(this.kylinConfig,
                resPath -> resPath.startsWith(String.format(Locale.ROOT, "/%s%s/%s", this.project,
                        "/dataflow_details", this.dataflowId)) || getMetaResPathSet().contains(resPath));
        this.kylinConfig.setMetadataUrl(this.distMetaUrl);
        Preconditions.checkState(
                StreamingConstants.STREAMING_META_URL_DEFAULT.equals(this.kylinConfig.getMetadataUrl().getScheme()));
        ResourceStore metaStore = ResourceStore.getKylinMetaStore(this.kylinConfig);
        metaStore.getMetadataStore().setAuditLogStore(auditLogStore);
        metaStore.catchup();
        log.info("start job from offset:{}", auditLogStore.getLogOffset());
    }

    /**
     * Collects the precalculation resources of the dataflow and adds the metadata path of this
     * streaming job.
     *
     * @return the resource paths the audit log filter should accept
     */
    private Set<String> initMetaPathSet() {
        Set<String> resourcePaths = NDataflowManager.getInstance(this.kylinConfig, this.project)
                .getDataflow(this.dataflowId).collectPrecalculationResource();
        resourcePaths.add(String.format(Locale.ROOT, "/%s%s/%s", this.project, "/streaming", this.jobId));
        return resourcePaths;
    }

    /**
     * Prepares everything {@link #doExecute()} relies on: default time zone, Kylin configuration
     * (cluster mode only), Spark session, reported job execution id and the execution id check
     * thread.
     *
     * @throws ExecuteException wrapping any exception raised during preparation
     */
    protected void prepareBeforeExecute() throws ExecuteException {
        try {
            TimeZoneUtils.setDefaultTimeZone(this.kylinConfig);
            if (isJobOnCluster()) {
                prepareKylinConfig();
            }
            getOrCreateSparkSession(KylinBuildEnv.getOrCreate(this.kylinConfig).sparkConf());
            this.jobExecId = reportApplicationInfo();
            JobExecutionIdHolder.setJobExecutionId(this.jobId, this.jobExecId);
            startJobExecutionIdCheckThread();
        } catch (Exception e) {
            throw new ExecuteException(e);
        }
    }

    /**
     * Parses the command line arguments of the application.
     *
     * @param strArr raw arguments passed to {@link #execute(String[])}
     */
    public abstract void parseParams(String[] strArr);

    /**
     * Runs the application: parse arguments, prepare, then execute. Any exception is logged and
     * rethrown through {@link ExceptionUtils#rethrow(Throwable)}.
     *
     * @param strArr raw application arguments
     */
    public void execute(String[] strArr) {
        try {
            parseParams(strArr);
            prepareBeforeExecute();
            doExecute();
        } catch (Exception e) {
            log.error("{} execute error", getClass().getCanonicalName(), e);
            ExceptionUtils.rethrow(e);
        }
    }

    /**
     * Performs the actual streaming work once preparation has finished.
     *
     * @throws ExecuteException if the job fails
     */
    protected abstract void doExecute() throws ExecuteException;

    /**
     * Creates (or reuses) the Spark session and stores it in {@link #ss}, then registers UDFs and
     * the job metrics listener. In cluster mode the {@code kylin.env} property is set as well.
     *
     * @param sparkConf Spark configuration applied to the session builder
     */
    public void getOrCreateSparkSession(SparkConf sparkConf) {
        SparkSession.Builder builder = SparkSession.builder()
                .withExtensions(new AbstractFunction1<SparkSessionExtensions, BoxedUnit>() {
                    public BoxedUnit apply(SparkSessionExtensions extensions) {
                        extensions.injectPostHocResolutionRule(
                                new AbstractFunction1<SparkSession, Rule<LogicalPlan>>() {
                                    public Rule<LogicalPlan> apply(SparkSession session) {
                                        return new AlignmentTableStats(session);
                                    }
                                });
                        return BoxedUnit.UNIT;
                    }
                })
                .enableHiveSupport()
                .config(sparkConf)
                .config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false");

        // A plain session is used only when the job is not on a cluster, Sparder reports Spark as
        // available and its session is not a KylinSession; otherwise a KylinSession is built.
        boolean usePlainSession = !isJobOnCluster() && SparderEnv.isSparkAvailable()
                && !(SparderEnv.getSparkSession() instanceof KylinSession);
        if (usePlainSession) {
            this.ss = builder.getOrCreate();
        } else {
            this.ss = KylinSession$.MODULE$.KylinBuilder(builder).buildCluster().getOrCreateKylinSession();
        }

        UdfManager.create(this.ss);
        JobMetricsUtils.registerListener(this.ss);
        if (isJobOnCluster()) {
            Unsafe.setProperty("kylin.env", KylinConfig.getInstanceFromEnv().getDeployEnv());
        }
    }

    /**
     * In cluster mode, unregisters the job metrics listener and closes the audit log store.
     * An {@link IOException} while closing is logged and not propagated. Does nothing otherwise.
     *
     * @param sparkSession session whose metrics listener is unregistered
     */
    public void closeAuditLogStore(SparkSession sparkSession) {
        if (!isJobOnCluster()) {
            return;
        }
        JobMetricsUtils.unRegisterListener(sparkSession);
        try {
            ResourceStore.getKylinMetaStore(KylinConfig.getInstanceFromEnv()).getAuditLogStore().close();
        } catch (IOException e) {
            log.error("close audit log error", e);
        }
    }

    /**
     * Reports the Spark application id, tracking URL, process id and node info of this job via
     * an HTTP PUT to {@code /streaming_jobs/spark}.
     *
     * @return the value returned by the endpoint, parsed as an integer (stored by the caller as
     *         the job execution id)
     * @throws NumberFormatException if the response data is not an integer
     */
    public Integer reportApplicationInfo() {
        KylinBuildEnv buildEnv = getOrCreateKylinBuildEnv(this.kylinConfig);
        String applicationId = this.ss.sparkContext().applicationId();
        String trackingUrl = getTrackingUrl(buildEnv.clusterManager(), this.ss);
        boolean useIpAddress = this.kylinConfig.isTrackingUrlIpAddressEnabled();
        try {
            if (StringUtils.isBlank(trackingUrl)) {
                log.info("Get tracking url of application {}, but empty url found.", applicationId);
            }
            if (useIpAddress && !StringUtils.isEmpty(trackingUrl)) {
                trackingUrl = tryReplaceHostAddress(trackingUrl);
            }
        } catch (Exception e) {
            // Best effort: a failure here must not prevent the job from being reported.
            log.error("get tracking url failed!", e);
        }

        StreamingJobUpdateRequest request = new StreamingJobUpdateRequest(this.project, this.dataflowId,
                this.jobType.name(), applicationId, trackingUrl);
        request.setProcessId(StreamingUtils.getProcessId());
        request.setNodeInfo(AddressUtil.getZkLocalInstance());

        // try-with-resources closes the client exactly once and keeps the primary exception
        // when both the request and close() fail.
        try (RestSupport rest = createRestSupport(this.kylinConfig)) {
            String executionId = (String) rest.execute(rest.createHttpPut("/streaming_jobs/spark"), request)
                    .getData();
            return Integer.valueOf(executionId);
        }
    }

    /**
     * Returns the build environment for the given configuration; overridable seam around
     * {@link KylinBuildEnv#getOrCreate}.
     *
     * @param kylinConfig configuration to look the environment up for
     * @return the build environment
     */
    public KylinBuildEnv getOrCreateKylinBuildEnv(KylinConfig kylinConfig) {
        return KylinBuildEnv.getOrCreate(kylinConfig);
    }

    /**
     * Asks the cluster manager for the build tracking URL of the Spark session.
     *
     * @param iClusterManager cluster manager to query
     * @param sparkSession    session whose tracking URL is requested
     * @return the tracking URL as returned by the cluster manager
     */
    public String getTrackingUrl(IClusterManager iClusterManager, SparkSession sparkSession) {
        return iClusterManager.getBuildTrackingUrl(sparkSession);
    }

    /**
     * Replaces the host name in a URL with its resolved IP address.
     *
     * @param str URL whose host should be replaced
     * @return the URL with every occurrence of the host replaced by its IP address, or
     *         {@code str} unchanged when the URL has no host or the host cannot be resolved
     * @throws IllegalArgumentException if {@code str} is not a valid URI
     */
    public String tryReplaceHostAddress(String str) {
        String host = URI.create(str).getHost();
        if (host == null) {
            // URI.getHost() is null for URIs without a host component; nothing to replace.
            return str;
        }
        try {
            return str.replace(host, InetAddress.getByName(host).getHostAddress());
        } catch (UnknownHostException e) {
            log.error("failed to get the ip address of {}, step back to use the origin tracking url.", host, e);
            return str;
        }
    }

    /**
     * Exits the JVM with the given status, but only when the job runs on a cluster.
     *
     * @param i process exit code
     */
    public void systemExit(int i) {
        if (isJobOnCluster()) {
            Unsafe.systemExit(i);
        }
    }

    /**
     * @return {@code true} unless streaming runs in local mode or the configuration reports a
     *         unit-test environment
     */
    public boolean isJobOnCluster() {
        return !StreamingUtils.isLocalMode() && !KylinConfig.getInstanceFromEnv().isUTEnv();
    }

    /**
     * Stops the Spark session unless streaming runs in local mode or the Spark context is
     * already stopped.
     */
    public void closeSparkSession() {
        if (StreamingUtils.isLocalMode() || this.ss.sparkContext().isStopped()) {
            return;
        }
        this.ss.stop();
    }

    /** @return the Spark session currently held by this application */
    public SparkSession getSparkSession() {
        return this.ss;
    }

    /** @param sparkSession session to use from now on */
    public void setSparkSession(SparkSession sparkSession) {
        this.ss = sparkSession;
    }

    /**
     * @param streamingJobMeta job metadata to read from
     * @return the parameters stored in the job metadata
     */
    public Map<String, String> getJobParams(StreamingJobMeta streamingJobMeta) {
        return streamingJobMeta.getParams();
    }

    /**
     * Checks whether the stored action of a streaming job is the graceful-shutdown action.
     *
     * @param str  project name
     * @param str2 uuid of the streaming job
     * @return {@code true} if the job's action equals
     *         {@link StreamingConstants#ACTION_GRACEFUL_SHUTDOWN}
     */
    public boolean isGracefulShutdown(String str, String str2) {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
        return StreamingConstants.ACTION_GRACEFUL_SHUTDOWN.equals(jobManager.getStreamingJobByUuid(str2).getAction());
    }

    /** @return {@code true} while the stop flag is unset and the Spark context is not stopped */
    public boolean isRunning() {
        return !getStopFlag() && !this.ss.sparkContext().isStopped();
    }

    /**
     * Starts a daemon thread that, while the job is running, replays the audit log and compares
     * the stored job execution id with {@link #jobExecId}. On a mismatch it closes the Spark
     * session and ends; otherwise it sleeps for the configured interval (minutes) and repeats.
     */
    public void startJobExecutionIdCheckThread() {
        Thread checkThread = new Thread(() -> {
            KylinConfig config = KylinConfig.getInstanceFromEnv();
            long intervalMinutes = config.getStreamingJobExecutionIdCheckInterval();
            while (isRunning()) {
                try {
                    StreamingUtils.replayAuditlog();
                } catch (Exception e) {
                    log.warn("check JobExecutionId error:", e);
                }
                if (!Objects.equals(this.jobExecId, StreamingJobManager.getInstance(config, this.project)
                        .getStreamingJobByUuid(this.jobId).getJobExecutionId())) {
                    closeSparkSession();
                    return;
                }
                // Throttle the polling loop; without this pause the check would spin continuously.
                StreamingUtils.sleep(TimeUnit.MINUTES.toMillis(intervalMinutes));
            }
        });
        checkThread.setDaemon(true);
        checkThread.start();
    }

    /**
     * Creates the REST client used to talk to the Kylin server; overridable seam.
     *
     * @param kylinConfig configuration handed to the client
     * @return a new REST client that the caller must close
     */
    public RestSupport createRestSupport(KylinConfig kylinConfig) {
        return new RestSupport(kylinConfig);
    }

    /**
     * Returns the lazily computed result of {@link #initMetaPathSet()}. The initialiser runs at
     * most once, under the monitor of the holder; a {@code null} result is remembered through a
     * sentinel so it is not recomputed.
     *
     * @return the cached set of metadata resource paths, or {@code null} if the initialiser
     *         returned {@code null}
     */
    @Generated
    @SuppressWarnings("unchecked") // the holder only ever stores a Set<String> or the sentinel
    public Set<String> getMetaResPathSet() {
        Object cached = this.metaResPathSet.get();
        if (cached == null) {
            synchronized (this.metaResPathSet) {
                cached = this.metaResPathSet.get();
                if (cached == null) {
                    Set<String> initialised = initMetaPathSet();
                    // Store the holder itself when the result is null so "computed" != "absent".
                    cached = initialised == null ? this.metaResPathSet : initialised;
                    this.metaResPathSet.set(cached);
                }
            }
        }
        return (Set<String>) (cached == this.metaResPathSet ? null : cached);
    }
}
