package org.apache.kylin.engine.spark.job;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.cube.cuboid.CuboidModeEnum;
import org.apache.kylin.engine.mr.common.CubeStatsWriter;
import org.apache.kylin.engine.spark.NSparkCubingEngine;
import org.apache.kylin.engine.spark.application.SparkApplication;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.BuildLayoutWithUpdate;
import org.apache.kylin.engine.spark.metadata.SegmentInfo;
import org.apache.kylin.engine.spark.metadata.cube.ManagerHub;
import org.apache.kylin.engine.spark.metadata.cube.PathManager;
import org.apache.kylin.engine.spark.metadata.cube.model.ForestSpanningTree;
import org.apache.kylin.engine.spark.metadata.cube.model.LayoutEntity;
import org.apache.kylin.engine.spark.metadata.cube.model.SpanningTree;
import org.apache.kylin.engine.spark.utils.BuildUtils;
import org.apache.kylin.engine.spark.utils.JobMetrics;
import org.apache.kylin.engine.spark.utils.JobMetricsUtils;
import org.apache.kylin.engine.spark.utils.Metrics;
import org.apache.kylin.engine.spark.utils.QueryExecutionCache;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.shaded.com.google.common.base.Joiner;
import org.apache.kylin.shaded.com.google.common.base.Preconditions;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.storage.StorageFactory;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.utils.ResourceDetectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConversions;

/* loaded from: input_file:org/apache/kylin/engine/spark/job/OptimizeBuildJob.class */
/**
 * Spark job that builds the data of an optimized cube segment.
 *
 * <p>The job runs in two phases (see {@link #doExecute()}):
 * <ol>
 *   <li>HLL-based cuboid statistics for the {@code RECOMMEND_MISSING} cuboids are computed from the
 *       base cuboid and written under the job's temp directory.</li>
 *   <li>The {@code RECOMMEND} cuboids are built layer by layer from their parents. Cuboids in the
 *       {@code RECOMMEND_EXISTING} set are not rebuilt: their already-written parquet data under the
 *       optimized segment is only read to collect row/file statistics. Finally the segment metadata
 *       is updated through the {@link CubeManager}.</li>
 * </ol>
 */
public class OptimizeBuildJob extends SparkApplication {
    private static final Logger logger = LoggerFactory.getLogger(OptimizeBuildJob.class);
    protected static String TEMP_DIR_SUFFIX = "_temp";

    private BuildLayoutWithUpdate buildLayoutWithUpdate;
    private CubeManager cubeManager;
    private CubeInstance cubeInstance;
    private SegmentInfo optSegInfo;
    private SegmentInfo originalSegInfo;
    private CubeSegment optSeg;
    private CubeSegment originalSeg;
    private long baseCuboidId;
    /** Shard number per cuboid id; written from build threads, hence a concurrent map. */
    private final Map<Long, Short> cuboidShardNum = Maps.newConcurrentMap();
    /** Row count per cuboid id (or the flat-table flag); used as "source rows" of child cuboids. */
    private final Map<Long, Long> cuboidsRowCount = Maps.newConcurrentMap();
    private final Configuration conf = HadoopUtil.getCurrentConfiguration();

    public static void main(String[] args) {
        new OptimizeBuildJob().execute(args);
    }

    /**
     * Resolves the cube, the optimized segment and its original segment from the job parameters
     * {@code segmentId} / {@code cubeId}, then computes statistics and builds the recommended cuboids.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected void doExecute() throws Exception {
        String segmentId = getParam("segmentId");
        String cubeId = getParam("cubeId");
        this.cubeManager = CubeManager.getInstance(this.config);
        this.cubeInstance = Preconditions.checkNotNull(this.cubeManager.getCubeByUuid(cubeId),
                "Cube %s not found", cubeId);
        this.optSeg = Preconditions.checkNotNull(this.cubeInstance.getSegmentById(segmentId),
                "Segment %s not found in cube %s", segmentId, cubeId);
        this.originalSeg = Preconditions.checkNotNull(this.cubeInstance.getOriginalSegmentToOptimize(this.optSeg),
                "Original segment to optimize not found for segment %s", segmentId);
        this.originalSegInfo = ManagerHub.getSegmentInfo(this.config, cubeId, this.originalSeg.getUuid());
        calculateCuboidFromBaseCuboid();
        buildCuboidFromParent(cubeId);
    }

    /**
     * Computes HLL counters for the {@code RECOMMEND_MISSING} cuboids from the base cuboid data and
     * writes them with {@link CubeStatsWriter} to
     * {@code <jobTmpDir>/<jobId>/cube_statistics/<cubeUuid>/<optSegUuid>/}.
     *
     * @throws IllegalStateException if the original segment has no base-cuboid layout
     */
    private void calculateCuboidFromBaseCuboid() throws IOException {
        logger.info("Start to calculate cuboid statistics for optimized segment");
        long start = System.currentTimeMillis();
        this.baseCuboidId = this.cubeInstance.getCuboidScheduler().getBaseCuboidId();
        LayoutEntity baseCuboid = this.originalSegInfo.getAllLayoutJava().stream()
                .filter(layout -> layout.getId() == this.baseCuboidId)
                .findFirst()
                .orElseThrow(() -> new IllegalStateException("Base cuboid " + this.baseCuboidId
                        + " not found in original segment " + this.originalSeg.getUuid()));
        // The base cuboid parquet data is read from the optimized segment's storage location.
        String baseCuboidPath = PathManager.getParquetStoragePath(this.config, this.cubeInstance.getName(),
                this.optSeg.getName(), this.optSeg.getStorageLocationIdentifier(), String.valueOf(baseCuboid.getId()));
        Dataset<Row> baseCuboidDS = ((NSparkCubingEngine.NSparkCubingStorage) StorageFactory
                .createEngineAdapter(baseCuboid, NSparkCubingEngine.NSparkCubingStorage.class))
                .getFrom(baseCuboidPath, this.ss);

        Map<Long, HLLCounter> cuboidHLLs = Maps.newHashMap();
        for (Tuple2<Object, AggInfo> stat : CuboidStatisticsJob.statistics(baseCuboidDS, this.originalSegInfo,
                getNewCuboidIds())) {
            cuboidHLLs.put((Long) stat._1(), stat._2().cuboid().counter());
        }
        Path statisticsPath = new Path(this.config.getJobTmpDir(this.project) + "/" + this.jobId
                + "/cube_statistics/" + this.cubeInstance.getUuid() + "/" + this.optSeg.getUuid() + "/");
        CubeStatsWriter.writeCuboidStatistics(this.conf, statisticsPath, cuboidHLLs, 1, -1L);
        logger.info("Calculate cuboid statistics from base cuboid job takes {} ms",
                System.currentTimeMillis() - start);
    }

    /**
     * Builds every layout of the optimized segment in {@code RECOMMEND} mode, starting from the
     * sources chosen by {@link ParentSourceChooser}, then updates the segment metadata.
     *
     * @param cubeId uuid of the cube being optimized
     */
    private void buildCuboidFromParent(String cubeId) throws IOException {
        logger.info("Start to build recommend cuboid for optimized segment");
        long start = System.currentTimeMillis();
        this.optSegInfo = ManagerHub.getSegmentInfo(this.config, cubeId, this.optSeg.getUuid(), CuboidModeEnum.RECOMMEND);
        this.buildLayoutWithUpdate = new BuildLayoutWithUpdate(this.config);
        this.infos.clearAddCuboids();
        try {
            Collection<LayoutEntity> toBuildLayouts = JavaConversions.asJavaCollection(this.optSegInfo.toBuildLayouts());
            SpanningTree spanningTree = new ForestSpanningTree(toBuildLayouts);
            logger.info("There are {} cuboids to be built in segment {}.", toBuildLayouts.size(), this.optSegInfo.name());
            if (logger.isDebugEnabled()) {
                for (LayoutEntity layout : toBuildLayouts) {
                    logger.debug("Cuboid {} has row keys: {}", layout.getId(),
                            Joiner.on(", ").join(layout.getOrderedDimensions().keySet()));
                }
            }
            // The base cuboid is excluded from the layouts to build; its data was already read from the
            // optimized segment in calculateCuboidFromBaseCuboid().
            this.optSegInfo.removeLayout(this.baseCuboidId);
            ParentSourceChooser chooser = new ParentSourceChooser(spanningTree, this.optSegInfo, this.optSeg,
                    this.jobId, this.ss, this.config, false);
            chooser.decideSources();
            Map<Long, NBuildSourceInfo> reuseSources = chooser.reuseSources();
            this.infos.clearCuboidsNumPerLayer(this.optSegInfo.id());
            if (!reuseSources.isEmpty()) {
                build(reuseSources.values(), this.optSegInfo, spanningTree);
            }
            this.infos.recordSpanningTree(this.optSegInfo.id(), spanningTree);
            logger.info("Updating segment info");
            updateOptimizeSegmentInfo();
        } finally {
            logger.info("Building job takes {} ms", System.currentTimeMillis() - start);
        }
    }

    /** Returns the ids of the cube's {@code RECOMMEND_MISSING} cuboids as a primitive array. */
    private long[] getNewCuboidIds() {
        Set<Long> newCuboids = this.cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_MISSING);
        Preconditions.checkNotNull(newCuboids, "The recommend cuboid map could not be null");
        return newCuboids.stream().mapToLong(Long::longValue).toArray();
    }

    /**
     * Copies size, build and input-record information into the optimized segment, carries over shard
     * numbers of {@code RECOMMEND_EXISTING} cuboids from the original segment, marks the segment with
     * {@code storageType=4} and persists it via {@link CubeManager#updateCube}.
     */
    protected void updateOptimizeSegmentInfo() throws IOException {
        CubeInstance cubeCopy = this.optSeg.getCubeInstance().latestCopyForWrite();
        CubeUpdate cubeUpdate = new CubeUpdate(cubeCopy);
        this.optSeg.setSizeKB(this.optSegInfo.getAllLayoutSize() / 1024);
        this.optSeg.setLastBuildTime(System.currentTimeMillis());
        this.optSeg.setLastBuildJobID(this.jobId);
        this.optSeg.setInputRecords(this.originalSeg.getInputRecords());

        // Existing cuboids are not rebuilt by this job, so their shard numbers come from the original segment.
        Map<Long, Short> originalShardNums = this.originalSeg.getCuboidShardNums();
        for (Long cuboidId : cubeCopy.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING)) {
            Short shardNum = originalShardNums.get(cuboidId);
            if (shardNum == null) {
                // The concurrent map rejects null values; skip instead of failing with an NPE.
                logger.warn("No shard number recorded for existing cuboid {} in original segment {}",
                        cuboidId, this.originalSeg.getUuid());
                continue;
            }
            this.cuboidShardNum.putIfAbsent(cuboidId, shardNum);
        }
        this.optSeg.setCuboidShardNums(this.cuboidShardNum);
        this.optSeg.setInputRecordsSize(this.originalSeg.getInputRecordsSize());
        Map<String, String> additionalInfo = this.optSeg.getAdditionalInfo();
        additionalInfo.put("storageType", "4");
        this.optSeg.setAdditionalInfo(additionalInfo);
        cubeUpdate.setToUpdateSegs(new CubeSegment[] {this.optSeg});
        this.cubeManager.updateCube(cubeUpdate, true);
    }

    /**
     * Builds layers until a layer produces no further children: the layouts built in one layer become
     * the parent sources of the next.
     */
    private void build(Collection<NBuildSourceInfo> sources, SegmentInfo segmentInfo, SpanningTree spanningTree) {
        List<NBuildSourceInfo> nextLayer = buildLayer(sources, segmentInfo, spanningTree);
        while (!nextLayer.isEmpty()) {
            nextLayer = buildLayer(nextLayer, segmentInfo, spanningTree);
        }
    }

    /**
     * Builds all child layouts of the given parent sources. Existing cuboids only get their statistics
     * refreshed; new cuboids are submitted to {@link BuildLayoutWithUpdate}, whose results are collected
     * by {@code updateLayout} before the next layer is derived from the spanning tree.
     *
     * @return the parent sources of the next layer (empty when there is none)
     */
    private List<NBuildSourceInfo> buildLayer(Collection<NBuildSourceInfo> sources, final SegmentInfo segmentInfo,
            final SpanningTree spanningTree) {
        int cuboidsInLayer = 0;
        List<LayoutEntity> layerLayouts = new ArrayList<>();
        Set<Long> existingCuboids = this.cubeInstance.getCuboidsByMode(CuboidModeEnum.RECOMMEND_EXISTING);
        for (final NBuildSourceInfo source : sources) {
            Collection<LayoutEntity> toBuildCuboids = source.getToBuildCuboids();
            List<Long> childIds = toBuildCuboids.stream().map(LayoutEntity::getId).collect(Collectors.toList());
            this.infos.recordParent2Children(source.getLayout(), childIds);
            cuboidsInLayer += toBuildCuboids.size();
            Preconditions.checkState(!toBuildCuboids.isEmpty(), "To be built cuboids is empty.");
            final Dataset<Row> parentDS = source.getParentDS();
            // Validate before the dataset is used (e.g. counted below).
            Preconditions.checkNotNull(parentDS, "Parent dataset is null when building.");
            final long parentId = source.getLayoutId();
            if (parentId == ParentSourceChooser.FLAT_TABLE_FLAG()) {
                this.cuboidsRowCount.putIfAbsent(parentId, parentDS.count());
            }
            for (final LayoutEntity layout : toBuildCuboids) {
                if (existingCuboids.contains(layout.getId())) {
                    try {
                        updateExistingLayout(layout, parentId, parentDS);
                    } catch (IOException e) {
                        // Best effort: the build continues, but keep the cause for diagnosis.
                        logger.error("Failed to update existing cuboid info: {}", layout.getId(), e);
                    }
                } else {
                    this.infos.recordAddCuboids(layout.getId());
                    this.buildLayoutWithUpdate.submit(new BuildLayoutWithUpdate.JobEntity() {
                        @Override
                        public String getName() {
                            return "build-cuboid-" + layout.getId();
                        }

                        @Override
                        public LayoutEntity build() throws IOException {
                            return OptimizeBuildJob.this.buildCuboid(segmentInfo, layout, parentDS, spanningTree,
                                    source.getLayoutId());
                        }

                        @Override
                        public NBuildSourceInfo getBuildSourceInfo() {
                            return null;
                        }
                    }, this.config);
                }
                layerLayouts.add(layout);
            }
        }
        this.infos.recordCuboidsNumPerLayer(segmentInfo.id(), cuboidsInLayer);
        this.buildLayoutWithUpdate.updateLayout(segmentInfo, this.config);
        spanningTree.decideTheNextLayer(layerLayouts, segmentInfo);
        return constructTheNextLayerBuildInfos(spanningTree, segmentInfo, layerLayouts);
    }

    /**
     * Creates one parent source per layout in {@code builtLayouts} that has children in the spanning
     * tree; the parent is read back from its parquet storage path.
     */
    private List<NBuildSourceInfo> constructTheNextLayerBuildInfos(SpanningTree spanningTree, SegmentInfo segmentInfo,
            Collection<LayoutEntity> builtLayouts) {
        List<NBuildSourceInfo> nextLayer = new ArrayList<>();
        for (LayoutEntity parent : builtLayouts) {
            Collection<LayoutEntity> children = spanningTree.getChildrenByIndexPlan(parent);
            if (children.isEmpty()) {
                continue;
            }
            NBuildSourceInfo source = new NBuildSourceInfo();
            source.setSparkSession(this.ss);
            source.setLayoutId(parent.getId());
            source.setParentStoragePath(PathManager.getParquetStoragePath(this.config, getParam("cubeName"),
                    segmentInfo.name(), segmentInfo.identifier(), String.valueOf(parent.getId())));
            source.setToBuildCuboids(children);
            nextLayer.add(source);
        }
        return nextLayer;
    }

    /**
     * Returns the configured total cores, or — when task-impact sizing is enabled — the maximum leaf
     * task count found in the job's share directory divided by the task core factor.
     */
    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String calculateRequiredCores() throws Exception {
        if (!this.config.getSparkEngineTaskImpactInstanceEnabled()) {
            return this.config.getSparkEngineRequiredTotalCores();
        }
        String maxLeafTasks = maxLeafTasksNums(this.config.getJobTmpShareDir(this.project, this.jobId));
        logger.info("The maximum number of tasks required to run the job is {}", maxLeafTasks);
        int requiredCores = Double.valueOf(maxLeafTasks).intValue() / this.config.getSparkEngineTaskCoreFactor();
        logger.info("require cores: {}", requiredCores);
        return String.valueOf(requiredCores);
    }

    /** Selects the maximum value among the cubing-detect item files directly under {@code path}. */
    private String maxLeafTasksNums(Path path) throws IOException {
        return ResourceDetectUtils.selectMaxValueInFiles(HadoopUtil.getWorkingFileSystem().listStatus(path,
                file -> file.toString().endsWith(ResourceDetectUtils.cubingDetectItemFileSuffix())));
    }

    /**
     * Builds one layout from its parent dataset and saves it.
     *
     * @param dataset  the parent dataset (a built cuboid or the flat table)
     * @param parentId id of the parent layout, or {@link ParentSourceChooser#FLAT_TABLE_FLAG()}
     * @return the same {@code layoutEntity}, with rows/shard/file information filled in
     */
    public LayoutEntity buildCuboid(SegmentInfo segmentInfo, LayoutEntity layoutEntity, Dataset<Row> dataset,
            SpanningTree spanningTree, long parentId) throws IOException {
        String parentDesc = parentId == ParentSourceChooser.FLAT_TABLE_FLAG() ? "flat table" : String.valueOf(parentId);
        logger.info("Build index:{}, in segment:{}", layoutEntity.getId(), segmentInfo.id());
        Set<Integer> dimensions = layoutEntity.getOrderedDimensions().keySet();
        Dataset<Row> layoutDS;
        if (layoutEntity.isTableIndex()) {
            layoutDS = dataset.select(NSparkCubingUtil.getColumns(dimensions));
        } else {
            Dataset<Row> aggregated = CuboidAggregator.agg(this.ss, dataset, dimensions,
                    layoutEntity.getOrderedMeasures(), spanningTree, false);
            layoutDS = aggregated.select(NSparkCubingUtil.getColumns(dimensions,
                    layoutEntity.getOrderedMeasures().keySet()));
        }
        logger.info("Build layout:{}, in index:{}", layoutEntity.getId(), layoutEntity.getId());
        this.ss.sparkContext().setJobDescription("build " + layoutEntity.getId() + " from parent " + parentDesc);
        try {
            saveAndUpdateLayout(layoutDS.sortWithinPartitions(NSparkCubingUtil.getColumns(dimensions)),
                    segmentInfo, layoutEntity, parentId, dataset);
        } finally {
            // Clear the thread-local description even when saving fails.
            this.ss.sparkContext().setJobDescription(null);
        }
        logger.info("Finished Build index :{}, in segment:{}", layoutEntity.getId(), segmentInfo.id());
        return layoutEntity;
    }

    /**
     * Writes {@code dataset} to a temp path, collects row counts (from job metrics, falling back to
     * {@code count()}), repartitions into the final storage path if needed, and fills the layout info.
     */
    private void saveAndUpdateLayout(Dataset<Row> dataset, SegmentInfo segmentInfo, LayoutEntity layoutEntity,
            long parentId, Dataset<Row> parentDS) throws IOException {
        long layoutId = layoutEntity.getId();
        NSparkCubingEngine.NSparkCubingStorage storage = (NSparkCubingEngine.NSparkCubingStorage) StorageFactory
                .createEngineAdapter(layoutEntity, NSparkCubingEngine.NSparkCubingStorage.class);
        String storagePath = PathManager.getParquetStoragePath(this.config, getParam("cubeName"),
                segmentInfo.name(), segmentInfo.identifier(), String.valueOf(layoutId));
        String tempPath = storagePath + TEMP_DIR_SUFFIX;
        String executionId = UUID.randomUUID().toString();
        this.ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), executionId);
        try {
            logger.info("Cuboids are saved to temp path : {}", tempPath);
            storage.saveTo(tempPath, dataset, this.ss);
            JobMetrics metrics = JobMetricsUtils.collectMetrics(executionId);
            long rowCount = metrics.getMetrics(Metrics.CUBOID_ROWS_CNT());
            if (rowCount == -1) {
                this.infos.recordAbnormalLayouts(layoutId, "'Job metrics seems null, use count() to collect cuboid rows.'");
                logger.debug("Can not get cuboid row cnt, use count() to collect cuboid rows.");
                long counted = dataset.count();
                layoutEntity.setRows(counted);
                this.cuboidsRowCount.putIfAbsent(layoutId, counted);
                layoutEntity.setSourceRows(parentRowCount(parentId, parentDS));
            } else {
                layoutEntity.setRows(rowCount);
                layoutEntity.setSourceRows(metrics.getMetrics(Metrics.SOURCE_ROWS_CNT()));
                // Record it so children of this layout can resolve their source rows.
                this.cuboidsRowCount.putIfAbsent(layoutId, rowCount);
            }
            int shardNum = BuildUtils.repartitionIfNeed(layoutEntity, storage, storagePath, tempPath,
                    this.cubeInstance.getConfig(), this.ss);
            layoutEntity.setShardNum(shardNum);
            this.cuboidShardNum.put(layoutId, (short) shardNum);
        } finally {
            // Always release the execution id and its cached query execution, even on failure.
            this.ss.sparkContext().setLocalProperty(QueryExecutionCache.N_EXECUTION_ID_KEY(), null);
            QueryExecutionCache.removeQueryExecution(executionId);
        }
        BuildUtils.fillCuboidInfo(layoutEntity, storagePath);
    }

    /**
     * Returns the row count recorded for {@code parentId}; if none was recorded (e.g. a reused parent
     * that was not built by this job), counts {@code parentDS} once and records the result.
     */
    private long parentRowCount(long parentId, Dataset<Row> parentDS) {
        Long recorded = this.cuboidsRowCount.get(parentId);
        if (recorded != null) {
            return recorded;
        }
        logger.debug("No row count recorded for parent {}, use count() to collect it.", parentId);
        long counted = parentDS.count();
        Long previous = this.cuboidsRowCount.putIfAbsent(parentId, counted);
        return previous != null ? previous : counted;
    }

    /**
     * Refreshes the statistics of a cuboid whose data already exists under the optimized segment:
     * row count, file count, byte size, source rows and (from the original segment) shard number.
     */
    private void updateExistingLayout(LayoutEntity layoutEntity, long parentId, Dataset<Row> parentDS)
            throws IOException {
        FileSystem fs = HadoopUtil.getWorkingFileSystem();
        long layoutId = layoutEntity.getId();
        String storagePath = PathManager.getParquetStoragePath(this.config, this.cubeInstance.getName(),
                this.optSegInfo.name(), this.optSegInfo.identifier(), String.valueOf(layoutId));
        Dataset<Row> existing = ((NSparkCubingEngine.NSparkCubingStorage) StorageFactory
                .createEngineAdapter(layoutEntity, NSparkCubingEngine.NSparkCubingStorage.class))
                .getFrom(storagePath, this.ss);
        logger.debug("Existing cuboid, use count() to collect cuboid rows.");
        long rowCount = existing.count();
        ContentSummary summary = HadoopUtil.getContentSummary(fs, new Path(storagePath));
        layoutEntity.setRows(rowCount);
        layoutEntity.setFileCount(summary.getFileCount());
        layoutEntity.setByteSize(summary.getLength());
        this.cuboidsRowCount.putIfAbsent(layoutId, rowCount);
        layoutEntity.setSourceRows(parentRowCount(parentId, parentDS));
        Short shardNum = Preconditions.checkNotNull(this.originalSeg.getCuboidShardNums().get(layoutId),
                "No shard number for existing cuboid %s in original segment %s", layoutId, this.originalSeg.getUuid());
        layoutEntity.setShardNum(shardNum.shortValue());
        this.optSegInfo.updateLayout(layoutEntity);
    }

    @Override // org.apache.kylin.engine.spark.application.SparkApplication
    protected String generateInfo() {
        return LogJobInfoUtils.dfOptimizeJobInfo();
    }
}
