package com.snowflake.kafka.connector.internal;

import com.snowflake.kafka.connector.SnowflakeSinkConnectorConfig;
import com.snowflake.kafka.connector.Utils;
import com.snowflake.kafka.connector.internal.InternalUtils;
import com.snowflake.kafka.connector.records.RecordService;
import com.snowflake.kafka.connector.records.SnowflakeJsonSchema;
import com.snowflake.kafka.connector.records.SnowflakeMetadataConfig;
import com.snowflake.kafka.connector.records.SnowflakeRecordContent;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkRecord;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1.class */
public class SnowflakeSinkServiceV1 extends Logging implements SnowflakeSinkService {
    private static final long ONE_HOUR = 3600000;
    private static final long TEN_MINUTES = 600000;
    private static final long CLEAN_TIME = 60000;
    private long flushTime;
    private long fileSize;
    private long recordNum;
    private final SnowflakeConnectionService conn;
    private final Map<String, ServiceContext> pipes;
    private final RecordService recordService;
    private boolean isStopped;
    private final SnowflakeTelemetryService telemetryService;
    private Map<String, String> topic2TableMap;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1$ServiceContext.class */
    public class ServiceContext {
        private final String tableName;
        private final String stageName;
        private final String pipeName;
        private final SnowflakeConnectionService conn;
        private final SnowflakeIngestionService ingestionService;
        private List<String> fileNames;
        private List<String> cleanerFileNames;
        private PartitionBuffer buffer;
        private final String prefix;
        private final AtomicLong committedOffset;
        private final AtomicLong flushedOffset;
        private final AtomicLong processedOffset;
        private long previousFlushTimeStamp;
        private final ExecutorService cleanerExecutor;
        private final Lock bufferLock;
        private final Lock fileListLock;
        private final Lock usageDataLock;
        private long startTime;
        private long totalNumberOfRecord;
        private long totalSizeOfData;
        private boolean hasInitialized;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/snowflake/kafka/connector/internal/SnowflakeSinkServiceV1$ServiceContext$PartitionBuffer.class */
        public class PartitionBuffer {
            private final StringBuilder stringBuilder;
            private int numOfRecord;
            private int bufferSize;
            private long firstOffset;
            private long lastOffset;

            /* JADX INFO: Access modifiers changed from: private */
            public int getNumOfRecord() {
                return this.numOfRecord;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public int getBufferSize() {
                return this.bufferSize;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public long getFirstOffset() {
                return this.firstOffset;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public long getLastOffset() {
                return this.lastOffset;
            }

            private PartitionBuffer() {
                this.stringBuilder = new StringBuilder();
                this.numOfRecord = 0;
                this.bufferSize = 0;
                this.firstOffset = -1L;
                this.lastOffset = -1L;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public void insert(SinkRecord sinkRecord) {
                String processRecord = SnowflakeSinkServiceV1.this.recordService.processRecord(sinkRecord);
                if (this.bufferSize == 0) {
                    this.firstOffset = sinkRecord.kafkaOffset();
                }
                this.stringBuilder.append(processRecord);
                this.numOfRecord++;
                this.bufferSize += processRecord.length() * 2;
                this.lastOffset = sinkRecord.kafkaOffset();
            }

            /* JADX INFO: Access modifiers changed from: private */
            public boolean isEmpty() {
                return this.numOfRecord == 0;
            }

            /* JADX INFO: Access modifiers changed from: private */
            public String getData() {
                String sb = this.stringBuilder.toString();
                SnowflakeSinkServiceV1.this.logDebug("flush buffer: {} records, {} bytes, offset {} - {}", Integer.valueOf(this.numOfRecord), Integer.valueOf(this.bufferSize), Long.valueOf(this.firstOffset), Long.valueOf(this.lastOffset));
                ServiceContext.this.updateUsageData(this.numOfRecord, this.bufferSize);
                return sb;
            }
        }

        private ServiceContext(String str, String str2, String str3, SnowflakeConnectionService snowflakeConnectionService, int i) {
            this.hasInitialized = false;
            this.pipeName = str3;
            this.tableName = str;
            this.stageName = str2;
            this.conn = snowflakeConnectionService;
            this.fileNames = new LinkedList();
            this.cleanerFileNames = new LinkedList();
            this.buffer = new PartitionBuffer();
            this.ingestionService = snowflakeConnectionService.buildIngestService(str2, str3);
            this.prefix = FileNameUtils.filePrefix(snowflakeConnectionService.getConnectorName(), str, i);
            this.processedOffset = new AtomicLong(-1L);
            this.flushedOffset = new AtomicLong(-1L);
            this.committedOffset = new AtomicLong(0L);
            this.previousFlushTimeStamp = System.currentTimeMillis();
            this.bufferLock = new ReentrantLock();
            this.fileListLock = new ReentrantLock();
            this.usageDataLock = new ReentrantLock();
            this.totalNumberOfRecord = 0L;
            this.totalSizeOfData = 0L;
            this.startTime = System.currentTimeMillis();
            this.cleanerExecutor = Executors.newSingleThreadExecutor();
            SnowflakeSinkServiceV1.this.logInfo("pipe: {} - service started", str3);
        }

        private void init() {
            SnowflakeSinkServiceV1.this.logInfo("init pipe: {}", this.pipeName);
            createTableAndStage();
            recover();
            try {
                startCleaner();
            } catch (Exception e) {
                SnowflakeSinkServiceV1.this.logWarn("Cleaner and Flusher threads shut down before initialization");
            }
        }

        private void startCleaner() {
            List<String> listStage = this.conn.listStage(this.stageName, this.prefix);
            this.fileListLock.lock();
            try {
                this.cleanerFileNames.addAll(listStage);
                this.cleanerExecutor.submit(() -> {
                    SnowflakeSinkServiceV1.this.logInfo("pipe {}: cleaner started", this.pipeName);
                    while (!SnowflakeSinkServiceV1.this.isStopped) {
                        try {
                            Thread.sleep(60000L);
                            checkStatus();
                            if (System.currentTimeMillis() - this.startTime > SnowflakeSinkServiceV1.ONE_HOUR) {
                                sendUsageReport();
                            }
                        } catch (InterruptedException e) {
                            SnowflakeSinkServiceV1.this.logInfo("Cleaner terminated by an interrupt:\n{}", e.getMessage());
                            return;
                        }
                    }
                });
            } finally {
                this.fileListLock.unlock();
            }
        }

        private void stopCleaner() {
            this.cleanerExecutor.shutdownNow();
            SnowflakeSinkServiceV1.this.logInfo("pipe {}: cleaner terminated", this.pipeName);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void insert(SinkRecord sinkRecord) {
            SnowflakeRecordContent snowflakeRecordContent;
            SinkRecord sinkRecord2;
            if (!this.hasInitialized) {
                init();
                this.hasInitialized = true;
            }
            if (sinkRecord.kafkaOffset() > this.processedOffset.get()) {
                if (sinkRecord.value() instanceof SnowflakeRecordContent) {
                    sinkRecord2 = sinkRecord;
                } else {
                    try {
                        snowflakeRecordContent = new SnowflakeRecordContent(sinkRecord.valueSchema(), sinkRecord.value());
                    } catch (Exception e) {
                        snowflakeRecordContent = new SnowflakeRecordContent();
                        SnowflakeSinkServiceV1.this.logError("native content parser error:\n{}", e.getMessage());
                    }
                    sinkRecord2 = new SinkRecord(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue(), sinkRecord.keySchema(), sinkRecord.key(), new SnowflakeJsonSchema(), snowflakeRecordContent, sinkRecord.kafkaOffset(), sinkRecord.timestamp(), sinkRecord.timestampType(), sinkRecord.headers());
                }
                if (((SnowflakeRecordContent) sinkRecord2.value()).isBroken()) {
                    writeBrokenDataToTableStage(sinkRecord2);
                    return;
                }
                PartitionBuffer partitionBuffer = null;
                this.bufferLock.lock();
                try {
                    this.processedOffset.set(sinkRecord2.kafkaOffset());
                    this.buffer.insert(sinkRecord2);
                    if (this.buffer.getBufferSize() >= SnowflakeSinkServiceV1.this.getFileSize() || (SnowflakeSinkServiceV1.this.getRecordNumber() != 0 && this.buffer.getNumOfRecord() >= SnowflakeSinkServiceV1.this.getRecordNumber())) {
                        partitionBuffer = this.buffer;
                        this.buffer = new PartitionBuffer();
                    }
                    if (partitionBuffer != null) {
                        flush(partitionBuffer);
                    }
                } finally {
                    this.bufferLock.unlock();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean shouldFlush() {
            return System.currentTimeMillis() - this.previousFlushTimeStamp >= SnowflakeSinkServiceV1.this.getFlushTime() * 1000;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flushBuffer() {
            if (this.buffer.isEmpty()) {
                return;
            }
            this.bufferLock.lock();
            try {
                PartitionBuffer partitionBuffer = this.buffer;
                this.buffer = new PartitionBuffer();
                flush(partitionBuffer);
            } finally {
                this.bufferLock.unlock();
            }
        }

        private void writeBrokenDataToTableStage(SinkRecord sinkRecord) {
            this.conn.putToTableStage(this.tableName, FileNameUtils.brokenRecordFileName(this.prefix, sinkRecord.kafkaOffset()), ((SnowflakeRecordContent) sinkRecord.value()).getBrokenData());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long getOffset() {
            if (this.fileNames.isEmpty()) {
                return this.committedOffset.get();
            }
            HashSet hashSet = new HashSet();
            this.fileListLock.lock();
            try {
                hashSet.addAll(this.fileNames);
                this.fileNames = new LinkedList();
                this.ingestionService.ingestFiles(hashSet);
                this.committedOffset.set(this.flushedOffset.get());
                return this.committedOffset.get();
            } finally {
                this.fileListLock.unlock();
            }
        }

        private void flush(PartitionBuffer partitionBuffer) {
            if (partitionBuffer == null || partitionBuffer.isEmpty()) {
                return;
            }
            this.previousFlushTimeStamp = System.currentTimeMillis();
            String fileName = FileNameUtils.fileName(this.prefix, partitionBuffer.getFirstOffset(), partitionBuffer.getLastOffset());
            this.conn.put(this.stageName, fileName, partitionBuffer.getData());
            this.flushedOffset.set(Math.max(partitionBuffer.getLastOffset() + 1, this.flushedOffset.get()));
            this.fileListLock.lock();
            try {
                this.fileNames.add(fileName);
                this.cleanerFileNames.add(fileName);
                this.fileListLock.unlock();
                SnowflakeSinkServiceV1.this.logInfo("pipe {}, flush pipe: {}", this.pipeName, fileName);
            } catch (Throwable th) {
                this.fileListLock.unlock();
                throw th;
            }
        }

        private void checkStatus() {
            this.fileListLock.lock();
            try {
                List<String> list = this.cleanerFileNames;
                this.cleanerFileNames = new LinkedList();
                this.fileListLock.unlock();
                long currentTimeMillis = System.currentTimeMillis();
                LinkedList linkedList = new LinkedList();
                LinkedList linkedList2 = new LinkedList();
                filterResult(this.ingestionService.readIngestReport(list), list, linkedList, linkedList2);
                LinkedList linkedList3 = new LinkedList();
                new LinkedList(list).forEach(str -> {
                    long fileNameToTimeIngested = FileNameUtils.fileNameToTimeIngested(str);
                    if (fileNameToTimeIngested < currentTimeMillis - SnowflakeSinkServiceV1.ONE_HOUR) {
                        linkedList2.add(str);
                        list.remove(str);
                    } else if (fileNameToTimeIngested < currentTimeMillis - SnowflakeSinkServiceV1.TEN_MINUTES) {
                        linkedList3.add(str);
                    }
                });
                if (!linkedList3.isEmpty()) {
                    filterResult(this.ingestionService.readOneHourHistory(list, currentTimeMillis - SnowflakeSinkServiceV1.ONE_HOUR), list, linkedList, linkedList2);
                }
                purge(linkedList);
                moveToTableStage(linkedList2);
                this.fileListLock.lock();
                try {
                    this.cleanerFileNames.addAll(list);
                } finally {
                }
            } finally {
            }
        }

        private void filterResult(Map<String, InternalUtils.IngestedFileStatus> map, List<String> list, List<String> list2, List<String> list3) {
            map.forEach((str, ingestedFileStatus) -> {
                switch (ingestedFileStatus) {
                    case LOADED:
                        list2.add(str);
                        list.remove(str);
                        return;
                    case FAILED:
                    case PARTIALLY_LOADED:
                        list3.add(str);
                        list.remove(str);
                        return;
                    default:
                        return;
                }
            });
        }

        private void purge(List<String> list) {
            if (list.isEmpty()) {
                return;
            }
            this.conn.purgeStage(this.stageName, list);
        }

        private void moveToTableStage(List<String> list) {
            if (list.isEmpty()) {
                return;
            }
            this.conn.moveToTableStage(this.tableName, this.stageName, list);
            SnowflakeSinkServiceV1.this.telemetryService.reportKafkaFileFailure(this.tableName, this.stageName, list);
        }

        private void recover() {
            if (!this.conn.pipeExist(this.pipeName)) {
                this.conn.createPipe(this.tableName, this.stageName, this.pipeName);
            } else {
                if (!this.conn.isPipeCompatible(this.tableName, this.stageName, this.pipeName)) {
                    throw SnowflakeErrors.ERROR_5005.getException("pipe name: " + this.pipeName, this.conn.getTelemetryClient());
                }
                SnowflakeSinkServiceV1.this.logInfo("pipe {}, recovered from existing pipe", this.pipeName);
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void close() {
            try {
                stopCleaner();
            } catch (Exception e) {
                SnowflakeSinkServiceV1.this.logWarn("Failed to terminate Cleaner or Flusher");
            }
            this.ingestionService.close();
            sendUsageReport();
            SnowflakeSinkServiceV1.this.logInfo("pipe {}: service closed", this.pipeName);
        }

        private void createTableAndStage() {
            if (!this.conn.tableExist(this.tableName)) {
                SnowflakeSinkServiceV1.this.logInfo("Creating new table {}.", this.tableName);
                this.conn.createTable(this.tableName);
            } else {
                if (!this.conn.isTableCompatible(this.tableName)) {
                    throw SnowflakeErrors.ERROR_5003.getException("table name: " + this.tableName, SnowflakeSinkServiceV1.this.telemetryService);
                }
                SnowflakeSinkServiceV1.this.logInfo("Using existing table {}.", this.tableName);
                SnowflakeSinkServiceV1.this.telemetryService.reportKafkaReuseTable(this.tableName);
            }
            if (!this.conn.stageExist(this.stageName)) {
                SnowflakeSinkServiceV1.this.logInfo("Creating new stage {}.", this.stageName);
                this.conn.createStage(this.stageName);
            } else {
                if (!this.conn.isStageCompatible(this.stageName)) {
                    throw SnowflakeErrors.ERROR_5004.getException("stage name: " + this.stageName, SnowflakeSinkServiceV1.this.telemetryService);
                }
                SnowflakeSinkServiceV1.this.logInfo("Using existing stage {}.", this.stageName);
                SnowflakeSinkServiceV1.this.telemetryService.reportKafkaReuseStage(this.stageName);
            }
        }

        private void sendUsageReport() {
            this.usageDataLock.lock();
            try {
                long j = this.totalNumberOfRecord;
                this.totalNumberOfRecord = 0L;
                long j2 = this.totalSizeOfData;
                this.totalSizeOfData = 0L;
                long j3 = this.startTime;
                this.startTime = System.currentTimeMillis();
                long j4 = this.startTime;
                this.usageDataLock.unlock();
                SnowflakeSinkServiceV1.this.telemetryService.reportKafkaUsage(j3, j4, j, j2);
            } catch (Throwable th) {
                this.usageDataLock.unlock();
                throw th;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void updateUsageData(long j, long j2) {
            this.usageDataLock.lock();
            try {
                this.totalSizeOfData += j2;
                this.totalNumberOfRecord += j;
                this.usageDataLock.unlock();
            } catch (Throwable th) {
                this.usageDataLock.unlock();
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SnowflakeSinkServiceV1(SnowflakeConnectionService snowflakeConnectionService) {
        if (snowflakeConnectionService == null || snowflakeConnectionService.isClosed()) {
            throw SnowflakeErrors.ERROR_5010.getException();
        }
        this.fileSize = SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_DEFAULT;
        this.recordNum = SnowflakeSinkConnectorConfig.BUFFER_COUNT_RECORDS_DEFAULT;
        this.flushTime = 30L;
        this.pipes = new HashMap();
        this.conn = snowflakeConnectionService;
        this.recordService = new RecordService();
        this.isStopped = false;
        this.telemetryService = snowflakeConnectionService.getTelemetryClient();
        this.topic2TableMap = new HashMap();
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void startTask(String str, String str2, int i) {
        String stageName = Utils.stageName(this.conn.getConnectorName(), str);
        String nameIndex = getNameIndex(str2, i);
        if (this.pipes.containsKey(nameIndex)) {
            logError("task is already registered, name: {}", nameIndex);
        } else {
            this.pipes.put(nameIndex, new ServiceContext(str, stageName, Utils.pipeName(this.conn.getConnectorName(), str, i), this.conn, i));
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void insert(Collection<SinkRecord> collection) {
        Iterator<SinkRecord> it = collection.iterator();
        while (it.hasNext()) {
            insert(it.next());
        }
        for (ServiceContext serviceContext : this.pipes.values()) {
            if (serviceContext.shouldFlush()) {
                serviceContext.flushBuffer();
            }
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void insert(SinkRecord sinkRecord) {
        String nameIndex = getNameIndex(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
        if (!this.pipes.containsKey(nameIndex)) {
            logWarn("Topic: {} Partition: {} hasn't been initialized by OPEN function", sinkRecord.topic(), sinkRecord.kafkaPartition());
            startTask(Utils.tableName(sinkRecord.topic(), this.topic2TableMap), sinkRecord.topic(), sinkRecord.kafkaPartition().intValue());
        }
        this.pipes.get(nameIndex).insert(sinkRecord);
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public long getOffset(TopicPartition topicPartition) {
        String nameIndex = getNameIndex(topicPartition.topic(), topicPartition.partition());
        if (this.pipes.containsKey(nameIndex)) {
            return this.pipes.get(nameIndex).getOffset();
        }
        logError("Failed to find offset of Topic: {}, Partition: {}, sink service hasn't been initialized", topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
        return 0L;
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void callAllGetOffset() {
        Iterator<ServiceContext> it = this.pipes.values().iterator();
        while (it.hasNext()) {
            it.next().getOffset();
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void close(Collection<TopicPartition> collection) {
        collection.forEach(topicPartition -> {
            ServiceContext remove = this.pipes.remove(getNameIndex(topicPartition.topic(), topicPartition.partition()));
            if (remove == null) {
                logWarn("Failed to close sink service for Topic: {}, Partition: {}, sink service hasn't been initialized", topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
                return;
            }
            try {
                remove.close();
            } catch (Exception e) {
                logError("Failed to close sink service for Topic: {}, Partition: {}\nMessage:{}", topicPartition.topic(), Integer.valueOf(topicPartition.partition()), e.getMessage());
            }
        });
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void closeAll() {
        this.isStopped = true;
        this.pipes.forEach((str, serviceContext) -> {
            serviceContext.close();
        });
        this.pipes.clear();
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public boolean isClosed() {
        return this.isStopped;
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void setRecordNumber(long j) {
        if (j < 0) {
            logError("number of record in each file is {}, it is negative, reset to 0");
            this.recordNum = 0L;
        } else {
            this.recordNum = j;
            logInfo("set number of record limitation to {}", Long.valueOf(j));
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void setFileSize(long j) {
        if (j > SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_MAX) {
            logError("file size is {} bytes, it is larger than the maximum file size {} bytes, reset to the maximum file size", Long.valueOf(j), Long.valueOf(SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_MAX));
            this.fileSize = SnowflakeSinkConnectorConfig.BUFFER_SIZE_BYTES_MAX;
        } else {
            this.fileSize = j;
            logInfo("set file size limitation to {} bytes", Long.valueOf(j));
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void setFlushTime(long j) {
        if (j < 10) {
            logError("flush time is {} seconds, it is smaller than the minimum flush time {} seconds, reset to the minimum flush time", Long.valueOf(j), 10L);
            this.flushTime = 10L;
        } else {
            this.flushTime = j;
            logInfo("set flush time to {} seconds", Long.valueOf(j));
        }
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void setTopic2TableMap(Map<String, String> map) {
        this.topic2TableMap = map;
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public void setMetadataConfig(SnowflakeMetadataConfig snowflakeMetadataConfig) {
        this.recordService.setMetadataConfig(snowflakeMetadataConfig);
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public long getRecordNumber() {
        return this.recordNum;
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public long getFlushTime() {
        return this.flushTime;
    }

    @Override // com.snowflake.kafka.connector.internal.SnowflakeSinkService
    public long getFileSize() {
        return this.fileSize;
    }

    private static String getNameIndex(String str, int i) {
        return str + "_" + i;
    }
}
