package org.apache.kylin.streaming.jobs.scheduler;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Generated;
import org.apache.kylin.cluster.ClusterManagerFactory;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.guava30.shaded.common.collect.Maps;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.JobKiller;
import org.apache.kylin.streaming.util.MetaInfoUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/scheduler/StreamingJobStatusWatcher.class */
/**
 * Periodically compares streaming job metadata against the job ids that the cluster manager
 * reports as running, and kills / marks as {@link JobStatusEnum#ERROR} the jobs that stay
 * missing from the cluster for several consecutive watch rounds.
 *
 * <p>All bookkeeping maps are plain hash maps; they are only touched from {@link #execute(List)}
 * (and the private helpers it calls), which is {@code synchronized} on this instance.
 */
public class StreamingJobStatusWatcher {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamingJobStatusWatcher.class);

    // NOTE(review): the three settings below are not final in the original; left that way in
    // case something adjusts them reflectively - confirm before tightening.

    /** Job states the watcher reconciles; jobs in any other state are skipped. */
    private static List<JobStatusEnum> STATUS_LIST = Arrays.asList(JobStatusEnum.ERROR, JobStatusEnum.RUNNING, JobStatusEnum.STARTING, JobStatusEnum.STOPPING);
    /** Initial delay and delay between two watch rounds, in minutes. */
    private static int WATCH_INTERVAL = 5;
    /** Minutes a killed job stays blacklisted, and maximum age of an ERROR job that is still tracked. */
    private static int JOB_KEEP_TIMEOUT = 30;
    /** Counter value at which a missing job is promoted (starting/stopping) or killed (tracked). */
    private static final int MISSING_ROUND_THRESHOLD = 3;

    /** STARTING jobs not yet seen on the cluster -> number of extra rounds they were missing. */
    private Map<String, AtomicInteger> startingJobMap = Maps.newHashMap();
    /** STOPPING jobs not seen on the cluster -> number of extra rounds they were missing. */
    private Map<String, AtomicInteger> stoppingJobMap = Maps.newHashMap();
    /** Tracked jobs -> consecutive rounds they were missing from the cluster (reset to 0 when seen). */
    private Map<String, AtomicInteger> jobMap = Maps.newHashMap();
    /** Jobs this watcher killed -> kill timestamp in epoch milliseconds. */
    private Map<String, Long> killedJobMap = Maps.newHashMap();
    private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
    private boolean init = false;

    /**
     * Starts the periodic watch task once. Subsequent calls are no-ops. The task is only
     * scheduled when jobs run on a cluster and the status watch is enabled in the configuration.
     */
    public synchronized void schedule() {
        if (this.init) {
            return;
        }
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        boolean watchEnabled = StreamingUtils.isJobOnCluster(config)
                && "true".equals(config.getStreamingJobStatusWatchEnabled());
        if (watchEnabled) {
            this.scheduledExecutorService.scheduleWithFixedDelay(this::runWatchRound, WATCH_INTERVAL, WATCH_INTERVAL, TimeUnit.MINUTES);
        }
        this.init = true;
    }

    /**
     * Runs a single watch round. Exceptions are logged instead of propagated: an exception that
     * escapes a task scheduled with {@code scheduleWithFixedDelay} suppresses every later
     * execution, which would silently disable the watcher after one transient failure.
     */
    private void runWatchRound() {
        try {
            execute(getRunningJobs());
        } catch (Exception e) {
            log.error("Streaming job status watch round failed, will retry at the next interval", e);
        }
    }

    /** Asks the cluster manager for the ids of the currently running jobs. */
    private List<String> getRunningJobs() {
        return ClusterManagerFactory.create(KylinConfig.getInstanceFromEnv()).getRunningJobs(Collections.emptySet());
    }

    /**
     * Reconciles the streaming job metadata of every project against the given running jobs.
     *
     * @param list ids of the jobs currently running on the cluster
     */
    public synchronized void execute(List<String> list) {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        // Built once per round so the per-job membership test is a hash lookup.
        Set<String> runningJobIds = new HashSet<>(list);
        NProjectManager.getInstance(config).listAllProjects().forEach(project -> {
            for (StreamingJobMeta jobMeta : StreamingJobManager.getInstance(config, project.getName()).listAllStreamingJobMeta()) {
                reconcile(jobMeta, runningJobIds);
            }
        });
    }

    /** Applies the watch rules to one streaming job. */
    private void reconcile(StreamingJobMeta jobMeta, Set<String> runningJobIds) {
        String jobId = StreamingUtils.getJobId(jobMeta.getModelId(), jobMeta.getJobType().name());

        Long killedAt = this.killedJobMap.get(jobId);
        if (killedAt != null) {
            // A killed job is ignored until its keep timeout expires, then forgotten entirely.
            if (System.currentTimeMillis() - killedAt > TimeUnit.MINUTES.toMillis(JOB_KEEP_TIMEOUT)) {
                this.jobMap.remove(jobId);
                this.killedJobMap.remove(jobId);
            }
            return;
        }
        if (!STATUS_LIST.contains(jobMeta.getCurrentStatus())) {
            return;
        }
        if (!runningJobIds.contains(jobId)) {
            processMissingJobsFromYarn(jobMeta, jobId);
            return;
        }
        // Job is alive on the cluster: start tracking it, or reset its missing-round counter.
        this.jobMap.computeIfAbsent(jobId, id -> new AtomicInteger(0)).set(0);
    }

    /**
     * Handles a watched job that is absent from the running job list. Already tracked jobs go
     * through the kill countdown; untracked jobs are handled according to their current status.
     */
    private void processMissingJobsFromYarn(StreamingJobMeta jobMeta, String jobId) {
        String project = jobMeta.getProject();
        if (this.jobMap.containsKey(jobId)) {
            killStreamingDriverProcess(jobId, project, jobMeta);
            return;
        }
        JobStatusEnum status = jobMeta.getCurrentStatus();
        if (status == JobStatusEnum.STARTING) {
            moveJobId(this.startingJobMap, jobId);
            return;
        }
        if (status == JobStatusEnum.STOPPING) {
            moveJobId(this.stoppingJobMap, jobId);
            return;
        }
        if (status == JobStatusEnum.RUNNING) {
            this.jobMap.put(jobId, new AtomicInteger(0));
            return;
        }
        // Remaining statuses: only start tracking when the metadata was updated recently enough.
        String lastUpdateTime = jobMeta.getLastUpdateTime();
        if (lastUpdateTime == null) {
            return;
        }
        // SimpleDateFormat is not thread-safe, so a fresh instance is created per call.
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.getDefault(Locale.Category.FORMAT));
        try {
            long elapsedMillis = System.currentTimeMillis() - format.parse(lastUpdateTime).getTime();
            if (TimeUnit.MILLISECONDS.toMinutes(elapsedMillis) <= JOB_KEEP_TIMEOUT) {
                this.jobMap.put(jobId, new AtomicInteger(0));
            }
        } catch (ParseException e) {
            log.error("Cannot parse last update time '{}' of streaming job {}", lastUpdateTime, jobId, e);
        }
    }

    /**
     * Counts one more missing round for a tracked job; once the counter has reached the
     * threshold, asks {@link JobKiller} to kill the job, moves its state to ERROR if needed and
     * records the kill time.
     */
    private void killStreamingDriverProcess(String jobId, String project, StreamingJobMeta jobMeta) {
        AtomicInteger missingRounds = this.jobMap.get(jobId);
        if (missingRounds.get() < MISSING_ROUND_THRESHOLD) {
            missingRounds.getAndIncrement();
            return;
        }
        log.info("Begin to find & kill streaming job:{}", jobId);
        log.info("{} statusCode={}", jobId, JobKiller.killProcess(jobMeta));
        if (jobMeta.getCurrentStatus() != JobStatusEnum.ERROR) {
            MetaInfoUpdater.updateJobState(project, jobId, JobStatusEnum.ERROR);
        }
        this.killedJobMap.put(jobId, System.currentTimeMillis());
    }

    /**
     * Registers the job in the given waiting map on first sight; on later rounds increments its
     * counter and, once the threshold is reached, moves the job into the tracked job map.
     */
    private void moveJobId(Map<String, AtomicInteger> map, String jobId) {
        AtomicInteger waitedRounds = map.get(jobId);
        if (waitedRounds == null) {
            map.put(jobId, new AtomicInteger(0));
            return;
        }
        if (waitedRounds.incrementAndGet() >= MISSING_ROUND_THRESHOLD) {
            this.jobMap.put(jobId, new AtomicInteger(0));
            map.remove(jobId);
        }
    }
}
