package org.apache.kafka.connect.mirror;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/connect/mirror/MirrorCheckpointConnector.class */
public class MirrorCheckpointConnector extends SourceConnector {
    private static final Logger log = LoggerFactory.getLogger(MirrorCheckpointConnector.class);
    private Scheduler scheduler;
    private MirrorConnectorConfig config;
    private GroupFilter groupFilter;
    private AdminClient sourceAdminClient;
    private SourceAndTarget sourceAndTarget;
    private String connectorName;
    private List<String> knownConsumerGroups = Collections.emptyList();

    public void start(Map<String, String> map) {
        this.config = new MirrorConnectorConfig(map);
        if (this.config.enabled()) {
            this.connectorName = this.config.connectorName();
            this.sourceAndTarget = new SourceAndTarget(this.config.sourceClusterAlias(), this.config.targetClusterAlias());
            this.groupFilter = this.config.groupFilter();
            this.sourceAdminClient = AdminClient.create(this.config.sourceAdminConfig());
            this.scheduler = new Scheduler(MirrorCheckpointConnector.class, this.config.adminTimeout());
            this.scheduler.execute(this::createInternalTopics, "creating internal topics");
            this.scheduler.execute(this::loadInitialConsumerGroups, "loading initial consumer groups");
            this.scheduler.scheduleRepeatingDelayed(this::refreshConsumerGroups, this.config.refreshGroupsInterval(), "refreshing consumer groups");
            log.info("Started {} with {} consumer groups.", this.connectorName, Integer.valueOf(this.knownConsumerGroups.size()));
            log.debug("Started {} with consumer groups: {}", this.connectorName, this.knownConsumerGroups);
        }
    }

    public void stop() {
        if (this.config.enabled()) {
            Utils.closeQuietly(this.scheduler, "scheduler");
            Utils.closeQuietly(this.groupFilter, "group filter");
            Utils.closeQuietly(this.sourceAdminClient, "source admin client");
        }
    }

    public Class<? extends Task> taskClass() {
        return MirrorCheckpointTask.class;
    }

    public List<Map<String, String>> taskConfigs(int i) {
        if (!this.config.enabled() || this.knownConsumerGroups.isEmpty()) {
            return Collections.emptyList();
        }
        Stream stream = ConnectorUtils.groupPartitions(this.knownConsumerGroups, Math.min(i, this.knownConsumerGroups.size())).stream();
        MirrorConnectorConfig mirrorConnectorConfig = this.config;
        mirrorConnectorConfig.getClass();
        return (List) stream.map(mirrorConnectorConfig::taskConfigForConsumerGroups).collect(Collectors.toList());
    }

    public ConfigDef config() {
        return MirrorConnectorConfig.CONNECTOR_CONFIG_DEF;
    }

    public String version() {
        return "1";
    }

    private void refreshConsumerGroups() throws InterruptedException, ExecutionException {
        List<String> findConsumerGroups = findConsumerGroups();
        HashSet hashSet = new HashSet();
        hashSet.addAll(findConsumerGroups);
        hashSet.removeAll(this.knownConsumerGroups);
        HashSet hashSet2 = new HashSet();
        hashSet2.addAll(this.knownConsumerGroups);
        hashSet2.removeAll(findConsumerGroups);
        if (hashSet.isEmpty() && hashSet2.isEmpty()) {
            return;
        }
        log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.", new Object[]{Integer.valueOf(findConsumerGroups.size()), this.sourceAndTarget, Integer.valueOf(hashSet.size()), Integer.valueOf(hashSet2.size()), Integer.valueOf(this.knownConsumerGroups.size())});
        log.debug("Found new consumer groups: {}", hashSet);
        this.knownConsumerGroups = findConsumerGroups;
        this.context.requestTaskReconfiguration();
    }

    private void loadInitialConsumerGroups() throws InterruptedException, ExecutionException {
        this.knownConsumerGroups = findConsumerGroups();
    }

    private List<String> findConsumerGroups() throws InterruptedException, ExecutionException {
        return (List) listConsumerGroups().stream().filter(consumerGroupListing -> {
            return !consumerGroupListing.isSimpleConsumerGroup();
        }).map(consumerGroupListing2 -> {
            return consumerGroupListing2.groupId();
        }).filter(this::shouldReplicate).collect(Collectors.toList());
    }

    private Collection<ConsumerGroupListing> listConsumerGroups() throws InterruptedException, ExecutionException {
        return (Collection) this.sourceAdminClient.listConsumerGroups().valid().get();
    }

    private void createInternalTopics() {
        MirrorUtils.createSinglePartitionCompactedTopic(this.config.checkpointsTopic(), this.config.checkpointsTopicReplicationFactor(), this.config.targetAdminConfig());
    }

    boolean shouldReplicate(String str) {
        return this.groupFilter.shouldReplicateGroup(str);
    }
}
