package org.apache.kylin.source.kafka;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
import org.apache.kylin.job.execution.ExecuteResult;
import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
import org.apache.kylin.metadata.model.ISegment;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.hive.CreateFlatHiveTableStep;
import org.apache.kylin.source.hive.HiveMRInput;
import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
import org.apache.kylin.source.kafka.job.MergeOffsetStep;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput.class */
public class KafkaMRInput implements IMRInput {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMRInput.class);
    private CubeSegment cubeSegment;

    /* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput$BatchCubingInputSide.class */
    /**
     * Batch-cubing input side for a cube segment whose fact data is read from Kafka.
     *
     * <p>Phase 1 adds a {@link KafkaFlatTableJob} MapReduce step ("Save data from Kafka") that
     * writes under the job working directory and, when the model declares lookup tables, a Hive
     * step that builds the intermediate flat table from the saved fact data. Every intermediate
     * table name and path produced on the way is remembered so that
     * {@link #addStepPhase4_Cleanup} can pass them to the garbage-collection step.
     */
    public static class BatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
        final CubeSegment seg;
        private final CubeDesc cubeDesc;
        private final KylinConfig config;
        protected IJoinedFlatTableDesc flatDesc;
        protected String hiveTableDatabase;
        private final String cubeName;
        /** Hive tables created by this job; handed to the cleanup step. */
        private final List<String> intermediateTables = Lists.newArrayList();
        /** Paths created by this job; handed to the cleanup step. */
        private final List<String> intermediatePaths = Lists.newArrayList();
        final JobEngineConfig conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());

        /**
         * @param cubeSegment          segment being built; supplies the config, cube descriptor and cube name
         * @param iJoinedFlatTableDesc descriptor of the flat table the cubing job will read
         */
        public BatchCubingInputSide(CubeSegment cubeSegment, IJoinedFlatTableDesc iJoinedFlatTableDesc) {
            this.config = cubeSegment.getConfig();
            this.flatDesc = iJoinedFlatTableDesc;
            this.hiveTableDatabase = this.config.getHiveDatabaseForIntermediateTable();
            this.seg = cubeSegment;
            this.cubeDesc = cubeSegment.getCubeDesc();
            this.cubeName = cubeSegment.getCubeInstance().getName();
        }

        /**
         * Adds the steps that materialise the flat table for this segment.
         *
         * <p>Without lookup tables the Kafka data is saved straight to
         * {@code <jobWorkingDir>/<flat table name>}. With lookup tables the Kafka data is saved
         * to a separate {@code kylin_intermediate_<cube>_<segment uuid>_fact} directory first and
         * a Hive step then populates the flat table from it.
         *
         * @param defaultChainedExecutable job flow the steps are appended to
         */
        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable defaultChainedExecutable) {
            final boolean noLookupTables = this.cubeDesc.getModel().getLookupTables().size() == 0;
            final String jobWorkingDir = getJobWorkingDir(defaultChainedExecutable);

            if (noLookupTables) {
                final String flatTablePath = jobWorkingDir + "/" + this.flatDesc.getTableName();
                defaultChainedExecutable.addTask(createSaveKafkaDataStep(defaultChainedExecutable.getId(), flatTablePath));
                this.intermediatePaths.add(flatTablePath);
                return;
            }

            // Locale.ROOT keeps the generated table name identical on every JVM; the default
            // locale can lower-case some letters differently (e.g. 'I' under a Turkish locale).
            final String kafkaFactTable = "kylin_intermediate_"
                    + this.cubeName.toLowerCase(Locale.ROOT)
                    + "_"
                    + this.seg.getUuid().replace('-', '_')
                    + "_fact";
            defaultChainedExecutable.addTask(
                    createSaveKafkaDataStep(defaultChainedExecutable.getId(), jobWorkingDir + "/" + kafkaFactTable));
            defaultChainedExecutable.addTask(createFlatTable(kafkaFactTable, jobWorkingDir));
        }

        /**
         * Builds the Hive step that (re)creates the Kafka fact table and the flat table and then
         * fills the flat table, and records both tables and their paths for later cleanup.
         *
         * @param kafkaFactTable name of the intermediate table holding the saved Kafka data
         * @param jobWorkingDir  job working directory under which both tables are located
         * @return the configured "Create Intermediate Flat Hive Table" step
         */
        private AbstractExecutable createFlatTable(final String kafkaFactTable, String jobWorkingDir) {
            final String hiveInit = JoinedFlatTable.generateHiveInitStatements(this.hiveTableDatabase);

            // Minimal descriptor for the Kafka fact table: only the table name, the data model and
            // the column list carry real values; the remaining accessors are stubs.
            final IJoinedFlatTableDesc kafkaFactDesc = new IJoinedFlatTableDesc() {
                public String getTableName() {
                    return kafkaFactTable;
                }

                public DataModelDesc getDataModel() {
                    return BatchCubingInputSide.this.cubeDesc.getModel();
                }

                public List<TblColRef> getAllColumns() {
                    // The fact table exposes exactly the fact columns of the real flat table.
                    return BatchCubingInputSide.this.flatDesc.getFactColumns();
                }

                public List<TblColRef> getFactColumns() {
                    return null;
                }

                public int getColumnIndex(TblColRef tblColRef) {
                    return 0;
                }

                public SegmentRange getSegRange() {
                    return null;
                }

                public TblColRef getDistributedBy() {
                    return null;
                }

                public TblColRef getClusterBy() {
                    return null;
                }

                public ISegment getSegment() {
                    return null;
                }

                public boolean useAlias() {
                    return false;
                }
            };

            final String dropFactTable = JoinedFlatTable.generateDropTableStatement(kafkaFactDesc);
            final String createFactTable =
                    JoinedFlatTable.generateCreateTableStatement(kafkaFactDesc, jobWorkingDir, "SEQUENCEFILE");
            final String dropFlatTable = JoinedFlatTable.generateDropTableStatement(this.flatDesc);
            final String createFlatTable = JoinedFlatTable.generateCreateTableStatement(this.flatDesc, jobWorkingDir);
            // The generated insert reads from the model's root fact table; point it at the table
            // holding the saved Kafka data instead.
            final String insertFlatTable = JoinedFlatTable.generateInsertDataStatement(this.flatDesc)
                    .replace(this.flatDesc.getDataModel().getRootFactTableName() + " ", kafkaFactTable + " ");

            final CreateFlatHiveTableStep hiveStep = new CreateFlatHiveTableStep();
            CubingExecutableUtil.setCubeName(this.cubeName, hiveStep.getParams());
            hiveStep.setInitStatement(hiveInit);
            hiveStep.setCreateTableStatement(
                    dropFactTable + createFactTable + dropFlatTable + createFlatTable + insertFlatTable);
            hiveStep.setName("Create Intermediate Flat Hive Table");

            this.intermediateTables.add(this.flatDesc.getTableName());
            this.intermediateTables.add(kafkaFactTable);
            this.intermediatePaths.add(jobWorkingDir + "/" + this.flatDesc.getTableName());
            this.intermediatePaths.add(jobWorkingDir + "/" + kafkaFactTable);
            return hiveStep;
        }

        /**
         * Resolves the working directory of the given job from the segment config's HDFS working
         * directory and the job id.
         */
        protected String getJobWorkingDir(DefaultChainedExecutable defaultChainedExecutable) {
            return JobBuilderSupport.getJobWorkingDir(this.config.getHdfsWorkingDirectory(), defaultChainedExecutable.getId());
        }

        /**
         * Creates the MapReduce step that runs {@link KafkaFlatTableJob}.
         *
         * @param jobId      id of the owning job (not used when building the step)
         * @param outputPath value passed to the job as its "output" parameter
         * @return the configured "Save data from Kafka" step
         */
        private MapReduceExecutable createSaveKafkaDataStep(String jobId, String outputPath) {
            final MapReduceExecutable saveStep = new MapReduceExecutable();
            saveStep.setName("Save data from Kafka");
            saveStep.setMapReduceJobClass(KafkaFlatTableJob.class);

            final StringBuilder cmd = new StringBuilder();
            new JobBuilderSupport(this.seg, "system").appendMapReduceParameters(cmd);
            JobBuilderSupport.appendExecCmdParameters(cmd, "cubename", this.seg.getRealization().getName());
            JobBuilderSupport.appendExecCmdParameters(cmd, "output", outputPath);
            JobBuilderSupport.appendExecCmdParameters(cmd, "segmentid", this.seg.getUuid());
            JobBuilderSupport.appendExecCmdParameters(cmd, "jobname",
                    "Kylin_Save_Kafka_Data_" + this.seg.getRealization().getName() + "_Step");

            saveStep.setMapReduceParams(cmd.toString());
            return saveStep;
        }

        /**
         * Appends a "Hive Cleanup" step that receives the intermediate tables and paths recorded
         * while phase 1 was being built.
         *
         * @param defaultChainedExecutable job flow the cleanup step is appended to
         */
        public void addStepPhase4_Cleanup(DefaultChainedExecutable defaultChainedExecutable) {
            final HiveMRInput.GarbageCollectionStep cleanup = new HiveMRInput.GarbageCollectionStep();
            cleanup.setName("Hive Cleanup");
            cleanup.setIntermediateTables(this.intermediateTables);
            cleanup.setExternalDataPaths(this.intermediatePaths);
            defaultChainedExecutable.addTask(cleanup);
        }

        /**
         * @return a {@link KafkaTableInputFormat} bound to this segment and this instance's
         *         job engine config
         */
        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
            return new KafkaTableInputFormat(this.seg, this.conf);
        }
    }

    @Deprecated
    /* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput$GarbageCollectionStep.class */
    public static class GarbageCollectionStep extends AbstractExecutable {
        private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);

        protected ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
            try {
                rmdirOnHDFS(getDataPath());
                return new ExecuteResult(ExecuteResult.State.SUCCEED, "HDFS path " + getDataPath() + " is dropped.\n");
            } catch (IOException e) {
                logger.error("job:" + getId() + " execute finished with exception", e);
                return ExecuteResult.createError(e);
            }
        }

        private void rmdirOnHDFS(String str) throws IOException {
            Path path = new Path(str);
            FileSystem workingFileSystem = HadoopUtil.getWorkingFileSystem();
            if (workingFileSystem.exists(path)) {
                workingFileSystem.delete(path, true);
            }
        }

        public void setDataPath(String str) {
            setParam("dataPath", str);
        }

        private String getDataPath() {
            return getParam("dataPath");
        }
    }

    /* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput$KafkaMRBatchMergeInputSide.class */
    /**
     * Merge-job input side for a Kafka-sourced segment: contributes a single
     * {@link MergeOffsetStep} to the job flow.
     */
    class KafkaMRBatchMergeInputSide implements IMRInput.IMRBatchMergeInputSide {
        private final CubeSegment cubeSegment;

        KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
            this.cubeSegment = cubeSegment;
        }

        /**
         * Appends a "Merge offset step" parameterised with the cube name, the segment id and the
         * id of the enclosing job.
         *
         * @param defaultChainedExecutable job flow the step is appended to
         */
        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable defaultChainedExecutable) {
            final MergeOffsetStep offsetMerge = new MergeOffsetStep();
            offsetMerge.setName("Merge offset step");

            final String cubeName = this.cubeSegment.getCubeInstance().getName();
            CubingExecutableUtil.setCubeName(cubeName, offsetMerge.getParams());

            final String segmentId = this.cubeSegment.getUuid();
            CubingExecutableUtil.setSegmentId(segmentId, offsetMerge.getParams());

            final String jobId = defaultChainedExecutable.getId();
            CubingExecutableUtil.setCubingJobId(jobId, offsetMerge.getParams());

            defaultChainedExecutable.addTask(offsetMerge);
        }
    }

    /* loaded from: input_file:org/apache/kylin/source/kafka/KafkaMRInput$KafkaTableInputFormat.class */
    /**
     * Table input format for the data a Kafka cubing job saved as sequence files: configures the
     * MapReduce job's input and splits each {@link Text} record into its column values.
     */
    public static class KafkaTableInputFormat implements IMRInput.IMRTableInputFormat {
        private final CubeSegment cubeSegment;
        private final JobEngineConfig conf;
        /** Column separator inside one saved record (the U+0001 control character). */
        private final String delimiter = "\u0001";

        /**
         * @param cubeSegment     segment whose flat table is read
         * @param jobEngineConfig engine config used to resolve the job working directory
         */
        public KafkaTableInputFormat(CubeSegment cubeSegment, JobEngineConfig jobEngineConfig) {
            this.cubeSegment = cubeSegment;
            this.conf = jobEngineConfig;
        }

        /**
         * Sets {@link SequenceFileInputFormat} as the job's input format and adds the segment's
         * flat-table directory, located under the working directory of the job whose id is
         * stored in the {@code "cubingJobId"} configuration entry, as input path.
         *
         * @throws IllegalStateException if adding the input path fails with an {@link IOException}
         */
        public void configureJob(Job job) {
            job.setInputFormatClass(SequenceFileInputFormat.class);
            try {
                // NOTE(review): KafkaMRInput#getTableInputFormat constructs this class with a null
                // conf; confirm configureJob is never reached on such an instance.
                CubeJoinedFlatTableDesc flatTable = new CubeJoinedFlatTableDesc(this.cubeSegment);
                String cubingJobId = job.getConfiguration().get("cubingJobId");
                FileInputFormat.addInputPath(job, new Path(
                        JoinedFlatTable.getTableDir(flatTable, JobBuilderSupport.getJobWorkingDir(this.conf, cubingJobId))));
            } catch (IOException e) {
                throw new IllegalStateException(e);
            }
        }

        /**
         * Splits one mapper input record into its columns.
         *
         * @param obj the mapper input value; must be a {@link Text}
         * @return a single-element collection holding the record's column values, including
         *         empty values at the end of the record
         */
        public Collection<String[]> parseMapperInput(Object obj) {
            Text text = (Text) obj;
            String row = Bytes.toString(text.getBytes(), 0, text.getLength());
            // A negative limit keeps trailing empty strings; the single-argument split would
            // drop them and return fewer columns whenever the last column values are empty.
            return Collections.singletonList(row.split(this.delimiter, -1));
        }
    }

    /**
     * Creates the batch-cubing input side for the segment described by the given flat table.
     *
     * <p>The segment is also stored in this instance, where {@link #getTableInputFormat} reads it.
     *
     * @param iJoinedFlatTableDesc flat table descriptor; its segment must be a {@link CubeSegment}
     * @return a new {@link BatchCubingInputSide} for that segment
     * @throws ClassCastException if the descriptor's segment is not a {@link CubeSegment}
     */
    public IMRInput.IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc iJoinedFlatTableDesc) {
        // The descriptor hands the segment out as an ISegment, so an explicit downcast is needed
        // before it can be stored as a CubeSegment (same cast as in getBatchMergeInputSide).
        this.cubeSegment = (CubeSegment) iJoinedFlatTableDesc.getSegment();
        return new BatchCubingInputSide(this.cubeSegment, iJoinedFlatTableDesc);
    }

    /**
     * Returns an input format bound to the segment most recently passed through
     * {@link #getBatchCubingInputSide}; the {@code tableDesc} argument is not consulted and no
     * job engine config is supplied.
     */
    public IMRInput.IMRTableInputFormat getTableInputFormat(TableDesc tableDesc) {
        final JobEngineConfig noEngineConfig = null;
        return new KafkaTableInputFormat(this.cubeSegment, noEngineConfig);
    }

    /**
     * Creates the merge-job input side for the given segment.
     *
     * @param iSegment segment being merged; must be a {@link CubeSegment}
     */
    public IMRInput.IMRBatchMergeInputSide getBatchMergeInputSide(ISegment iSegment) {
        final CubeSegment mergedSegment = (CubeSegment) iSegment;
        return new KafkaMRBatchMergeInputSide(mergedSegment);
    }
}
