package org.apache.kafka.connect.runtime;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTask.class */
public class WorkerSinkTask implements WorkerTask {
    private static final Logger log;
    private final ConnectorTaskId id;
    private final SinkTask task;
    private final WorkerConfig workerConfig;
    private final Time time;
    private final Converter keyConverter;
    private final Converter valueConverter;
    private WorkerSinkTaskThread workThread;
    private Map<String, String> taskProps;
    private KafkaConsumer<byte[], byte[]> consumer;
    private WorkerSinkTaskContext context;
    private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
    static final /* synthetic */ boolean $assertionsDisabled;
    private boolean started = false;
    private final List<SinkRecord> messageBatch = new ArrayList();
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap();
    private boolean pausedForRedelivery = false;
    private RuntimeException rebalanceException = null;

    /* loaded from: input_file:org/apache/kafka/connect/runtime/WorkerSinkTask$HandleRebalance.class */
    private class HandleRebalance implements ConsumerRebalanceListener {
        private HandleRebalance() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            if (WorkerSinkTask.this.rebalanceException != null) {
                return;
            }
            WorkerSinkTask.this.lastCommittedOffsets = new HashMap();
            WorkerSinkTask.this.currentOffsets = new HashMap();
            for (TopicPartition topicPartition : collection) {
                long position = WorkerSinkTask.this.consumer.position(topicPartition);
                WorkerSinkTask.this.lastCommittedOffsets.put(topicPartition, new OffsetAndMetadata(position));
                WorkerSinkTask.this.currentOffsets.put(topicPartition, new OffsetAndMetadata(position));
                WorkerSinkTask.log.debug("{} assigned topic partition {} with offset {}", new Object[]{WorkerSinkTask.this.id, topicPartition, Long.valueOf(position)});
            }
            if (WorkerSinkTask.this.pausedForRedelivery) {
                WorkerSinkTask.this.pausedForRedelivery = false;
                HashSet hashSet = new HashSet(collection);
                Set<TopicPartition> pausedPartitions = WorkerSinkTask.this.context.pausedPartitions();
                for (TopicPartition topicPartition2 : collection) {
                    if (!pausedPartitions.contains(topicPartition2)) {
                        WorkerSinkTask.this.consumer.resume(new TopicPartition[]{topicPartition2});
                    }
                }
                Iterator<TopicPartition> it = pausedPartitions.iterator();
                while (it.hasNext()) {
                    if (hashSet.contains(it.next())) {
                        it.remove();
                    }
                }
            }
            if (WorkerSinkTask.this.started) {
                try {
                    WorkerSinkTask.this.task.onPartitionsAssigned(collection);
                } catch (RuntimeException e) {
                    WorkerSinkTask.this.rebalanceException = e;
                }
            }
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            if (WorkerSinkTask.this.started) {
                try {
                    WorkerSinkTask.this.task.onPartitionsRevoked(collection);
                    WorkerSinkTask.this.commitOffsets(true, -1);
                } catch (RuntimeException e) {
                    WorkerSinkTask.this.rebalanceException = e;
                }
            }
            WorkerSinkTask.this.messageBatch.clear();
        }
    }

    public WorkerSinkTask(ConnectorTaskId connectorTaskId, SinkTask sinkTask, WorkerConfig workerConfig, Converter converter, Converter converter2, Time time) {
        this.id = connectorTaskId;
        this.task = sinkTask;
        this.workerConfig = workerConfig;
        this.keyConverter = converter;
        this.valueConverter = converter2;
        this.time = time;
    }

    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void start(Map<String, String> map) {
        this.taskProps = map;
        this.consumer = createConsumer();
        this.context = new WorkerSinkTaskContext(this.consumer);
        this.workThread = createWorkerThread();
        this.workThread.start();
    }

    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void stop() {
        if (this.workThread != null) {
            this.workThread.startGracefulShutdown();
        }
        this.consumer.wakeup();
    }

    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public boolean awaitStop(long j) {
        boolean z = true;
        if (this.workThread != null) {
            try {
                z = this.workThread.awaitShutdown(j, TimeUnit.MILLISECONDS);
                if (!z) {
                    this.workThread.forceShutdown();
                }
            } catch (InterruptedException e) {
                z = false;
            }
        }
        this.task.stop();
        return z;
    }

    @Override // org.apache.kafka.connect.runtime.WorkerTask
    public void close() {
        if (this.consumer != null) {
            this.consumer.close();
        }
    }

    public boolean joinConsumerGroupAndStart() {
        String str = this.taskProps.get(ConnectorConfig.TOPICS_CONFIG);
        if (str == null || str.isEmpty()) {
            throw new ConnectException("Sink tasks require a list of topics.");
        }
        String[] split = str.split(",");
        log.debug("Task {} subscribing to topics {}", this.id, split);
        this.consumer.subscribe(Arrays.asList(split), new HandleRebalance());
        try {
            pollConsumer(0L);
            this.task.initialize(this.context);
            this.task.start(this.taskProps);
            log.info("Sink task {} finished initialization and start", this);
            this.started = true;
            return true;
        } catch (WakeupException e) {
            log.error("Sink task {} was stopped before completing join group. Task initialization and start is being skipped", this);
            return false;
        }
    }

    public void poll(long j) {
        try {
            rewind();
            long timeout = this.context.timeout();
            if (timeout > 0) {
                j = Math.min(j, timeout);
                this.context.timeout(-1L);
            }
            log.trace("{} polling consumer with timeout {} ms", this.id, Long.valueOf(j));
            ConsumerRecords<byte[], byte[]> pollConsumer = pollConsumer(j);
            if (!$assertionsDisabled && !this.messageBatch.isEmpty() && !pollConsumer.isEmpty()) {
                throw new AssertionError();
            }
            log.trace("{} polling returned {} messages", this.id, Integer.valueOf(pollConsumer.count()));
            convertMessages(pollConsumer);
            deliverMessages();
        } catch (WakeupException e) {
            log.trace("{} consumer woken up", this.id);
        }
    }

    public void commitOffsets(boolean z, final int i) {
        log.info("{} Committing offsets", this);
        HashMap hashMap = new HashMap(this.currentOffsets);
        try {
            this.task.flush(hashMap);
            if (!z) {
                this.consumer.commitAsync(hashMap, new OffsetCommitCallback() { // from class: org.apache.kafka.connect.runtime.WorkerSinkTask.1
                    public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                        WorkerSinkTask.this.lastCommittedOffsets = map;
                        WorkerSinkTask.this.workThread.onCommitCompleted(exc, i);
                    }
                });
                return;
            }
            try {
                this.consumer.commitSync(hashMap);
                this.lastCommittedOffsets = hashMap;
                this.workThread.onCommitCompleted(null, i);
            } catch (KafkaException e) {
                this.workThread.onCommitCompleted(e, i);
            }
        } catch (Throwable th) {
            log.error("Commit of {} offsets failed due to exception while flushing:", this, th);
            log.error("Rewinding offsets to last committed offsets");
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.lastCommittedOffsets.entrySet()) {
                log.debug("{} Rewinding topic partition {} to offset {}", new Object[]{this.id, entry.getKey(), Long.valueOf(entry.getValue().offset())});
                this.consumer.seek(entry.getKey(), entry.getValue().offset());
            }
            this.currentOffsets = new HashMap(this.lastCommittedOffsets);
            this.workThread.onCommitCompleted(th, i);
        }
    }

    public Time time() {
        return this.time;
    }

    public WorkerConfig workerConfig() {
        return this.workerConfig;
    }

    public String toString() {
        return "WorkerSinkTask{id=" + this.id + '}';
    }

    private ConsumerRecords<byte[], byte[]> pollConsumer(long j) {
        ConsumerRecords<byte[], byte[]> poll = this.consumer.poll(j);
        if (this.rebalanceException == null) {
            return poll;
        }
        RuntimeException runtimeException = this.rebalanceException;
        this.rebalanceException = null;
        throw runtimeException;
    }

    private KafkaConsumer<byte[], byte[]> createConsumer() {
        HashMap hashMap = new HashMap();
        hashMap.put(DistributedConfig.GROUP_ID_CONFIG, "connect-" + this.id.connector());
        hashMap.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, Utils.join(this.workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
        hashMap.put("enable.auto.commit", "false");
        hashMap.put("auto.offset.reset", "earliest");
        hashMap.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        hashMap.putAll(this.workerConfig.originalsWithPrefix("consumer."));
        try {
            return new KafkaConsumer<>(hashMap);
        } catch (Throwable th) {
            throw new ConnectException("Failed to create consumer", th);
        }
    }

    private WorkerSinkTaskThread createWorkerThread() {
        return new WorkerSinkTaskThread(this, "WorkerSinkTask-" + this.id, this.time, this.workerConfig);
    }

    private void convertMessages(ConsumerRecords<byte[], byte[]> consumerRecords) {
        Iterator it = consumerRecords.iterator();
        while (it.hasNext()) {
            ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
            log.trace("Consuming message with key {}, value {}", consumerRecord.key(), consumerRecord.value());
            SchemaAndValue connectData = this.keyConverter.toConnectData(consumerRecord.topic(), (byte[]) consumerRecord.key());
            SchemaAndValue connectData2 = this.valueConverter.toConnectData(consumerRecord.topic(), (byte[]) consumerRecord.value());
            this.messageBatch.add(new SinkRecord(consumerRecord.topic(), consumerRecord.partition(), connectData.schema(), connectData.value(), connectData2.schema(), connectData2.value(), consumerRecord.offset()));
        }
    }

    private void deliverMessages() {
        try {
            this.task.put(new ArrayList(this.messageBatch));
            for (SinkRecord sinkRecord : this.messageBatch) {
                this.currentOffsets.put(new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition().intValue()), new OffsetAndMetadata(sinkRecord.kafkaOffset() + 1));
            }
            this.messageBatch.clear();
            if (this.pausedForRedelivery) {
                for (TopicPartition topicPartition : this.consumer.assignment()) {
                    if (!this.context.pausedPartitions().contains(topicPartition)) {
                        this.consumer.resume(new TopicPartition[]{topicPartition});
                    }
                }
                this.pausedForRedelivery = false;
            }
        } catch (RetriableException e) {
            log.error("RetriableException from SinkTask {}:", this.id, e);
            this.pausedForRedelivery = true;
            Iterator it = this.consumer.assignment().iterator();
            while (it.hasNext()) {
                this.consumer.pause(new TopicPartition[]{(TopicPartition) it.next()});
            }
        } catch (Throwable th) {
            log.error("Task {} threw an uncaught and unrecoverable exception", this.id);
            log.error("Task is being killed and will not recover until manually restarted:", th);
            throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
        }
    }

    private void rewind() {
        Map<TopicPartition, Long> offsets = this.context.offsets();
        if (offsets.isEmpty()) {
            return;
        }
        for (TopicPartition topicPartition : offsets.keySet()) {
            Long l = offsets.get(topicPartition);
            if (l != null) {
                log.trace("Rewind {} to offset {}.", topicPartition, l);
                this.consumer.seek(topicPartition, l.longValue());
                this.lastCommittedOffsets.put(topicPartition, new OffsetAndMetadata(l.longValue()));
                this.currentOffsets.put(topicPartition, new OffsetAndMetadata(l.longValue()));
            }
        }
        this.context.clearOffsets();
    }

    static {
        $assertionsDisabled = !WorkerSinkTask.class.desiredAssertionStatus();
        log = LoggerFactory.getLogger(WorkerSinkTask.class);
    }
}
