package kafka.javaapi.consumer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import kafka.consumer.KafkaStream;
import kafka.consumer.OriginalConsumerConfig;
import kafka.consumer.TopicFilter;
import kafka.serializer.Decoder;
import org.apache.kafka.clients.producer.PulsarClientKafkaConfig;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kafka/javaapi/consumer/ConsumerConnector.class */
public class ConsumerConnector {
    private static final Logger log = LoggerFactory.getLogger(ConsumerConnector.class);
    private final PulsarClient client;
    private final boolean isAutoCommit;
    private final ConsumerBuilder<byte[]> consumerBuilder;
    private String clientId;
    private String groupId;
    private final Set<KafkaStream> topicStreams;
    private SubscriptionInitialPosition strategy;
    private final ScheduledExecutorService executor;

    public ConsumerConnector(OriginalConsumerConfig originalConsumerConfig) {
        this.strategy = null;
        Preconditions.checkNotNull(originalConsumerConfig, "ConsumerConfig can't be null");
        this.clientId = originalConsumerConfig.clientId();
        this.groupId = originalConsumerConfig.groupId();
        this.isAutoCommit = originalConsumerConfig.autoCommitEnable();
        if ("largest".equalsIgnoreCase(originalConsumerConfig.autoOffsetReset())) {
            this.strategy = SubscriptionInitialPosition.Latest;
        } else if ("smallest".equalsIgnoreCase(originalConsumerConfig.autoOffsetReset())) {
            this.strategy = SubscriptionInitialPosition.Earliest;
        }
        String str = !originalConsumerConfig.consumerId().isEmpty() ? (String) originalConsumerConfig.consumerId().get() : null;
        int queuedMaxMessages = originalConsumerConfig.queuedMaxMessages();
        String zkConnect = originalConsumerConfig.zkConnect();
        Properties properties = (originalConsumerConfig.props() == null || originalConsumerConfig.props().props() == null) ? new Properties() : originalConsumerConfig.props().props();
        try {
            this.client = PulsarClientKafkaConfig.getClientBuilder(properties).serviceUrl(zkConnect).build();
            this.topicStreams = Sets.newConcurrentHashSet();
            this.consumerBuilder = this.client.newConsumer();
            this.consumerBuilder.subscriptionName(this.groupId);
            if (properties.containsKey("queued.max.message.chunks") && originalConsumerConfig.queuedMaxMessages() > 0) {
                this.consumerBuilder.receiverQueueSize(queuedMaxMessages);
            }
            if (str != null) {
                this.consumerBuilder.consumerName(str);
            }
            if (properties.containsKey("auto.commit.interval.ms") && originalConsumerConfig.autoCommitIntervalMs() > 0) {
                this.consumerBuilder.acknowledgmentGroupTime(originalConsumerConfig.autoCommitIntervalMs(), TimeUnit.MILLISECONDS);
            }
            this.executor = Executors.newScheduledThreadPool(1, new DefaultThreadFactory("pulsar-kafka"));
        } catch (PulsarClientException e) {
            throw new IllegalArgumentException("Failed to create pulsar-client using url = " + zkConnect + ", properties = " + properties, e);
        }
    }

    public <K, V> Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> map) {
        return createMessageStreamsByFilter((TopicFilter) null, map, (Decoder) null, (Decoder) null);
    }

    public <K, V> Map<String, List<KafkaStream<K, V>>> createMessageStreams(Map<String, Integer> map, Decoder<K> decoder, Decoder<V> decoder2) {
        return createMessageStreamsByFilter((TopicFilter) null, map, decoder, decoder2);
    }

    public <K, V> Map<String, List<KafkaStream<K, V>>> createMessageStreamsByFilter(TopicFilter topicFilter, Map<String, Integer> map, Decoder<K> decoder, Decoder<V> decoder2) {
        HashMap newHashMap = Maps.newHashMap();
        map.forEach((str, num) -> {
            try {
                Consumer<byte[]> subscribe = this.consumerBuilder.topic(new String[]{str}).subscribe();
                resetOffsets(subscribe, this.strategy);
                log.info("Creating stream for {}-{} with config {}", new Object[]{str, this.groupId, this.consumerBuilder.toString()});
                for (int i = 0; i < num.intValue(); i++) {
                    KafkaStream kafkaStream = new KafkaStream(decoder, decoder2, subscribe, this.isAutoCommit, this.clientId);
                    ((List) newHashMap.computeIfAbsent(str, str -> {
                        return Lists.newArrayList();
                    })).add(kafkaStream);
                    this.topicStreams.add(kafkaStream);
                }
            } catch (PulsarClientException e) {
                log.error("Failed to subscribe on topic {} with group-id {}, {}", new Object[]{str, this.groupId, e.getMessage(), e});
                throw new RuntimeException("Failed to subscribe on topic " + str, e);
            }
        });
        return newHashMap;
    }

    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter) {
        throw new UnsupportedOperationException("method not supported");
    }

    public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int i) {
        throw new UnsupportedOperationException("method not supported");
    }

    public <K, V> List<KafkaStream<K, V>> createMessageStreamsByFilter(TopicFilter topicFilter, int i, Decoder<K> decoder, Decoder<V> decoder2) {
        throw new UnsupportedOperationException("method not supported");
    }

    public List<CompletableFuture<Void>> commitOffsetsAsync() {
        return (List) this.topicStreams.stream().map(kafkaStream -> {
            return kafkaStream.commitOffsets();
        }).collect(Collectors.toList());
    }

    public void commitOffsets() {
        commitOffsetsAsync();
    }

    public void commitOffsets(boolean z) {
        FutureUtil.waitForAll(commitOffsetsAsync()).handle((r8, th) -> {
            if (th == null) {
                return null;
            }
            if (log.isDebugEnabled()) {
                log.debug("Failed to commit offset {}, retrying {}", th.getMessage(), Boolean.valueOf(z));
            }
            if (!z) {
                return null;
            }
            this.executor.schedule(() -> {
                commitOffsets(z);
            }, 30L, TimeUnit.SECONDS);
            return null;
        });
    }

    public void shutdown() {
        if (this.executor != null) {
            this.executor.shutdown();
        }
        if (this.topicStreams != null) {
            this.topicStreams.forEach(kafkaStream -> {
                try {
                    kafkaStream.close();
                } catch (Exception e) {
                    log.warn("Failed to close stream {}, {}", kafkaStream, e.getMessage());
                }
            });
        }
        try {
            this.client.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to close client {}", e.getMessage());
        }
    }

    private void resetOffsets(Consumer<byte[]> consumer, SubscriptionInitialPosition subscriptionInitialPosition) {
        if (subscriptionInitialPosition == null) {
            return;
        }
        log.info("Resetting partition {} for group-id {} and seeking to {} position", new Object[]{consumer.getTopic(), consumer.getSubscription(), subscriptionInitialPosition});
        try {
            if (subscriptionInitialPosition == SubscriptionInitialPosition.Earliest) {
                consumer.seek(MessageId.earliest);
            } else {
                consumer.seek(MessageId.latest);
            }
        } catch (PulsarClientException e) {
            log.warn("Failed to reset offset for consumer {} to {}, {}", new Object[]{consumer.getTopic(), subscriptionInitialPosition, e.getMessage(), e});
        }
    }

    @VisibleForTesting
    public ConsumerBuilder<byte[]> getConsumerBuilder() {
        return this.consumerBuilder;
    }
}
