package org.apache.kylin.rest.service;

import com.google.common.collect.Lists;
import io.kyligence.kap.secondstorage.SecondStorageUpdater;
import io.kyligence.kap.secondstorage.SecondStorageUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.QueryTrace;
import org.apache.kylin.common.Singletons;
import org.apache.kylin.common.util.ExecutorServiceUtil;
import org.apache.kylin.common.util.NamedThreadFactory;
import org.apache.kylin.metadata.query.QueryHistoryInfo;
import org.apache.kylin.metadata.query.QueryMetrics;
import org.apache.kylin.metadata.query.RDBMSQueryHistoryDAO;
import org.apache.kylin.query.util.SparkJobTrace;
import org.apache.kylin.query.util.SparkJobTraceMetric;
import org.apache.kylin.rest.util.SpringContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/rest/service/QueryHistoryScheduler.class */
public class QueryHistoryScheduler {
    private static final Logger logger = LoggerFactory.getLogger("query");
    protected BlockingQueue<QueryMetrics> queryMetricsQueue = new LinkedBlockingQueue(KylinConfig.getInstanceFromEnv().getQueryHistoryBufferSize());
    private ScheduledExecutorService writeQueryHistoryScheduler;
    private long sparkJobTraceTimeoutMs;
    private boolean isQuerySparkJobTraceEnabled;
    private boolean isSecondStorageQueryMetricCollect;

    /* loaded from: input_file:org/apache/kylin/rest/service/QueryHistoryScheduler$WriteQueryHistoryRunner.class */
    public class WriteQueryHistoryRunner implements Runnable {
        RDBMSQueryHistoryDAO queryHistoryDAO = RDBMSQueryHistoryDAO.getInstance();

        WriteQueryHistoryRunner() {
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v20, types: [java.util.List] */
        @Override // java.lang.Runnable
        public void run() {
            try {
                ArrayList newArrayList = Lists.newArrayList();
                QueryHistoryScheduler.this.queryMetricsQueue.drainTo(newArrayList);
                ArrayList arrayList = (!QueryHistoryScheduler.this.isQuerySparkJobTraceEnabled || newArrayList.size() <= 0) ? newArrayList : (List) newArrayList.stream().filter(queryMetrics -> {
                    String queryId = queryMetrics.getQueryId();
                    return QueryHistoryScheduler.this.isCollectedFinished(queryId, SparkJobTrace.getSparkJobTraceMetric(queryId), queryMetrics);
                }).collect(Collectors.toList());
                QueryHistoryScheduler.this.collectSecondStorageMetric(arrayList);
                this.queryHistoryDAO.insert(arrayList);
            } catch (Exception e) {
                QueryHistoryScheduler.logger.error("Error when write query history", e);
            }
        }
    }

    public QueryHistoryScheduler() {
        logger.debug("New NQueryHistoryScheduler created");
    }

    public static QueryHistoryScheduler getInstance() {
        return (QueryHistoryScheduler) Singletons.getInstance(QueryHistoryScheduler.class);
    }

    public void init() throws Exception {
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        this.sparkJobTraceTimeoutMs = instanceFromEnv.getSparkJobTraceTimeoutMs();
        this.isQuerySparkJobTraceEnabled = instanceFromEnv.isQuerySparkJobTraceEnabled();
        this.writeQueryHistoryScheduler = Executors.newScheduledThreadPool(1, new NamedThreadFactory("WriteQueryHistoryWorker"));
        this.writeQueryHistoryScheduler.scheduleWithFixedDelay(new WriteQueryHistoryRunner(), 1L, KylinConfig.getInstanceFromEnv().getQueryHistorySchedulerInterval(), TimeUnit.SECONDS);
        this.isSecondStorageQueryMetricCollect = KylinConfig.getInstanceFromEnv().getSecondStorageQueryMetricCollect();
    }

    public void offerQueryHistoryQueue(QueryMetrics queryMetrics) {
        if (this.queryMetricsQueue.offer(queryMetrics)) {
            return;
        }
        logger.info("queryMetricsQueue is full");
    }

    synchronized void shutdown() {
        logger.info("Shutting down NQueryHistoryScheduler ....");
        if (this.writeQueryHistoryScheduler != null) {
            ExecutorServiceUtil.forceShutdown(this.writeQueryHistoryScheduler);
        }
    }

    public boolean isCollectedFinished(String str, SparkJobTraceMetric sparkJobTraceMetric, QueryMetrics queryMetrics) {
        if (sparkJobTraceMetric == null) {
            if (System.currentTimeMillis() - (queryMetrics.getQueryTime() + queryMetrics.getQueryDuration()) > this.sparkJobTraceTimeoutMs) {
                logger.warn("QueryMetrics timeout lost spark job trace kylin.query.spark-job-trace-timeout-ms={} queryId:{}", Long.valueOf(this.sparkJobTraceTimeoutMs), str);
                return true;
            }
            offerQueryHistoryQueue(queryMetrics);
            return false;
        }
        List traces = queryMetrics.getQueryHistoryInfo().getTraces();
        AtomicLong atomicLong = new AtomicLong(0L);
        traces.forEach(queryTraceSpan -> {
            if ("PREPARE_AND_SUBMIT_JOB".equals(queryTraceSpan.getName())) {
                queryTraceSpan.setDuration(sparkJobTraceMetric.getPrepareAndSubmitJobMs());
            }
            atomicLong.addAndGet(queryTraceSpan.getDuration());
        });
        traces.add(new QueryHistoryInfo.QueryTraceSpan("WAIT_FOR_EXECUTION", (String) QueryTrace.SPAN_GROUPS.get("WAIT_FOR_EXECUTION"), sparkJobTraceMetric.getWaitForExecutionMs()));
        atomicLong.addAndGet(sparkJobTraceMetric.getWaitForExecutionMs());
        traces.add(new QueryHistoryInfo.QueryTraceSpan("EXECUTION", (String) QueryTrace.SPAN_GROUPS.get("EXECUTION"), sparkJobTraceMetric.getExecutionMs()));
        atomicLong.addAndGet(sparkJobTraceMetric.getExecutionMs());
        traces.add(new QueryHistoryInfo.QueryTraceSpan("FETCH_RESULT", (String) QueryTrace.SPAN_GROUPS.get("FETCH_RESULT"), queryMetrics.getQueryDuration() - atomicLong.get()));
        return true;
    }

    public void collectSecondStorageMetric(List<QueryMetrics> list) {
        if (this.isSecondStorageQueryMetricCollect && SecondStorageUtil.isGlobalEnable()) {
            SecondStorageUpdater secondStorageUpdater = (SecondStorageUpdater) SpringContext.getBean(SecondStorageUpdater.class);
            for (QueryMetrics queryMetrics : list) {
                try {
                    if (queryMetrics.isSecondStorage() && SecondStorageUtil.isProjectEnable(queryMetrics.getProjectName())) {
                        Map queryMetric = secondStorageUpdater.getQueryMetric(queryMetrics.getProjectName(), queryMetrics.getQueryId());
                        if (queryMetric.containsKey("totalScanBytes")) {
                            queryMetrics.setTotalScanBytes(((Long) queryMetric.get("totalScanBytes")).longValue());
                        }
                        if (queryMetric.containsKey("totalScanCount")) {
                            queryMetrics.setTotalScanCount(((Long) queryMetric.get("totalScanCount")).longValue());
                        }
                        if (queryMetric.containsKey("sourceResultCount")) {
                            queryMetrics.getQueryHistoryInfo().setSourceResultCount(((Long) queryMetric.get("sourceResultCount")).longValue());
                        }
                    }
                } catch (Exception e) {
                    logger.error("Get tired storage metric fail. query_id: {}, message: {}", queryMetrics.getQueryId(), e.getMessage());
                }
            }
        }
    }
}
