package org.apache.kylin.source.kafka;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput;
import org.apache.kylin.engine.spark.ISparkInput;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.IBuildable;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.TableDesc;
import org.apache.kylin.metadata.model.TableExtDesc;
import org.apache.kylin.metadata.streaming.StreamingConfig;
import org.apache.kylin.metadata.streaming.StreamingManager;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.ISampleDataDeployer;
import org.apache.kylin.source.ISource;
import org.apache.kylin.source.ISourceMetadataExplorer;
import org.apache.kylin.source.SourcePartition;
import org.apache.kylin.source.kafka.config.KafkaConfig;
import org.apache.kylin.source.kafka.util.KafkaClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/source/kafka/KafkaSource.class */
/**
 * {@link ISource} implementation for cubes whose fact table is fed from a Kafka topic.
 *
 * <p>The main job of this class is {@link #enrichSourcePartitionBeforeBuild}, which turns a
 * (possibly open-ended) build request into a concrete per-partition start/end offset range.
 * Most other {@link ISource} operations are not supported and throw
 * {@link UnsupportedOperationException}.
 */
public class KafkaSource implements ISource {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class);

    /**
     * Creates the source.
     *
     * @param kylinConfig accepted for the expected constructor shape; not used by this class
     */
    public KafkaSource(KylinConfig kylinConfig) {
    }

    /**
     * Returns the Kafka-specific input adapter for the requested build engine interface.
     *
     * @param cls the engine input interface; {@code IMRInput} and {@code ISparkInput} are supported
     * @return a new {@code KafkaMRInput} or {@code KafkaSparkInput}
     * @throws RuntimeException if {@code cls} is neither of the supported interfaces
     */
    @SuppressWarnings("unchecked") // each branch is guarded by an identity check on cls
    public <I> I adaptToBuildEngine(Class<I> cls) {
        if (cls == IMRInput.class) {
            return (I) new KafkaMRInput();
        }
        if (cls == ISparkInput.class) {
            return (I) new KafkaSparkInput();
        }
        throw new RuntimeException("Cannot adapt to " + cls);
    }

    /**
     * Not supported by the Kafka source.
     *
     * @throws UnsupportedOperationException always
     */
    public IReadableTable createReadableTable(TableDesc tableDesc, String str) {
        throw new UnsupportedOperationException();
    }

    /**
     * Resolves the concrete Kafka offsets for a build and returns them on a copy of
     * {@code sourcePartition}; the argument itself is not modified by this method.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Validate the caller-supplied offsets (see {@code checkSourceOffsets}).</li>
     *   <li>If no segment range is given, or it starts at 0, choose the start offsets: the last
     *       segment's end offsets, else the start offsets configured on the cube descriptor,
     *       else the topic's earliest offsets.</li>
     *   <li>Add every topic partition missing from the start offsets, using that partition's
     *       earliest offset.</li>
     *   <li>If no segment range is given, or it ends at {@code Long.MAX_VALUE}, use the topic's
     *       latest offsets as end offsets.</li>
     *   <li>Set the segment range to (sum of start offsets, sum of end offsets).</li>
     * </ol>
     *
     * @param iBuildable      the buildable being built; must be a {@link CubeInstance}
     * @param sourcePartition the requested partition/offset description
     * @return an enriched copy of {@code sourcePartition}
     * @throws IllegalStateException    if a partition shows up in the latest offsets that has no
     *                                  start offset
     * @throws IllegalArgumentException if the supplied offsets are inconsistent, if any partition's
     *                                  start offset is beyond its end offset, or if the summed
     *                                  start offset is not smaller than the summed end offset
     */
    public SourcePartition enrichSourcePartitionBeforeBuild(IBuildable iBuildable, SourcePartition sourcePartition) {
        checkSourceOffsets(sourcePartition);
        SourcePartition result = SourcePartition.getCopyOf(sourcePartition);
        SegmentRange segRange = result.getSegRange();
        CubeInstance cube = (CubeInstance) iBuildable;

        if (segRange == null || segRange.start.v.equals(0L)) {
            CubeSegment lastSegment = cube.getLastSegment();
            if (lastSegment != null) {
                logger.debug("Last segment exists, continue from last segment {}'s end position: {}",
                        lastSegment.getName(), lastSegment.getSourcePartitionOffsetEnd());
                result.setSourcePartitionOffsetStart(lastSegment.getSourcePartitionOffsetEnd());
            } else {
                Map<Integer, Long> configuredStart = cube.getDescriptor().getPartitionOffsetStart();
                if (configuredStart == null || configuredStart.isEmpty()) {
                    logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.");
                    result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube));
                } else {
                    logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: {}",
                            configuredStart);
                    result.setSourcePartitionOffsetStart(configuredStart);
                }
            }
        }

        KafkaConfig kafkaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv())
                .getKafkaConfig(cube.getRootFactTable());
        String brokers = KafkaClient.getKafkaBrokers(kafkaConfig);
        String topic = kafkaConfig.getTopic();

        // Read after the start offsets may have been replaced above; entries are added in place.
        Map<Integer, Long> startOffsets = result.getSourcePartitionOffsetStart();

        // The consumer is only needed to discover partitions; try-with-resources closes it exactly
        // once and keeps the primary exception if close() itself fails.
        try (KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) {
            List<PartitionInfo> partitions = consumer.partitionsFor(topic);
            logger.info("Get {} partitions for topic {} ", partitions.size(), topic);
            for (PartitionInfo partitionInfo : partitions) {
                int partition = partitionInfo.partition();
                if (!startOffsets.containsKey(partition)) {
                    long earliestOffset = KafkaClient.getEarliestOffset(consumer, topic, partition);
                    logger.debug("New partition {} added, with start offset {}", partition, earliestOffset);
                    startOffsets.put(partition, earliestOffset);
                }
            }
        }

        if (segRange == null || segRange.end.v.equals(Long.MAX_VALUE)) {
            logger.debug("Seek end offsets from topic {}", topic);
            Map<Integer, Long> latestOffsets = KafkaClient.getLatestOffsets(cube);
            logger.debug("The end offsets are {}", latestOffsets);
            for (Map.Entry<Integer, Long> latest : latestOffsets.entrySet()) {
                Integer partition = latest.getKey();
                if (!startOffsets.containsKey(partition)) {
                    // A partition appeared between the partition listing above and this lookup.
                    throw new IllegalStateException("New partition added in between, retry.");
                }
                if (startOffsets.get(partition).longValue() > latest.getValue().longValue()) {
                    throw new IllegalArgumentException("Partition " + partition + " end offset (" + latest.getValue()
                            + ") is smaller than start offset ( " + startOffsets.get(partition) + ")");
                }
            }
            result.setSourcePartitionOffsetEnd(latestOffsets);
        }

        long totalStart = sumOffsets(startOffsets);
        long totalEnd = sumOffsets(result.getSourcePartitionOffsetEnd());
        if (totalStart > totalEnd) {
            throw new IllegalArgumentException("Illegal offset: start: " + totalStart + ", end: " + totalEnd);
        }
        if (totalStart == totalEnd) {
            throw new IllegalArgumentException("No new message comes, startOffset = endOffset:" + totalStart);
        }
        result.setSegRange(new SegmentRange(Long.valueOf(totalStart), Long.valueOf(totalEnd)));
        return result;
    }

    /**
     * Validates that the caller-supplied segment range agrees with the per-partition offsets.
     * Does nothing when no segment range is set.
     *
     * @throws IllegalArgumentException if the end is not positive, the start is not smaller than
     *                                  the end, required per-partition offsets are missing, or
     *                                  their sum does not equal the corresponding range bound
     */
    private void checkSourceOffsets(SourcePartition sourcePartition) {
        if (sourcePartition.getSegRange() == null) {
            return;
        }
        long startOffset = ((Long) sourcePartition.getSegRange().start.v).longValue();
        long endOffset = ((Long) sourcePartition.getSegRange().end.v).longValue();
        Map<Integer, Long> partitionStarts = sourcePartition.getSourcePartitionOffsetStart();
        Map<Integer, Long> partitionEnds = sourcePartition.getSourcePartitionOffsetEnd();

        if (endOffset <= 0 || startOffset >= endOffset) {
            throw new IllegalArgumentException("'startOffset' need be smaller than 'endOffset'");
        }

        if (startOffset > 0) {
            if (partitionStarts == null || partitionStarts.isEmpty()) {
                throw new IllegalArgumentException("When 'startOffset' is > 0, need provide each partition's start offset");
            }
            if (sumOffsets(partitionStarts) != startOffset) {
                throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetStart', doesn't match with 'startOffset'");
            }
        }

        // endOffset > 0 is guaranteed by the check above; Long.MAX_VALUE marks an open-ended range
        // whose end offsets are resolved later from the topic.
        if (endOffset == Long.MAX_VALUE) {
            return;
        }
        if (partitionEnds == null || partitionEnds.isEmpty()) {
            throw new IllegalArgumentException("When 'endOffset' is not Long.MAX_VALUE, need provide each partition's end offset");
        }
        if (sumOffsets(partitionEnds) != endOffset) {
            throw new IllegalArgumentException("Invalid 'sourcePartitionOffsetEnd', doesn't match with 'endOffset'");
        }
    }

    /** Returns the sum of all offset values in {@code offsets}. */
    private static long sumOffsets(Map<Integer, Long> offsets) {
        long total = 0;
        for (Long offset : offsets.values()) {
            total += offset.longValue();
        }
        return total;
    }

    /**
     * Returns a metadata explorer that only supports {@code getRelatedKylinResources}; every
     * other operation throws {@link UnsupportedOperationException}.
     */
    public ISourceMetadataExplorer getSourceMetadataExplorer() {
        return new ISourceMetadataExplorer() {
            public List<String> listDatabases() throws Exception {
                throw new UnsupportedOperationException();
            }

            public List<String> listTables(String str) throws Exception {
                throw new UnsupportedOperationException();
            }

            public Pair<TableDesc, TableExtDesc> loadTableMetadata(String str, String str2, String str3) throws Exception {
                throw new UnsupportedOperationException();
            }

            /**
             * Lists the resource paths tied to the table: always its Kafka config path, plus its
             * streaming config path when the table's source type is 1.
             */
            public List<String> getRelatedKylinResources(TableDesc tableDesc) {
                List<String> resources = Lists.newArrayList();
                resources.add(KafkaConfig.concatResourcePath(tableDesc.getIdentity()));
                // NOTE(review): 1 is presumably the "streaming" source type id - confirm against
                // the project's source-type constants.
                if (tableDesc.getSourceType() == 1) {
                    resources.add(StreamingConfig.concatResourcePath(tableDesc.getIdentity()));
                }
                return resources;
            }

            public ColumnDesc[] evalQueryMetadata(String str) {
                throw new UnsupportedOperationException();
            }

            public void validateSQL(String str) throws Exception {
                throw new UnsupportedOperationException();
            }
        };
    }

    /**
     * Not supported by the Kafka source.
     *
     * @throws UnsupportedOperationException always
     */
    public ISampleDataDeployer getSampleDataDeployer() {
        throw new UnsupportedOperationException();
    }

    /**
     * Removes the streaming config and the Kafka config registered under the given table name.
     * Both configs are looked up before either is removed.
     *
     * @param str  the table name used to look up both configs
     * @param str2 not used by this implementation
     * @throws IOException declared on the signature; presumably raised by the config managers
     */
    public void unloadTable(String str, String str2) throws IOException {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        StreamingManager streamingManager = StreamingManager.getInstance(config);
        KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(config);
        StreamingConfig streamingConfig = streamingManager.getStreamingConfig(str);
        KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(str);
        streamingManager.removeStreamingConfig(streamingConfig);
        kafkaConfigManager.removeKafkaConfig(kafkaConfig);
    }

    /** No-op: this source holds no resources of its own. */
    public void close() throws IOException {
    }
}
