package org.apache.kylin.streaming.jobs.scheduler;

import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.msg.MsgPicker;
import org.apache.kylin.common.persistence.transaction.UnitOfWork;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.base.Predicate;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.streaming.constants.StreamingConstants;
import org.apache.kylin.streaming.jobs.thread.StreamingJobRunner;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.JobKiller;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/scheduler/StreamingScheduler.class */
public class StreamingScheduler {
    private String project;
    private ExecutorService jobPool;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingScheduler.class);
    private static final Map<String, StreamingScheduler> INSTANCE_MAP = Maps.newConcurrentMap();
    private static final List<JobStatusEnum> STARTABLE_STATUS_LIST = Arrays.asList(JobStatusEnum.ERROR, JobStatusEnum.STOPPED, JobStatusEnum.NEW, JobStatusEnum.LAUNCHING_ERROR);
    private static StreamingJobStatusWatcher jobStatusUpdater = new StreamingJobStatusWatcher();
    private AtomicBoolean initialized = new AtomicBoolean(false);
    private AtomicBoolean hasStarted = new AtomicBoolean(false);
    private Map<String, StreamingJobRunner> runnerMap = Maps.newHashMap();
    private Map<String, AbstractMap.SimpleEntry<AtomicInteger, AtomicInteger>> retryMap = Maps.newHashMap();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

    public StreamingScheduler(String str) {
        Preconditions.checkNotNull(str);
        this.project = str;
        if (INSTANCE_MAP.containsKey(str)) {
            throw new IllegalStateException("StreamingScheduler for project " + str + " has been initiated. Use getInstance() instead.");
        }
        init();
    }

    public static synchronized StreamingScheduler getInstance(String str) {
        return INSTANCE_MAP.computeIfAbsent(str, StreamingScheduler::new);
    }

    public synchronized void init() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        if (!instanceFromEnv.isJobNode()) {
            log.info("server mode: {}, no need to run job scheduler", instanceFromEnv.getServerMode());
            return;
        }
        if (!UnitOfWork.isAlreadyInTransaction()) {
            log.info("Initializing Job Engine ....");
        }
        if (this.initialized.compareAndSet(false, true)) {
            if (instanceFromEnv.streamingEnabled()) {
                int maxStreamingConcurrentJobLimit = instanceFromEnv.getMaxStreamingConcurrentJobLimit();
                this.jobPool = new ThreadPoolExecutor(maxStreamingConcurrentJobLimit, maxStreamingConcurrentJobLimit * 2, Long.MAX_VALUE, TimeUnit.DAYS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new BasicThreadFactory.Builder().namingPattern("StreamingJobWorker(project:" + this.project + ")").uncaughtExceptionHandler((thread, th) -> {
                    log.error("Something wrong happened when building threadFactory of streaming.", th);
                    throw new IllegalStateException(th);
                }).build());
                log.debug("New StreamingScheduler created by project '{}': {}", this.project, Integer.valueOf(System.identityHashCode(this)));
                this.scheduledExecutorService.scheduleWithFixedDelay(this::retryJob, 5L, 1L, TimeUnit.MINUTES);
                jobStatusUpdater.schedule();
            }
            resumeJobs(instanceFromEnv);
            this.hasStarted.set(true);
        }
    }

    public synchronized void submitJob(String str, String str2, JobTypeEnum jobTypeEnum) {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        if (instanceFromEnv.streamingEnabled()) {
            String jobId = StreamingUtils.getJobId(str2, jobTypeEnum.name());
            StreamingJobMeta streamingJobByUuid = StreamingJobManager.getInstance(instanceFromEnv, str).getStreamingJobByUuid(jobId);
            checkJobStartStatus(streamingJobByUuid, jobId);
            JobKiller.killProcess(streamingJobByUuid);
            killYarnApplication(jobId, str2);
            Predicate predicate = nDataSegment -> {
                return (nDataSegment.getStatus() == SegmentStatusEnum.NEW || nDataSegment.getStorageBytesSize() == 0) && nDataSegment.getAdditionalInfo() != null;
            };
            if (JobTypeEnum.STREAMING_BUILD == jobTypeEnum) {
                deleteBrokenSegment(str, str2, nDataSegment2 -> {
                    return predicate.apply(nDataSegment2) && !nDataSegment2.getAdditionalInfo().containsKey(StreamingConstants.FILE_LAYER);
                });
            } else if (JobTypeEnum.STREAMING_MERGE == jobTypeEnum) {
                deleteBrokenSegment(str, str2, nDataSegment3 -> {
                    return predicate.apply(nDataSegment3) && nDataSegment3.getAdditionalInfo().containsKey(StreamingConstants.FILE_LAYER);
                });
            }
            MetaInfoUpdater.updateJobState(str, jobId, JobStatusEnum.STARTING);
            StreamingJobRunner streamingJobRunner = new StreamingJobRunner(str, str2, jobTypeEnum);
            this.runnerMap.put(jobId, streamingJobRunner);
            this.jobPool.execute(streamingJobRunner);
            if (StreamingUtils.isJobOnCluster(instanceFromEnv)) {
                return;
            }
            MetaInfoUpdater.updateJobState(str, jobId, Sets.newHashSet(new JobStatusEnum[]{JobStatusEnum.RUNNING, JobStatusEnum.ERROR}), JobStatusEnum.RUNNING);
        }
    }

    public synchronized void stopJob(String str, JobTypeEnum jobTypeEnum) {
        String jobId = StreamingUtils.getJobId(str, jobTypeEnum.name());
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        if (!applicationExisted(jobId)) {
            doStop(str, jobTypeEnum);
            if (StreamingUtils.isJobOnCluster(instanceFromEnv)) {
                MetaInfoUpdater.updateJobState(this.project, jobId, Sets.newHashSet(new JobStatusEnum[]{JobStatusEnum.STOPPED, JobStatusEnum.ERROR}), JobStatusEnum.ERROR);
                return;
            } else {
                MetaInfoUpdater.updateJobState(this.project, jobId, Sets.newHashSet(new JobStatusEnum[]{JobStatusEnum.STOPPED, JobStatusEnum.ERROR}), JobStatusEnum.STOPPED);
                return;
            }
        }
        JobStatusEnum currentStatus = StreamingJobManager.getInstance(instanceFromEnv, this.project).getStreamingJobByUuid(jobId).getCurrentStatus();
        if (JobStatusEnum.ERROR == currentStatus || JobStatusEnum.STOPPED == currentStatus) {
            return;
        }
        MetaInfoUpdater.updateJobState(this.project, jobId, JobStatusEnum.STOPPING);
        doStop(str, jobTypeEnum);
    }

    private void doStop(String str, JobTypeEnum jobTypeEnum) {
        String jobId = StreamingUtils.getJobId(str, jobTypeEnum.name());
        StreamingJobRunner streamingJobRunner = this.runnerMap.get(jobId);
        synchronized (this.runnerMap) {
            if (Objects.isNull(streamingJobRunner)) {
                streamingJobRunner = new StreamingJobRunner(this.project, str, jobTypeEnum);
                streamingJobRunner.init();
                this.runnerMap.put(jobId, streamingJobRunner);
            }
        }
        streamingJobRunner.stop();
    }

    public static synchronized void shutdownByProject(String str) {
        StreamingScheduler streamingScheduler = INSTANCE_MAP.get(str);
        if (streamingScheduler != null) {
            INSTANCE_MAP.remove(str);
            streamingScheduler.forceShutdown();
        }
    }

    public void forceShutdown() {
        log.info("Shutting down DefaultScheduler ....");
        releaseResources();
        ExecutorServiceUtil.forceShutdown(this.scheduledExecutorService);
        ExecutorServiceUtil.forceShutdown(this.jobPool);
    }

    private void releaseResources() {
        this.initialized.set(false);
        this.hasStarted.set(false);
        INSTANCE_MAP.remove(this.project);
    }

    private void checkJobStartStatus(StreamingJobMeta streamingJobMeta, String str) {
        if (!STARTABLE_STATUS_LIST.contains(streamingJobMeta.getCurrentStatus())) {
            throw new KylinException(ServerErrorCode.JOB_START_FAILURE, str);
        }
    }

    public void retryJob() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        ((List) StreamingJobManager.getInstance(instanceFromEnv, this.project).listAllStreamingJobMeta().stream().filter(streamingJobMeta -> {
            return "true".equals(streamingJobMeta.getParams().getOrDefault(StreamingConstants.STREAMING_RETRY_ENABLE, instanceFromEnv.getStreamingJobRetryEnabled()));
        }).collect(Collectors.toList())).forEach(streamingJobMeta2 -> {
            JobStatusEnum currentStatus = streamingJobMeta2.getCurrentStatus();
            String modelId = streamingJobMeta2.getModelId();
            String jobId = StreamingUtils.getJobId(modelId, streamingJobMeta2.getJobType().name());
            if (this.retryMap.containsKey(jobId) || currentStatus == JobStatusEnum.ERROR) {
                if (!(!applicationExisted(jobId))) {
                    if (currentStatus == JobStatusEnum.RUNNING && this.retryMap.containsKey(jobId)) {
                        log.debug("remove jobId=" + jobId);
                        this.retryMap.remove(jobId);
                        return;
                    }
                    return;
                }
                if (!this.retryMap.containsKey(jobId)) {
                    if (currentStatus == JobStatusEnum.ERROR) {
                        this.retryMap.put(jobId, new AbstractMap.SimpleEntry<>(new AtomicInteger(instanceFromEnv.getStreamingJobRetryInterval()), new AtomicInteger(1)));
                        return;
                    }
                    return;
                }
                int i = this.retryMap.get(jobId).getKey().get();
                int i2 = this.retryMap.get(jobId).getValue().get();
                log.debug("targetCnt=" + i + ",currCnt=" + i2 + " jobId=" + jobId);
                if (i <= instanceFromEnv.getStreamingJobMaxRetryInterval()) {
                    this.retryMap.get(jobId).getValue().incrementAndGet();
                    if (i == i2 && currentStatus == JobStatusEnum.ERROR) {
                        log.info("begin to restart job:" + modelId + "_" + streamingJobMeta2.getJobType());
                        restartJob(instanceFromEnv, streamingJobMeta2, jobId, i);
                    }
                }
            }
        });
    }

    private void restartJob(KylinConfig kylinConfig, StreamingJobMeta streamingJobMeta, String str, int i) {
        try {
            submitJob(streamingJobMeta.getProject(), streamingJobMeta.getModelId(), streamingJobMeta.getJobType());
            if (i < kylinConfig.getStreamingJobMaxRetryInterval()) {
                this.retryMap.get(str).getKey().addAndGet(kylinConfig.getStreamingJobRetryInterval());
            }
            this.retryMap.get(str).getValue().set(0);
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }

    private void deleteBrokenSegment(String str, String str2, Predicate<NDataSegment> predicate) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), str);
            List list = (List) nDataflowManager.getDataflow(str2).getSegments().stream().filter(nDataSegment -> {
                return predicate.apply(nDataSegment);
            }).collect(Collectors.toList());
            if (!list.isEmpty()) {
                NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(str2);
                nDataflowUpdate.setToRemoveSegs((NDataSegment[]) list.toArray(new NDataSegment[0]));
                nDataflowManager.updateDataflow(nDataflowUpdate);
            }
            return 0;
        }, str);
    }

    public boolean applicationExisted(String str) {
        return JobKiller.applicationExisted(str);
    }

    public void killYarnApplication(String str, String str2) {
        if (applicationExisted(str)) {
            JobKiller.killApplication(str);
            if (applicationExisted(str)) {
                throw new KylinException(ServerErrorCode.REPEATED_START_ERROR, String.format(Locale.ROOT, MsgPicker.getMsg().getJobStartFailure(), NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).getDataModelDesc(str2).getAlias()));
            }
        }
    }

    private void killJob(String str, JobTypeEnum jobTypeEnum) {
        killJob(str, jobTypeEnum, JobStatusEnum.ERROR);
    }

    public void killJob(String str, JobTypeEnum jobTypeEnum, JobStatusEnum jobStatusEnum) {
        killJob(StreamingUtils.getJobId(str, jobTypeEnum.name()), jobStatusEnum);
    }

    private void killJob(String str, JobStatusEnum jobStatusEnum) {
        JobKiller.killProcess(StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).getStreamingJobByUuid(str));
        JobKiller.killApplication(str);
        MetaInfoUpdater.updateJobState(this.project, str, jobStatusEnum);
    }

    private void resumeJobs(KylinConfig kylinConfig) {
        List<StreamingJobMeta> listAllStreamingJobMeta = StreamingJobManager.getInstance(kylinConfig, this.project).listAllStreamingJobMeta();
        if (CollectionUtils.isEmpty(listAllStreamingJobMeta)) {
            return;
        }
        ((List) listAllStreamingJobMeta.stream().filter(streamingJobMeta -> {
            return JobStatusEnum.STARTING == streamingJobMeta.getCurrentStatus() || JobStatusEnum.STOPPING == streamingJobMeta.getCurrentStatus() || JobStatusEnum.RUNNING == streamingJobMeta.getCurrentStatus() || JobStatusEnum.ERROR == streamingJobMeta.getCurrentStatus();
        }).collect(Collectors.toList())).forEach(streamingJobMeta2 -> {
            String modelId = streamingJobMeta2.getModelId();
            JobTypeEnum jobType = streamingJobMeta2.getJobType();
            if (streamingJobMeta2.isSkipListener()) {
                skipJobListener(this.project, StreamingUtils.getJobId(modelId, jobType.name()), false);
            }
            if (JobStatusEnum.RUNNING != streamingJobMeta2.getCurrentStatus() && JobStatusEnum.STARTING != streamingJobMeta2.getCurrentStatus()) {
                killJob(streamingJobMeta2.getModelId(), streamingJobMeta2.getJobType());
            } else {
                killJob(streamingJobMeta2.getModelId(), streamingJobMeta2.getJobType(), JobStatusEnum.STOPPED);
                submitJob(this.project, modelId, jobType);
            }
        });
    }

    public void skipJobListener(String str, String str2, boolean z) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            StreamingJobManager.getInstance(KylinConfig.getInstanceFromEnv(), str).updateStreamingJob(str2, streamingJobMeta -> {
                if (streamingJobMeta != null) {
                    streamingJobMeta.setSkipListener(z);
                }
            });
            return null;
        }, str);
    }

    @Generated
    public String getProject() {
        return this.project;
    }

    @Generated
    public AtomicBoolean getInitialized() {
        return this.initialized;
    }

    @Generated
    public AtomicBoolean getHasStarted() {
        return this.hasStarted;
    }
}
