package org.apache.flume.channel.kafka;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.flume.ChannelException;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.conf.LogPrivacyUtil;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.instrumentation.kafka.KafkaChannelCounter;
import org.apache.flume.source.avro.AvroFlumeEvent;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel.class */
public class KafkaChannel extends BasicChannelSemantics {
    private static final Logger logger = LoggerFactory.getLogger(KafkaChannel.class);
    private static final int ZK_SESSION_TIMEOUT = 30000;
    private static final int ZK_CONNECTION_TIMEOUT = 30000;
    private KafkaProducer<String, byte[]> producer;
    private Integer staticPartitionId;
    private KafkaChannelCounter counter;
    private final Properties consumerProps = new Properties();
    private final Properties producerProps = new Properties();
    private final String channelUUID = UUID.randomUUID().toString();
    private AtomicReference<String> topic = new AtomicReference<>();
    private boolean parseAsFlumeEvent = true;
    private String zookeeperConnect = null;
    private String topicStr = KafkaChannelConfiguration.DEFAULT_TOPIC;
    private String groupId = KafkaChannelConfiguration.DEFAULT_GROUP_ID;
    private String partitionHeader = null;
    private boolean migrateZookeeperOffsets = true;
    AtomicBoolean rebalanceFlag = new AtomicBoolean();
    private long pollTimeout = 500;
    private final List<ConsumerAndRecords> consumers = Collections.synchronizedList(new LinkedList());
    private final ThreadLocal<ConsumerAndRecords> consumerAndRecords = new ThreadLocal<ConsumerAndRecords>() { // from class: org.apache.flume.channel.kafka.KafkaChannel.1
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public ConsumerAndRecords initialValue() {
            return KafkaChannel.this.createConsumerAndRecords();
        }
    };

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$ConsumerAndRecords.class */
    public class ConsumerAndRecords {
        final KafkaConsumer<String, byte[]> consumer;
        final String uuid;
        final LinkedList<Event> failedEvents = new LinkedList<>();
        ConsumerRecords<String, byte[]> records = ConsumerRecords.empty();
        Iterator<ConsumerRecord<String, byte[]>> recordIterator = this.records.iterator();
        Map<TopicPartition, OffsetAndMetadata> offsets;

        ConsumerAndRecords(KafkaConsumer<String, byte[]> kafkaConsumer, String str) {
            this.consumer = kafkaConsumer;
            this.uuid = str;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void poll() {
            KafkaChannel.logger.trace("Polling with timeout: {}ms channel-{}", Long.valueOf(KafkaChannel.this.pollTimeout), KafkaChannel.this.getName());
            try {
                this.records = this.consumer.poll(KafkaChannel.this.pollTimeout);
                this.recordIterator = this.records.iterator();
                KafkaChannel.logger.debug("{} returned {} records from last poll", KafkaChannel.this.getName(), Integer.valueOf(this.records.count()));
            } catch (WakeupException e) {
                KafkaChannel.logger.trace("Consumer woken up for channel {}.", KafkaChannel.this.getName());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void commitOffsets() {
            try {
                try {
                    this.consumer.commitSync(this.offsets);
                    KafkaChannel.logger.trace("About to clear offsets map.");
                    this.offsets.clear();
                } catch (Exception e) {
                    KafkaChannel.logger.info("Error committing offsets.", e);
                    KafkaChannel.logger.trace("About to clear offsets map.");
                    this.offsets.clear();
                }
            } catch (Throwable th) {
                KafkaChannel.logger.trace("About to clear offsets map.");
                this.offsets.clear();
                throw th;
            }
        }

        private String getOffsetMapString() {
            StringBuilder sb = new StringBuilder();
            sb.append(KafkaChannel.this.getName()).append(" current offsets map: ");
            for (TopicPartition topicPartition : this.offsets.keySet()) {
                sb.append("p").append(topicPartition.partition()).append("-").append(this.offsets.get(topicPartition).offset()).append(" ");
            }
            return sb.toString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getCommittedOffsetsString() {
            StringBuilder sb = new StringBuilder();
            sb.append(KafkaChannel.this.getName()).append(" committed: ");
            for (TopicPartition topicPartition : this.consumer.assignment()) {
                try {
                    sb.append("[").append(topicPartition).append(",").append(this.consumer.committed(topicPartition).offset()).append("] ");
                } catch (NullPointerException e) {
                    KafkaChannel.logger.debug("Committed {}", topicPartition);
                }
            }
            return sb.toString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void saveOffsets(TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata) {
            this.offsets.put(topicPartition, offsetAndMetadata);
            if (KafkaChannel.logger.isTraceEnabled()) {
                KafkaChannel.logger.trace(getOffsetMapString());
            }
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$KafkaTransaction.class */
    private class KafkaTransaction extends BasicTransactionSemantics {
        private TransactionType type;
        private Optional<ByteArrayOutputStream> tempOutStream;
        private Optional<LinkedList<ProducerRecord<String, byte[]>>> producerRecords;
        private Optional<LinkedList<Event>> events;
        private Optional<SpecificDatumWriter<AvroFlumeEvent>> writer;
        private Optional<SpecificDatumReader<AvroFlumeEvent>> reader;
        private Optional<LinkedList<Future<RecordMetadata>>> kafkaFutures;
        private final String batchUUID;
        private BinaryEncoder encoder;
        private BinaryDecoder decoder;
        private boolean eventTaken;

        private KafkaTransaction() {
            this.type = TransactionType.NONE;
            this.tempOutStream = Optional.absent();
            this.producerRecords = Optional.absent();
            this.events = Optional.absent();
            this.writer = Optional.absent();
            this.reader = Optional.absent();
            this.kafkaFutures = Optional.absent();
            this.batchUUID = UUID.randomUUID().toString();
            this.encoder = null;
            this.decoder = null;
            this.eventTaken = false;
        }

        protected void doBegin() throws InterruptedException {
            KafkaChannel.this.rebalanceFlag.set(false);
        }

        protected void doPut(Event event) throws InterruptedException {
            String str;
            this.type = TransactionType.PUT;
            if (!this.producerRecords.isPresent()) {
                this.producerRecords = Optional.of(new LinkedList());
            }
            String str2 = (String) event.getHeaders().get(KafkaChannelConfiguration.KEY_HEADER);
            Integer num = null;
            try {
                if (KafkaChannel.this.staticPartitionId != null) {
                    num = KafkaChannel.this.staticPartitionId;
                }
                if (KafkaChannel.this.partitionHeader != null && (str = (String) event.getHeaders().get(KafkaChannel.this.partitionHeader)) != null) {
                    num = Integer.valueOf(Integer.parseInt(str));
                }
                if (num != null) {
                    ((LinkedList) this.producerRecords.get()).add(new ProducerRecord((String) KafkaChannel.this.topic.get(), num, str2, serializeValue(event, KafkaChannel.this.parseAsFlumeEvent)));
                } else {
                    ((LinkedList) this.producerRecords.get()).add(new ProducerRecord((String) KafkaChannel.this.topic.get(), str2, serializeValue(event, KafkaChannel.this.parseAsFlumeEvent)));
                }
            } catch (NumberFormatException e) {
                throw new ChannelException("Non integer partition id specified", e);
            } catch (Exception e2) {
                throw new ChannelException("Error while serializing event", e2);
            }
        }

        protected Event doTake() throws InterruptedException {
            Event deserializeValue;
            KafkaChannel.logger.trace("Starting event take");
            this.type = TransactionType.TAKE;
            try {
                if (!((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).uuid.equals(KafkaChannel.this.channelUUID)) {
                    KafkaChannel.logger.info("UUID mismatch, creating new consumer");
                    KafkaChannel.this.decommissionConsumerAndRecords((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get());
                    KafkaChannel.this.consumerAndRecords.remove();
                }
            } catch (Exception e) {
                KafkaChannel.logger.warn("Error while shutting down consumer", e);
            }
            if (!this.events.isPresent()) {
                this.events = Optional.of(new LinkedList());
            }
            if (KafkaChannel.this.rebalanceFlag.get()) {
                KafkaChannel.logger.debug("Returning null event after Consumer rebalance.");
                return null;
            }
            if (((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).failedEvents.isEmpty()) {
                if (KafkaChannel.logger.isTraceEnabled()) {
                    KafkaChannel.logger.trace("Assignment during take: {}", ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).consumer.assignment().toString());
                }
                try {
                    long nanoTime = System.nanoTime();
                    if (!((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).recordIterator.hasNext()) {
                        ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).poll();
                    }
                    if (!((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).recordIterator.hasNext()) {
                        return null;
                    }
                    ConsumerRecord<String, byte[]> next = ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).recordIterator.next();
                    deserializeValue = deserializeValue((byte[]) next.value(), KafkaChannel.this.parseAsFlumeEvent);
                    ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).saveOffsets(new TopicPartition(next.topic(), next.partition()), new OffsetAndMetadata(next.offset() + 1, this.batchUUID));
                    if (next.key() != null) {
                        deserializeValue.getHeaders().put(KafkaChannelConfiguration.KEY_HEADER, next.key());
                    }
                    KafkaChannel.this.counter.addToKafkaEventGetTimer((System.nanoTime() - nanoTime) / 1000000);
                    if (KafkaChannel.logger.isDebugEnabled()) {
                        KafkaChannel.logger.debug("{} processed output from partition {} offset {}", new Object[]{KafkaChannel.this.getName(), Integer.valueOf(next.partition()), Long.valueOf(next.offset())});
                    }
                } catch (Exception e2) {
                    KafkaChannel.logger.warn("Error while getting events from Kafka. This is usually caused by trying to read a non-flume event. Ensure the setting for parseAsFlumeEvent is correct", e2);
                    throw new ChannelException("Error while getting events from Kafka", e2);
                }
            } else {
                deserializeValue = ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).failedEvents.removeFirst();
            }
            this.eventTaken = true;
            ((LinkedList) this.events.get()).add(deserializeValue);
            return deserializeValue;
        }

        protected void doCommit() throws InterruptedException {
            KafkaChannel.logger.trace("Starting commit");
            if (this.type.equals(TransactionType.NONE)) {
                return;
            }
            if (!this.type.equals(TransactionType.PUT)) {
                if (((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).failedEvents.isEmpty() && this.eventTaken) {
                    KafkaChannel.logger.trace("About to commit batch");
                    long nanoTime = System.nanoTime();
                    ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).commitOffsets();
                    KafkaChannel.this.counter.addToKafkaCommitTimer((System.nanoTime() - nanoTime) / 1000000);
                    if (KafkaChannel.logger.isDebugEnabled()) {
                        KafkaChannel.logger.debug(((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).getCommittedOffsetsString());
                    }
                }
                int size = ((LinkedList) this.events.get()).size();
                if (size > 0) {
                    KafkaChannel.this.counter.addToEventTakeSuccessCount(size);
                    ((LinkedList) this.events.get()).clear();
                    return;
                }
                return;
            }
            if (!this.kafkaFutures.isPresent()) {
                this.kafkaFutures = Optional.of(new LinkedList());
            }
            try {
                long size2 = ((LinkedList) this.producerRecords.get()).size();
                long nanoTime2 = System.nanoTime();
                int i = 0;
                Iterator it = ((LinkedList) this.producerRecords.get()).iterator();
                while (it.hasNext()) {
                    i++;
                    ((LinkedList) this.kafkaFutures.get()).add(KafkaChannel.this.producer.send((ProducerRecord) it.next(), new ChannelCallback(i, nanoTime2)));
                }
                KafkaChannel.this.producer.flush();
                Iterator it2 = ((LinkedList) this.kafkaFutures.get()).iterator();
                while (it2.hasNext()) {
                    ((Future) it2.next()).get();
                }
                KafkaChannel.this.counter.addToKafkaEventSendTimer((System.nanoTime() - nanoTime2) / 1000000);
                KafkaChannel.this.counter.addToEventPutSuccessCount(size2);
                ((LinkedList) this.producerRecords.get()).clear();
                ((LinkedList) this.kafkaFutures.get()).clear();
            } catch (Exception e) {
                KafkaChannel.logger.warn("Sending events to Kafka failed", e);
                throw new ChannelException("Commit failed as send to Kafka failed", e);
            }
        }

        protected void doRollback() throws InterruptedException {
            if (this.type.equals(TransactionType.NONE)) {
                return;
            }
            if (this.type.equals(TransactionType.PUT)) {
                ((LinkedList) this.producerRecords.get()).clear();
                ((LinkedList) this.kafkaFutures.get()).clear();
            } else {
                KafkaChannel.this.counter.addToRollbackCounter(((LinkedList) this.events.get()).size());
                ((ConsumerAndRecords) KafkaChannel.this.consumerAndRecords.get()).failedEvents.addAll((Collection) this.events.get());
                ((LinkedList) this.events.get()).clear();
            }
        }

        private byte[] serializeValue(Event event, boolean z) throws IOException {
            byte[] body;
            if (z) {
                if (!this.tempOutStream.isPresent()) {
                    this.tempOutStream = Optional.of(new ByteArrayOutputStream());
                }
                if (!this.writer.isPresent()) {
                    this.writer = Optional.of(new SpecificDatumWriter(AvroFlumeEvent.class));
                }
                ((ByteArrayOutputStream) this.tempOutStream.get()).reset();
                AvroFlumeEvent avroFlumeEvent = new AvroFlumeEvent(KafkaChannel.toCharSeqMap(event.getHeaders()), ByteBuffer.wrap(event.getBody()));
                this.encoder = EncoderFactory.get().directBinaryEncoder((OutputStream) this.tempOutStream.get(), this.encoder);
                ((SpecificDatumWriter) this.writer.get()).write(avroFlumeEvent, this.encoder);
                this.encoder.flush();
                body = ((ByteArrayOutputStream) this.tempOutStream.get()).toByteArray();
            } else {
                body = event.getBody();
            }
            return body;
        }

        private Event deserializeValue(byte[] bArr, boolean z) throws IOException {
            Event withBody;
            if (z) {
                this.decoder = DecoderFactory.get().directBinaryDecoder(new ByteArrayInputStream(bArr), this.decoder);
                if (!this.reader.isPresent()) {
                    this.reader = Optional.of(new SpecificDatumReader(AvroFlumeEvent.class));
                }
                AvroFlumeEvent avroFlumeEvent = (AvroFlumeEvent) ((SpecificDatumReader) this.reader.get()).read((Object) null, this.decoder);
                withBody = EventBuilder.withBody(avroFlumeEvent.getBody().array(), KafkaChannel.toStringMap(avroFlumeEvent.getHeaders()));
            } else {
                withBody = EventBuilder.withBody(bArr, Collections.EMPTY_MAP);
            }
            return withBody;
        }
    }

    /* loaded from: input_file:org/apache/flume/channel/kafka/KafkaChannel$TransactionType.class */
    private enum TransactionType {
        PUT,
        TAKE,
        NONE
    }

    public void start() {
        logger.info("Starting Kafka Channel: {}", getName());
        if (this.migrateZookeeperOffsets && this.zookeeperConnect != null && !this.zookeeperConnect.isEmpty()) {
            migrateOffsets();
        }
        this.producer = new KafkaProducer<>(this.producerProps);
        logger.info("Topic = {}", this.topic.get());
        this.counter.start();
        super.start();
    }

    public void stop() {
        Iterator<ConsumerAndRecords> it = this.consumers.iterator();
        while (it.hasNext()) {
            try {
                decommissionConsumerAndRecords(it.next());
            } catch (Exception e) {
                logger.warn("Error while shutting down consumer.", e);
            }
        }
        this.producer.close();
        this.counter.stop();
        super.stop();
        logger.info("Kafka channel {} stopped.", getName());
    }

    protected BasicTransactionSemantics createTransaction() {
        return new KafkaTransaction();
    }

    public void configure(Context context) {
        translateOldProps(context);
        this.topicStr = context.getString(KafkaChannelConfiguration.TOPIC_CONFIG);
        if (this.topicStr == null || this.topicStr.isEmpty()) {
            this.topicStr = KafkaChannelConfiguration.DEFAULT_TOPIC;
            logger.info("Topic was not specified. Using {} as the topic.", this.topicStr);
        }
        this.topic.set(this.topicStr);
        this.groupId = context.getString("kafka.consumer.group.id");
        if (this.groupId == null || this.groupId.isEmpty()) {
            this.groupId = KafkaChannelConfiguration.DEFAULT_GROUP_ID;
            logger.info("Group ID was not specified. Using {} as the group id.", this.groupId);
        }
        String string = context.getString(KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG);
        if (string == null || string.isEmpty()) {
            throw new ConfigurationException("Bootstrap Servers must be specified");
        }
        setProducerProps(context, string);
        setConsumerProps(context, string);
        this.parseAsFlumeEvent = context.getBoolean(KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT, true).booleanValue();
        this.pollTimeout = context.getLong(KafkaChannelConfiguration.POLL_TIMEOUT, 500L).longValue();
        this.staticPartitionId = context.getInteger(KafkaChannelConfiguration.STATIC_PARTITION_CONF);
        this.partitionHeader = context.getString(KafkaChannelConfiguration.PARTITION_HEADER_NAME);
        this.migrateZookeeperOffsets = context.getBoolean(KafkaChannelConfiguration.MIGRATE_ZOOKEEPER_OFFSETS, true).booleanValue();
        this.zookeeperConnect = context.getString(KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY);
        if (logger.isDebugEnabled() && LogPrivacyUtil.allowLogPrintConfig()) {
            logger.debug("Kafka properties: {}", context);
        }
        if (this.counter == null) {
            this.counter = new KafkaChannelCounter(getName());
        }
    }

    private void translateOldProps(Context context) {
        Boolean bool;
        String string;
        if (!context.containsKey(KafkaChannelConfiguration.TOPIC_CONFIG)) {
            context.put(KafkaChannelConfiguration.TOPIC_CONFIG, context.getString("topic"));
            logger.warn("{} is deprecated. Please use the parameter {}", "topic", KafkaChannelConfiguration.TOPIC_CONFIG);
        }
        if (!context.containsKey(KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG)) {
            String string2 = context.getString(KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY);
            if (string2 == null || string2.isEmpty()) {
                throw new ConfigurationException("Bootstrap Servers must be specified");
            }
            context.put(KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG, string2);
            logger.warn("{} is deprecated. Please use the parameter {}", KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY, KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG);
        }
        if (!context.containsKey("kafka.consumer.group.id") && (string = context.getString(KafkaChannelConfiguration.GROUP_ID_FLUME)) != null && !string.isEmpty()) {
            context.put("kafka.consumer.group.id", string);
            logger.warn("{} is deprecated. Please use the parameter {}", KafkaChannelConfiguration.GROUP_ID_FLUME, "kafka.consumer.group.id");
        }
        if (context.containsKey("kafka.consumer.auto.offset.reset") || (bool = context.getBoolean(KafkaChannelConfiguration.READ_SMALLEST_OFFSET)) == null) {
            return;
        }
        context.put("kafka.consumer.auto.offset.reset", bool.booleanValue() ? KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET : "latest");
        logger.warn("{} is deprecated. Please use the parameter {}", KafkaChannelConfiguration.READ_SMALLEST_OFFSET, "kafka.consumer.auto.offset.reset");
    }

    private void setProducerProps(Context context, String str) {
        this.producerProps.clear();
        this.producerProps.put("acks", KafkaChannelConfiguration.DEFAULT_ACKS);
        this.producerProps.put("key.serializer", KafkaChannelConfiguration.DEFAULT_KEY_SERIALIZER);
        this.producerProps.put("value.serializer", KafkaChannelConfiguration.DEFAULT_VALUE_SERIAIZER);
        this.producerProps.putAll(context.getSubProperties(KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX));
        this.producerProps.put("bootstrap.servers", str);
    }

    protected Properties getProducerProps() {
        return this.producerProps;
    }

    private void setConsumerProps(Context context, String str) {
        this.consumerProps.clear();
        this.consumerProps.put("key.deserializer", KafkaChannelConfiguration.DEFAULT_KEY_DESERIALIZER);
        this.consumerProps.put("value.deserializer", KafkaChannelConfiguration.DEFAULT_VALUE_DESERIAIZER);
        this.consumerProps.put("auto.offset.reset", KafkaChannelConfiguration.DEFAULT_AUTO_OFFSET_RESET);
        this.consumerProps.putAll(context.getSubProperties(KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX));
        this.consumerProps.put("bootstrap.servers", str);
        this.consumerProps.put("group.id", this.groupId);
        this.consumerProps.put("enable.auto.commit", false);
    }

    protected Properties getConsumerProps() {
        return this.consumerProps;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized ConsumerAndRecords createConsumerAndRecords() {
        try {
            ConsumerAndRecords consumerAndRecords = new ConsumerAndRecords(new KafkaConsumer(this.consumerProps), this.channelUUID);
            logger.info("Created new consumer to connect to Kafka");
            consumerAndRecords.consumer.subscribe(Arrays.asList(this.topic.get()), new ChannelRebalanceListener(this.rebalanceFlag));
            consumerAndRecords.offsets = new HashMap();
            this.consumers.add(consumerAndRecords);
            return consumerAndRecords;
        } catch (Exception e) {
            throw new FlumeException("Unable to connect to Kafka", e);
        }
    }

    private void migrateOffsets() {
        ZkUtils apply = ZkUtils.apply(this.zookeeperConnect, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        KafkaConsumer<String, byte[]> kafkaConsumer = new KafkaConsumer<>(this.consumerProps);
        try {
            Map<TopicPartition, OffsetAndMetadata> kafkaOffsets = getKafkaOffsets(kafkaConsumer);
            if (!kafkaOffsets.isEmpty()) {
                logger.info("Found Kafka offsets for topic {}. Will not migrate from zookeeper", this.topicStr);
                logger.debug("Offsets found: {}", kafkaOffsets);
                apply.close();
                kafkaConsumer.close();
                return;
            }
            logger.info("No Kafka offsets found. Migrating zookeeper offsets");
            Map<TopicPartition, OffsetAndMetadata> zookeeperOffsets = getZookeeperOffsets(apply);
            if (zookeeperOffsets.isEmpty()) {
                logger.warn("No offsets to migrate found in Zookeeper");
                apply.close();
                kafkaConsumer.close();
                return;
            }
            logger.info("Committing Zookeeper offsets to Kafka");
            logger.debug("Offsets to commit: {}", zookeeperOffsets);
            kafkaConsumer.commitSync(zookeeperOffsets);
            Map<TopicPartition, OffsetAndMetadata> kafkaOffsets2 = getKafkaOffsets(kafkaConsumer);
            logger.debug("Offsets committed: {}", kafkaOffsets2);
            if (!kafkaOffsets2.keySet().containsAll(zookeeperOffsets.keySet())) {
                throw new FlumeException("Offsets could not be committed");
            }
        } finally {
            apply.close();
            kafkaConsumer.close();
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> getKafkaOffsets(KafkaConsumer<String, byte[]> kafkaConsumer) {
        HashMap hashMap = new HashMap();
        Iterator it = kafkaConsumer.partitionsFor(this.topicStr).iterator();
        while (it.hasNext()) {
            TopicPartition topicPartition = new TopicPartition(this.topicStr, ((PartitionInfo) it.next()).partition());
            OffsetAndMetadata committed = kafkaConsumer.committed(topicPartition);
            if (committed != null) {
                hashMap.put(topicPartition, committed);
            }
        }
        return hashMap;
    }

    private Map<TopicPartition, OffsetAndMetadata> getZookeeperOffsets(ZkUtils zkUtils) {
        HashMap hashMap = new HashMap();
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(this.groupId, this.topicStr);
        for (String str : (List) JavaConverters.asJavaListConverter(zkUtils.getChildrenParentMayNotExist(zKGroupTopicDirs.consumerOffsetDir())).asJava()) {
            TopicPartition topicPartition = new TopicPartition(this.topicStr, Integer.valueOf(str).intValue());
            Option option = (Option) zkUtils.readDataMaybeNull(zKGroupTopicDirs.consumerOffsetDir() + "/" + str)._1();
            if (option.isDefined()) {
                hashMap.put(topicPartition, new OffsetAndMetadata(Long.valueOf((String) option.get()).longValue()));
            }
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void decommissionConsumerAndRecords(ConsumerAndRecords consumerAndRecords) {
        consumerAndRecords.consumer.wakeup();
        consumerAndRecords.consumer.close();
    }

    @VisibleForTesting
    void registerThread() {
        try {
            this.consumerAndRecords.get();
        } catch (Exception e) {
            logger.error(e.getMessage());
            e.printStackTrace();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<CharSequence, CharSequence> toCharSeqMap(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, String> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), entry.getValue());
        }
        return hashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, String> toStringMap(Map<CharSequence, CharSequence> map) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<CharSequence, CharSequence> entry : map.entrySet()) {
            hashMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return hashMap;
    }
}
