package org.apache.kylin.streaming.jobs;

import java.util.Collections;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.streaming.common.MergeJobEntry;
import org.apache.kylin.streaming.request.StreamingSegmentRequest;
import org.apache.kylin.streaming.rest.RestSupport;
import org.apache.kylin.streaming.util.JobExecutionIdHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/SyncMerger.class */
public class SyncMerger {
    private static final Logger logger = LoggerFactory.getLogger(SyncMerger.class);
    private MergeJobEntry mergeJobEntry;

    public SyncMerger(MergeJobEntry mergeJobEntry) {
        this.mergeJobEntry = mergeJobEntry;
    }

    public void run(StreamingDFMergeJob streamingDFMergeJob) {
        logger.info("start merge streaming segment");
        logger.info(this.mergeJobEntry.toString());
        long currentTimeMillis = System.currentTimeMillis();
        try {
            streamingDFMergeJob.streamingMergeSegment(this.mergeJobEntry);
            logger.info("merge segment cost {}", Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
            logger.info("delete merged segment and change the status");
            this.mergeJobEntry.globalMergeTime().set(System.currentTimeMillis() - currentTimeMillis);
            KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
            if (instanceFromEnv.isUTEnv()) {
                EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
                    NDataflowManager nDataflowManager = NDataflowManager.getInstance(KylinConfig.getInstanceFromEnv(), this.mergeJobEntry.project());
                    NDataSegment segment = nDataflowManager.getDataflow(this.mergeJobEntry.dataflowId()).copy().getSegment(this.mergeJobEntry.afterMergeSegment().getId());
                    segment.setStatus(SegmentStatusEnum.READY);
                    segment.setSourceCount(this.mergeJobEntry.afterMergeSegmentSourceCount());
                    NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(this.mergeJobEntry.dataflowId());
                    nDataflowUpdate.setToUpdateSegs(new NDataSegment[]{segment});
                    nDataflowUpdate.setToRemoveSegs((NDataSegment[]) this.mergeJobEntry.unMergedSegments().toArray(new NDataSegment[0]));
                    nDataflowManager.updateDataflow(nDataflowUpdate);
                    return 0;
                }, this.mergeJobEntry.project());
            } else {
                StreamingSegmentRequest streamingSegmentRequest = new StreamingSegmentRequest(this.mergeJobEntry.project(), this.mergeJobEntry.dataflowId(), Long.valueOf(this.mergeJobEntry.afterMergeSegmentSourceCount()));
                streamingSegmentRequest.setRemoveSegment(this.mergeJobEntry.unMergedSegments());
                streamingSegmentRequest.setNewSegId(this.mergeJobEntry.afterMergeSegment().getId());
                streamingSegmentRequest.setJobType(JobTypeEnum.STREAMING_MERGE.name());
                streamingSegmentRequest.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(StreamingUtils.getJobId(this.mergeJobEntry.dataflowId(), streamingSegmentRequest.getJobType())).intValue());
                RestSupport restSupport = new RestSupport(instanceFromEnv);
                Throwable th = null;
                try {
                    try {
                        restSupport.execute(restSupport.createHttpPut("/streaming_jobs/dataflow/segment"), streamingSegmentRequest);
                        if (restSupport != null) {
                            if (0 != 0) {
                                try {
                                    restSupport.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                restSupport.close();
                            }
                        }
                        StreamingUtils.replayAuditlog();
                    } finally {
                    }
                } finally {
                }
            }
        } catch (Exception e) {
            logger.info("merge failed reason: {} stackTrace is: {}", e.toString(), e.getStackTrace());
            KylinConfig instanceFromEnv2 = KylinConfig.getInstanceFromEnv();
            if (!instanceFromEnv2.isUTEnv()) {
                StreamingSegmentRequest streamingSegmentRequest2 = new StreamingSegmentRequest(this.mergeJobEntry.project(), this.mergeJobEntry.dataflowId());
                streamingSegmentRequest2.setRemoveSegment(Collections.singletonList(this.mergeJobEntry.afterMergeSegment()));
                streamingSegmentRequest2.setJobType(JobTypeEnum.STREAMING_MERGE.name());
                streamingSegmentRequest2.setJobExecutionId(JobExecutionIdHolder.getJobExecutionId(StreamingUtils.getJobId(this.mergeJobEntry.dataflowId(), streamingSegmentRequest2.getJobType())).intValue());
                RestSupport restSupport2 = new RestSupport(instanceFromEnv2);
                Throwable th3 = null;
                try {
                    try {
                        restSupport2.execute(restSupport2.createHttpPost("/streaming_jobs/dataflow/segment/deletion"), streamingSegmentRequest2);
                        if (restSupport2 != null) {
                            if (0 != 0) {
                                try {
                                    restSupport2.close();
                                } catch (Throwable th4) {
                                    th3.addSuppressed(th4);
                                }
                            } else {
                                restSupport2.close();
                            }
                        }
                        StreamingUtils.replayAuditlog();
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (restSupport2 != null) {
                        if (th3 != null) {
                            try {
                                restSupport2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            restSupport2.close();
                        }
                    }
                    throw th5;
                }
            }
            throw new KylinException(ServerErrorCode.SEGMENT_MERGE_FAILURE, this.mergeJobEntry.afterMergeSegment().getId());
        }
    }
}
