package org.apache.kylin.streaming.jobs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.RandomUtil;
import org.apache.kylin.engine.spark.builder.NBuildSourceInfo;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
import org.apache.kylin.engine.spark.job.DFBuildJob;
import org.apache.kylin.guava30.shaded.common.base.Preconditions;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.cuboid.NSpanningTree;
import org.apache.kylin.metadata.cube.model.IndexEntity;
import org.apache.kylin.metadata.cube.model.LayoutEntity;
import org.apache.kylin.metadata.cube.model.NDataLayout;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.streaming.common.BuildJobEntry;
import org.apache.kylin.streaming.metadata.BuildLayoutWithRestUpdate;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/StreamingDFBuildJob.class */
public class StreamingDFBuildJob extends DFBuildJob {
    private Map<Long, Dataset<Row>> cuboidDatasetMap;

    public StreamingDFBuildJob(String str) {
        this.buildLayoutWithUpdate = new BuildLayoutWithRestUpdate(JobTypeEnum.STREAMING_BUILD);
        this.config = KylinConfig.getInstanceFromEnv();
        this.dfMgr = NDataflowManager.getInstance(this.config, str);
        this.project = str;
    }

    public void streamBuild(BuildJobEntry buildJobEntry) throws IOException {
        if (this.ss == null) {
            this.ss = buildJobEntry.spark();
            this.ss.sparkContext().setLocalProperty("spark.sql.execution.id", (String) null);
        }
        this.jobId = RandomUtil.randomUUIDStr();
        if (this.infos == null) {
            this.infos = new BuildJobInfos();
        }
        if (this.cuboidDatasetMap == null) {
            this.cuboidDatasetMap = new ConcurrentHashMap();
        }
        setParam("dataflowId", buildJobEntry.dataflowId());
        Preconditions.checkState(buildJobEntry.toBuildTree().getRootIndexEntities().size() != 0, "streaming mast have one root index");
        NBuildSourceInfo nBuildSourceInfo = new NBuildSourceInfo();
        nBuildSourceInfo.setFlattableDS(buildJobEntry.streamingFlatDS());
        nBuildSourceInfo.setSparkSession(this.ss);
        nBuildSourceInfo.setToBuildCuboids(buildJobEntry.toBuildTree().getRootIndexEntities());
        build(Sets.newHashSet(new NBuildSourceInfo[]{nBuildSourceInfo}), buildJobEntry.batchSegment().getId(), buildJobEntry.toBuildTree());
        logger.info("start update segment");
        if (this.config.isUTEnv()) {
            EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
                NDataSegment segment = nDataflowManager.getDataflow(buildJobEntry.dataflowId()).copy().getSegment(buildJobEntry.batchSegment().getId());
                segment.setStatus(SegmentStatusEnum.READY);
                segment.setSourceCount(buildJobEntry.flatTableCount());
                NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(buildJobEntry.dataflowId());
                nDataflowUpdate.setToUpdateSegs(new NDataSegment[]{segment});
                nDataflowUpdate.setStatus(RealizationStatusEnum.ONLINE);
                nDataflowManager.updateDataflow(nDataflowUpdate);
                return 0;
            }, this.project);
        } else {
            updateSegment(buildJobEntry);
        }
        this.infos.clear();
        this.cuboidDatasetMap.clear();
    }

    public void updateSegment(BuildJobEntry buildJobEntry) {
        StreamingSegmentRequest streamingSegmentRequest = new StreamingSegmentRequest(this.project, buildJobEntry.dataflowId(), Long.valueOf(buildJobEntry.flatTableCount()));
        streamingSegmentRequest.setNewSegId(buildJobEntry.batchSegment().getId());
        streamingSegmentRequest.setStatus(RealizationStatusEnum.ONLINE.name());
        streamingSegmentRequest.setJobType(JobTypeEnum.STREAMING_BUILD.name());
        streamingSegmentRequest.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(StreamingUtils.getJobId(buildJobEntry.dataflowId(), streamingSegmentRequest.getJobType())).intValue());
        RestSupport createRestSupport = createRestSupport();
        Throwable th = null;
        try {
            try {
                createRestSupport.execute(createRestSupport.createHttpPut("/streaming_jobs/dataflow/segment"), streamingSegmentRequest);
                if (createRestSupport != null) {
                    if (0 != 0) {
                        try {
                            createRestSupport.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createRestSupport.close();
                    }
                }
                StreamingUtils.replayAuditlog();
            } finally {
            }
        } catch (Throwable th3) {
            if (createRestSupport != null) {
                if (th != null) {
                    try {
                        createRestSupport.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createRestSupport.close();
                }
            }
            throw th3;
        }
    }

    public RestSupport createRestSupport() {
        return new RestSupport(this.config);
    }

    protected List<NBuildSourceInfo> constructTheNextLayerBuildInfos(NSpanningTree nSpanningTree, NDataSegment nDataSegment, Collection<IndexEntity> collection) {
        ArrayList arrayList = new ArrayList();
        for (IndexEntity indexEntity : collection) {
            Collection childrenByIndexPlan = nSpanningTree.getChildrenByIndexPlan(indexEntity);
            if (!childrenByIndexPlan.isEmpty()) {
                NBuildSourceInfo nBuildSourceInfo = new NBuildSourceInfo();
                nBuildSourceInfo.setSparkSession(this.ss);
                LayoutEntity layoutEntity = (LayoutEntity) new ArrayList(nSpanningTree.getLayouts(indexEntity)).get(0);
                Dataset<Row> dataset = this.cuboidDatasetMap.get(Long.valueOf(layoutEntity.getId()));
                nBuildSourceInfo.setLayoutId(layoutEntity.getId());
                nBuildSourceInfo.setToBuildCuboids(childrenByIndexPlan);
                nBuildSourceInfo.setFlattableDS(dataset);
                arrayList.add(nBuildSourceInfo);
            }
        }
        return arrayList;
    }

    protected NDataLayout saveAndUpdateLayout(Dataset<Row> dataset, NDataSegment nDataSegment, LayoutEntity layoutEntity) throws IOException {
        this.cuboidDatasetMap.put(Long.valueOf(layoutEntity.getId()), dataset);
        return super.saveAndUpdateLayout(dataset, nDataSegment, layoutEntity);
    }

    public NDataSegment getSegment(String str) {
        StreamingUtils.replayAuditlog();
        return super.getSegment(str);
    }

    public void shutdown() {
        if (this.buildLayoutWithUpdate != null) {
            this.buildLayoutWithUpdate.shutDown();
        }
    }
}
