package org.apache.kafka.streams.processor.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskAssignmentException;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.class */
public class StreamsPartitionAssignor implements ConsumerPartitionAssignor, Configurable {
    /** Logger for this assignor; created in {@link #configure(Map)} from the configured log prefix. */
    private Logger log;
    /** Log prefix read from the assignor configuration in {@link #configure(Map)}. */
    private String logPrefix;
    /**
     * Orders partitions by topic name first and by partition number second.
     *
     * <p>Method references are used on purpose: they pin the element type to {@link TopicPartition}.
     * An implicitly typed lambda in the receiver of a chained call (here: the argument of
     * {@code comparing(...)} followed by {@code .thenComparingInt(...)}) gets no target type, so its
     * parameter would be inferred as {@code Object} and {@code topic()} would not resolve.
     */
    protected static final Comparator<TopicPartition> PARTITION_COMPARATOR =
        Comparator.comparing(TopicPartition::topic).thenComparingInt(TopicPartition::partition);
    /** User end point ("host:port") read from the assignor configuration; sent with every subscription. */
    private String userEndPoint;
    /** Number of standby replicas handed to the task assignor. */
    private int numStandbyReplicas;
    /** Task manager read from the assignor configuration. */
    private TaskManager taskManager;
    /** Groups source topic partitions into tasks during {@code assign}. */
    private PartitionGrouper partitionGrouper;
    /** Assignment error code holder read from the assignor configuration. */
    private AtomicInteger assignmentErrorCode;
    /**
     * Metadata version used when encoding subscriptions. Starts at 6 and may be replaced in
     * {@link #configure(Map)} and in {@code maybeUpdateSubscriptionVersion}.
     */
    protected int usedSubscriptionMetadataVersion = 6;
    /** Internal topic manager read from the assignor configuration. */
    private InternalTopicManager internalTopicManager;
    /** Co-partitioning enforcer read from the assignor configuration. */
    private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
    /** Rebalance protocol read from the assignor configuration. */
    private ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor$AssignedPartition.class */
    /**
     * Pairs a topic partition with the task it belongs to.
     *
     * <p>Ordering, equality and hashing look at the partition only; the task id does not take part.
     */
    public static class AssignedPartition implements Comparable<AssignedPartition> {
        private final TaskId taskId;
        private final TopicPartition partition;

        AssignedPartition(TaskId taskId, TopicPartition topicPartition) {
            this.taskId = taskId;
            this.partition = topicPartition;
        }

        /** Compares the two partitions with {@code PARTITION_COMPARATOR}. */
        @Override
        public int compareTo(AssignedPartition assignedPartition) {
            return PARTITION_COMPARATOR.compare(partition, assignedPartition.partition);
        }

        /** Two instances are equal when their partitions compare as equal. */
        @Override
        public boolean equals(Object obj) {
            if (!(obj instanceof AssignedPartition)) {
                return false;
            }
            final AssignedPartition other = (AssignedPartition) obj;
            return compareTo(other) == 0;
        }

        /** Hashes the partition only, to stay consistent with {@link #equals(Object)}. */
        @Override
        public int hashCode() {
            return partition.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor$ClientMetadata.class */
    /**
     * Per-client bookkeeping collected while decoding the group's subscriptions: the advertised
     * host (if any), the ids of the client's consumers, and the client's assignment state.
     */
    public static class ClientMetadata {
        private final HostInfo hostInfo;
        private final Set<String> consumers;
        private final ClientState state;

        /**
         * @param str the advertised end point in {@code host:port} form, or {@code null} if none
         * @throws ConfigException if a non-null end point has no parsable host or port
         */
        ClientMetadata(String str) {
            if (str == null) {
                hostInfo = null;
            } else {
                final String host = Utils.getHost(str);
                final Integer port = Utils.getPort(str);
                if (host == null || port == null) {
                    throw new ConfigException(String.format("Error parsing host address %s. Expected format host:port.", str));
                }
                hostInfo = new HostInfo(host, port);
            }
            consumers = new HashSet<>();
            state = new ClientState();
        }

        /** Registers a consumer: records its id, bumps the capacity and records its owned partitions. */
        void addConsumer(String str, List<TopicPartition> list) {
            consumers.add(str);
            state.incrementCapacity();
            state.addOwnedPartitions(list, str);
        }

        /** Copies the previous active and standby tasks of a subscription into the client state. */
        void addPreviousTasks(SubscriptionInfo subscriptionInfo) {
            state.addPreviousActiveTasks(subscriptionInfo.prevTasks());
            state.addPreviousStandbyTasks(subscriptionInfo.standbyTasks());
        }

        @Override
        public String toString() {
            return "ClientMetadata{hostInfo=" + hostInfo
                + ", consumers=" + consumers
                + ", state=" + state
                + '}';
        }
    }

    /** @return the user end point stored by {@code configure}; {@code null} until it has been set */
    protected String userEndPoint() {
        return userEndPoint;
    }

    /**
     * @return the task manager stored by {@code configure}; {@code null} until it has been set
     */
    // NOTE(review): the method name is misspelled ("taskManger"); it is kept because callers may use it.
    protected TaskManager taskManger() {
        return taskManager;
    }

    /**
     * Initialises every field of this assignor from the given configuration map by way of an
     * {@link AssignorConfiguration}.
     *
     * @param map the raw configuration entries
     */
    @Override // org.apache.kafka.common.Configurable
    public void configure(Map<String, ?> map) {
        final AssignorConfiguration config = new AssignorConfiguration(map);

        // The logger must exist before anything else may want to log.
        logPrefix = config.logPrefix();
        log = new LogContext(logPrefix).logger(getClass());

        // The current version is passed in and may be replaced by the configuration.
        usedSubscriptionMetadataVersion = config.configuredMetadataVersion(usedSubscriptionMetadataVersion);

        taskManager = config.getTaskManager();
        assignmentErrorCode = config.getAssignmentErrorCode(map);
        numStandbyReplicas = config.getNumStandbyReplicas();
        partitionGrouper = config.getPartitionGrouper();
        userEndPoint = config.getUserEndPoint();
        internalTopicManager = config.getInternalTopicManager();
        copartitionedTopicsEnforcer = config.getCopartitionedTopicsEnforcer();
        rebalanceProtocol = config.rebalanceProtocol();
    }

    /**
     * @return the name of this assignor, taken from
     *         {@code StreamsMetricsImpl.GROUP_PREFIX_WO_DELIMITER}
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    public String name() {
        return StreamsMetricsImpl.GROUP_PREFIX_WO_DELIMITER;
    }

    /**
     * Lists the rebalance protocols this assignor supports.
     *
     * @return a new mutable list that always contains {@code EAGER}, followed by
     *         {@code COOPERATIVE} when that is the configured protocol
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    public List<ConsumerPartitionAssignor.RebalanceProtocol> supportedProtocols() {
        final List<ConsumerPartitionAssignor.RebalanceProtocol> protocols = new ArrayList<>();
        protocols.add(ConsumerPartitionAssignor.RebalanceProtocol.EAGER);
        final boolean cooperativeConfigured =
            rebalanceProtocol == ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE;
        if (cooperativeConfigured) {
            protocols.add(ConsumerPartitionAssignor.RebalanceProtocol.COOPERATIVE);
        }
        return protocols;
    }

    /**
     * Encodes this member's subscription user data.
     *
     * <p>The cached task ids are handed to {@code prepareForSubscription}, which removes the active
     * tasks from that set in place; the trimmed set is then encoded next to the returned task set.
     *
     * @param set the topics of the subscription
     * @return the encoded {@link SubscriptionInfo}
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    public ByteBuffer subscriptionUserData(Set<String> set) {
        final Set<TaskId> standbyTasks = taskManager.cachedTasksIds();
        // Arguments are evaluated left to right, so the set is already trimmed when it is passed.
        final SubscriptionInfo subscription = new SubscriptionInfo(
            usedSubscriptionMetadataVersion,
            6,
            taskManager.processId(),
            prepareForSubscription(taskManager, set, standbyTasks, rebalanceProtocol),
            standbyTasks,
            userEndPoint);
        return subscription.encode();
    }

    /**
     * Determines the task set to report with a subscription and prepares the task manager for the
     * upcoming rebalance.
     *
     * <p>For {@code EAGER} the previously running task ids are returned and removed from
     * {@code set2}. For {@code COOPERATIVE} an empty set is returned and the task manager's active
     * task ids are removed from {@code set2}. In both cases the task manager's subscriptions are
     * updated from {@code set} and it is flagged as rebalancing.
     *
     * @param taskManager       the task manager to query and update
     * @param set               the subscribed topics
     * @param set2              the cached task ids; modified in place
     * @param rebalanceProtocol the protocol in use
     * @return the task ids to report as described above
     * @throws IllegalStateException if the protocol is neither {@code EAGER} nor {@code COOPERATIVE}
     */
    protected static Set<TaskId> prepareForSubscription(TaskManager taskManager, Set<String> set, Set<TaskId> set2, ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol) {
        final Set<TaskId> reportedTasks;
        final Collection<?> excludedFromCached;
        switch (rebalanceProtocol) {
            case EAGER:
                reportedTasks = taskManager.previousRunningTaskIds();
                excludedFromCached = reportedTasks;
                break;
            case COOPERATIVE:
                reportedTasks = Collections.emptySet();
                excludedFromCached = taskManager.activeTaskIds();
                break;
            default:
                throw new IllegalStateException("Streams partition assignor's rebalance protocol is unknown");
        }
        set2.removeAll(excludedFromCached);

        taskManager.updateSubscriptionsFromMetadata(set);
        taskManager.setRebalanceInProgress(true);
        return reportedTasks;
    }

    /**
     * Builds an assignment that gives every known consumer no partitions and the given error code.
     *
     * @param map the client metadata by process id
     * @param str the name that is reported as unknown in the error log
     * @param i   the error code to encode into every assignment
     * @return an assignment per consumer id, each empty apart from the error code
     */
    private Map<String, ConsumerPartitionAssignor.Assignment> errorAssignment(Map<UUID, ClientMetadata> map, String str, int i) {
        log.error("{} is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.", str);

        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<>();
        for (final ClientMetadata clientMetadata : map.values()) {
            for (final String consumerId : clientMetadata.consumers) {
                // A fresh info is encoded per consumer so that no buffer is shared between members.
                final AssignmentInfo emptyInfo = new AssignmentInfo(
                    6,
                    Collections.emptyList(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    Collections.emptyMap(),
                    i);
                assignment.put(
                    consumerId,
                    new ConsumerPartitionAssignor.Assignment(Collections.emptyList(), emptyInfo.encode()));
            }
        }
        return assignment;
    }

    /**
     * Computes the assignment for the whole consumer group.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>decode every member subscription into per-client metadata, tracking the minimum
     *       received/supported metadata versions and any "future" (newer than 6) version;</li>
     *   <li>return an error assignment if a non-repartition source topic is missing from the cluster;</li>
     *   <li>derive the partition count of every repartition topic, enforce co-partitioning and
     *       prepare those topics;</li>
     *   <li>group source partitions into tasks and log partitions that are assigned twice or not at all;</li>
     *   <li>size and prepare the state changelog topics;</li>
     *   <li>assign tasks to clients with a {@link StickyTaskAssignor};</li>
     *   <li>build the host-to-partitions mappings and the per-consumer assignments.</li>
     * </ol>
     *
     * @param cluster           the cluster metadata as seen by the group leader
     * @param groupSubscription the subscriptions of all group members
     * @return the assignment for every consumer in the group
     * @throws IllegalStateException if a future-version subscription is received together with a
     *                               subscription of version lower than 3, or if no partition count
     *                               can be found for a non-internal source topic
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    public ConsumerPartitionAssignor.GroupAssignment assign(Cluster cluster, ConsumerPartitionAssignor.GroupSubscription groupSubscription) {
        // z: whether this rebalance is a version-probing rebalance.
        boolean z;
        // z2: whether another pass over the repartition topics is needed.
        boolean z2;
        UUID processId;
        Map<String, ConsumerPartitionAssignor.Subscription> groupSubscription2 = groupSubscription.groupSubscription();
        // hashMap: client metadata keyed by process id; hashSet: every partition owned by any member.
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        // Placeholder client that collects all consumers sending a version newer than 6.
        UUID randomUUID = UUID.randomUUID();
        hashMap.put(randomUUID, new ClientMetadata(null));
        // i: minimum received version, i2: minimum supported version, i3: future version seen (-1 = none).
        int i = 6;
        int i2 = 6;
        int i3 = -1;
        for (Map.Entry<String, ConsumerPartitionAssignor.Subscription> entry : groupSubscription2.entrySet()) {
            String key = entry.getKey();
            ConsumerPartitionAssignor.Subscription value = entry.getValue();
            SubscriptionInfo decode = SubscriptionInfo.decode(value.userData());
            int version = decode.version();
            // NOTE(review): both update helpers are defined outside this view; presumably they return the smaller value.
            i = updateMinReceivedVersion(version, i);
            i2 = updateMinSupportedVersion(decode.latestSupportedVersion(), i2);
            if (version > 6) {
                i3 = version;
                processId = randomUUID;
            } else {
                processId = decode.processId();
            }
            // The placeholder entry always exists, so a new entry is only created for regular process ids.
            ClientMetadata clientMetadata = hashMap.get(processId);
            if (clientMetadata == null) {
                clientMetadata = new ClientMetadata(decode.userEndPoint());
                hashMap.put(decode.processId(), clientMetadata);
            }
            clientMetadata.addConsumer(key, value.ownedPartitions());
            hashSet.addAll(value.ownedPartitions());
            clientMetadata.addPreviousTasks(decode);
        }
        if (i3 == -1) {
            // No future version was seen: drop the unused placeholder client.
            z = false;
            hashMap.remove(randomUUID);
        } else {
            if (i < 3) {
                throw new IllegalStateException("Received a future (version probing) subscription (version: " + i3 + ") and an incompatible pre Kafka 2.0 subscription (version: " + i + ") at the same time.");
            }
            z = true;
            this.log.info("Received a future (version probing) subscription (version: {}). Sending assignment back (with supported version {}).", Integer.valueOf(i3), Integer.valueOf(i2));
        }
        if (i < 6) {
            this.log.info("Downgrade metadata to version {}. Latest supported version is {}.", (Object) Integer.valueOf(i), (Object) 6);
        }
        if (i2 < 6) {
            this.log.info("Downgrade latest supported metadata to version {}. Latest supported version is {}.", (Object) Integer.valueOf(i2), (Object) 6);
        }
        this.log.debug("Constructed client metadata {} from the member subscriptions.", hashMap);
        // map: topic groups of the topology; hashMap2: repartition topic configs keyed by topic name.
        Map<Integer, InternalTopologyBuilder.TopicsInfo> map = this.taskManager.builder().topicGroups();
        HashMap hashMap2 = new HashMap();
        for (InternalTopologyBuilder.TopicsInfo topicsInfo : map.values()) {
            for (String str : topicsInfo.sourceTopics) {
                // A source topic that is not a repartition topic must already exist in the cluster.
                if (!topicsInfo.repartitionSourceTopics.keySet().contains(str) && !cluster.topics().contains(str)) {
                    this.log.error("Missing source topic {} during assignment. Returning error {}.", str, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.name());
                    return new ConsumerPartitionAssignor.GroupAssignment(errorAssignment(hashMap, str, AssignorError.INCOMPLETE_SOURCE_TOPIC_METADATA.code()));
                }
            }
            for (InternalTopicConfig internalTopicConfig : topicsInfo.repartitionSourceTopics.values()) {
                hashMap2.put(internalTopicConfig.name(), internalTopicConfig);
            }
        }
        // Repeat until every repartition topic has a partition count; a count may depend on another
        // repartition topic that is only resolved in a later pass.
        do {
            z2 = false;
            Iterator<InternalTopologyBuilder.TopicsInfo> it = map.values().iterator();
            while (it.hasNext()) {
                for (String str2 : it.next().repartitionSourceTopics.keySet()) {
                    // num: largest partition count found among the upstream source topics so far.
                    Integer num = null;
                    if (!hashMap2.get(str2).numberOfPartitions().isPresent()) {
                        for (InternalTopologyBuilder.TopicsInfo topicsInfo2 : map.values()) {
                            // Only topic groups that write to this repartition topic are relevant.
                            if (topicsInfo2.sinkTopics.contains(str2)) {
                                for (String str3 : topicsInfo2.sourceTopics) {
                                    Integer num2 = null;
                                    if (!hashMap2.containsKey(str3)) {
                                        // Not a repartition topic: take the count from the cluster metadata.
                                        Integer partitionCountForTopic = cluster.partitionCountForTopic(str3);
                                        if (partitionCountForTopic == null) {
                                            throw new IllegalStateException("No partition count found for source topic " + str3 + ", but it should have been.");
                                        }
                                        num2 = partitionCountForTopic;
                                    } else if (hashMap2.get(str3).numberOfPartitions().isPresent()) {
                                        num2 = hashMap2.get(str3).numberOfPartitions().get();
                                    }
                                    if (num2 != null && (num == null || num2.intValue() > num.intValue())) {
                                        num = num2;
                                    }
                                }
                            }
                        }
                        if (num == null) {
                            z2 = true;
                        } else {
                            hashMap2.get(str2).setNumberOfPartitions(num.intValue());
                        }
                    }
                }
            }
        } while (z2);
        ensureCopartitioning(this.taskManager.builder().copartitionGroups(), hashMap2, cluster);
        prepareTopic(hashMap2);
        // hashMap3: placeholder partition infos for every partition of every repartition topic.
        HashMap hashMap3 = new HashMap();
        for (Map.Entry<String, InternalTopicConfig> entry2 : hashMap2.entrySet()) {
            String key2 = entry2.getKey();
            // A missing count becomes -1, so the loop below adds nothing for that topic.
            int intValue = entry2.getValue().numberOfPartitions().orElse(-1).intValue();
            for (int i4 = 0; i4 < intValue; i4++) {
                hashMap3.put(new TopicPartition(key2, i4), new PartitionInfo(key2, i4, null, new Node[0], new Node[0]));
            }
        }
        // Cluster view that also contains the repartition topic partitions.
        Cluster withPartitions = cluster.withPartitions(hashMap3);
        this.taskManager.setClusterMetadata(withPartitions);
        this.log.debug("Created repartition topics {} from the parsed topology.", hashMap3.values());
        // hashSet2: all source topics; hashMap4: source topics per topic group id.
        HashSet<String> hashSet2 = new HashSet();
        HashMap hashMap4 = new HashMap();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry3 : map.entrySet()) {
            hashSet2.addAll(entry3.getValue().sourceTopics);
            hashMap4.put(entry3.getKey(), entry3.getValue().sourceTopics);
        }
        Map<TaskId, Set<TopicPartition>> partitionGroups = this.partitionGrouper.partitionGroups(hashMap4, withPartitions);
        // hashMap5: task per partition; hashSet3: partitions seen so far; hashMap6: tasks per topic group id.
        HashMap hashMap5 = new HashMap();
        HashSet hashSet3 = new HashSet();
        HashMap hashMap6 = new HashMap();
        for (Map.Entry<TaskId, Set<TopicPartition>> entry4 : partitionGroups.entrySet()) {
            TaskId key3 = entry4.getKey();
            Set<TopicPartition> value2 = entry4.getValue();
            for (TopicPartition topicPartition : value2) {
                hashMap5.put(topicPartition, key3);
                if (hashSet3.contains(topicPartition)) {
                    this.log.warn("Partition {} is assigned to more than one tasks: {}", topicPartition, partitionGroups);
                }
            }
            hashSet3.addAll(value2);
            ((Set) hashMap6.computeIfAbsent(Integer.valueOf(key3.topicGroupId), num3 -> {
                return new HashSet();
            })).add(key3);
        }
        // Warn about source topics without partitions and partitions that belong to no task.
        for (String str4 : hashSet2) {
            List<PartitionInfo> partitionsForTopic = withPartitions.partitionsForTopic(str4);
            if (partitionsForTopic.isEmpty()) {
                this.log.warn("No partitions found for topic {}", str4);
            } else {
                for (PartitionInfo partitionInfo : partitionsForTopic) {
                    TopicPartition topicPartition2 = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                    if (!hashSet3.contains(topicPartition2)) {
                        this.log.warn("Partition {} is not assigned to any tasks: {} Possible causes of a partition not getting assigned is that another topic defined in the topology has not been created when starting your streams application, resulting in no tasks created for this topology at all.", topicPartition2, partitionGroups);
                    }
                }
            }
        }
        // hashMap7: changelog topic configs, sized to the highest task partition + 1 of their topic group.
        HashMap hashMap7 = new HashMap();
        for (Map.Entry<Integer, InternalTopologyBuilder.TopicsInfo> entry5 : map.entrySet()) {
            int intValue2 = entry5.getKey().intValue();
            for (InternalTopicConfig internalTopicConfig2 : entry5.getValue().stateChangelogTopics.values()) {
                int i5 = -1;
                if (hashMap6.get(Integer.valueOf(intValue2)) != null) {
                    for (TaskId taskId : (Set) hashMap6.get(Integer.valueOf(intValue2))) {
                        if (i5 < taskId.partition + 1) {
                            i5 = taskId.partition + 1;
                        }
                    }
                    internalTopicConfig2.setNumberOfPartitions(i5);
                    hashMap7.put(internalTopicConfig2.name(), internalTopicConfig2);
                } else {
                    this.log.debug("No tasks found for topic group {}", Integer.valueOf(intValue2));
                }
            }
        }
        prepareTopic(hashMap7);
        this.log.debug("Created state changelog topics {} from the parsed topology.", hashMap7.values());
        // hashMap8: client state per process id, the input of the task assignor.
        HashMap hashMap8 = new HashMap();
        for (Map.Entry<UUID, ClientMetadata> entry6 : hashMap.entrySet()) {
            ClientState clientState = entry6.getValue().state;
            hashMap8.put(entry6.getKey(), clientState);
            if (!clientState.ownedPartitions().isEmpty()) {
                // Translate the owned partitions into tasks and record them as previous active tasks.
                HashSet hashSet4 = new HashSet();
                Iterator<Map.Entry<TopicPartition, String>> it2 = clientState.ownedPartitions().entrySet().iterator();
                while (it2.hasNext()) {
                    TopicPartition key4 = it2.next().getKey();
                    TaskId taskId2 = (TaskId) hashMap5.get(key4);
                    if (taskId2 != null) {
                        hashSet4.add(taskId2);
                    } else {
                        this.log.error("No task found for topic partition {}", key4);
                    }
                }
                clientState.addPreviousActiveTasks(hashSet4);
            }
        }
        this.log.debug("Assigning tasks {} to clients {} with number of replicas {}", partitionGroups.keySet(), hashMap8, Integer.valueOf(this.numStandbyReplicas));
        new StickyTaskAssignor(hashMap8, partitionGroups.keySet()).assign(this.numStandbyReplicas);
        this.log.info("Assigned tasks to clients as {}{}.", Utils.NL, hashMap8.entrySet().stream().map((v0) -> {
            return v0.toString();
        }).collect(Collectors.joining(Utils.NL)));
        // hashMap9 / hashMap10: active / standby partitions per host; left empty when the minimum
        // received version is lower than 2.
        HashMap hashMap9 = new HashMap();
        HashMap hashMap10 = new HashMap();
        if (i >= 2) {
            for (Map.Entry<UUID, ClientMetadata> entry7 : hashMap.entrySet()) {
                HostInfo hostInfo = entry7.getValue().hostInfo;
                // Clients without an advertised end point are not part of the host mappings.
                if (hostInfo != null) {
                    HashSet hashSet5 = new HashSet();
                    HashSet hashSet6 = new HashSet();
                    ClientState clientState2 = entry7.getValue().state;
                    Iterator<TaskId> it3 = clientState2.activeTasks().iterator();
                    while (it3.hasNext()) {
                        hashSet5.addAll(partitionGroups.get(it3.next()));
                    }
                    Iterator<TaskId> it4 = clientState2.standbyTasks().iterator();
                    while (it4.hasNext()) {
                        hashSet6.addAll(partitionGroups.get(it4.next()));
                    }
                    hashMap9.put(hostInfo, hashSet5);
                    hashMap10.put(hostInfo, hashSet6);
                }
            }
        }
        this.taskManager.setHostPartitionMappings(hashMap9, hashMap10);
        // Version probing uses the plain interleaved assignment; otherwise a sticky one is attempted.
        return new ConsumerPartitionAssignor.GroupAssignment(z ? versionProbingAssignment(hashMap, partitionGroups, hashMap9, hashMap10, hashSet, i, i2) : computeNewAssignment(hashMap, partitionGroups, hashMap9, hashMap10, hashSet, i, i2));
    }

    /**
     * Distributes each client's tasks over that client's consumers.
     *
     * <p>For a client with owned partitions a sticky and balanced distribution of the active tasks
     * is attempted first. As soon as one attempt yields an empty map, this client and all following
     * clients get an interleaved distribution instead. Standby tasks are always interleaved.
     *
     * @param map  client metadata by process id
     * @param map2 partitions per task
     * @param map3 active partitions per host
     * @param map4 standby partitions per host
     * @param set  all partitions owned by any group member
     * @param i    the metadata version passed on to every assignment info
     * @param i2   the latest commonly supported version passed on to every assignment info
     * @return the assignment per consumer id
     */
    private Map<String, ConsumerPartitionAssignor.Assignment> computeNewAssignment(Map<UUID, ClientMetadata> map, Map<TaskId, Set<TopicPartition>> map2, Map<HostInfo, Set<TopicPartition>> map3, Map<HostInfo, Set<TopicPartition>> map4, Set<TopicPartition> set, int i, int i2) {
        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<>();
        boolean stickyAttemptFailed = false;

        for (final ClientMetadata clientMetadata : map.values()) {
            final ClientState state = clientMetadata.state;
            final Set<String> consumers = clientMetadata.consumers;

            Map<String, List<TaskId>> activeTasksByConsumer = null;
            if (!stickyAttemptFailed && !state.ownedPartitions().isEmpty()) {
                activeTasksByConsumer = tryStickyAndBalancedTaskAssignmentWithinClient(state, consumers, map2, set);
                if (activeTasksByConsumer.equals(Collections.emptyMap())) {
                    // An empty result is the "not possible" signal of the sticky attempt.
                    stickyAttemptFailed = true;
                    activeTasksByConsumer = null;
                }
            }
            if (activeTasksByConsumer == null) {
                activeTasksByConsumer = interleaveConsumerTasksByGroupId(state.activeTasks(), consumers);
            }

            final Map<String, List<TaskId>> standbyTasksByConsumer =
                interleaveConsumerTasksByGroupId(state.standbyTasks(), consumers);
            addClientAssignments(assignment, clientMetadata, map2, map3, map4, set, activeTasksByConsumer, standbyTasksByConsumer, i, i2);
        }
        return assignment;
    }

    /**
     * Builds the assignment used for a version-probing rebalance: active and standby tasks of every
     * client are interleaved over its consumers, without any sticky attempt.
     *
     * @param map  client metadata by process id
     * @param map2 partitions per task
     * @param map3 active partitions per host
     * @param map4 standby partitions per host
     * @param set  all partitions owned by any group member
     * @param i    the metadata version passed on to every assignment info
     * @param i2   the latest commonly supported version passed on to every assignment info
     * @return the assignment per consumer id
     */
    private Map<String, ConsumerPartitionAssignor.Assignment> versionProbingAssignment(Map<UUID, ClientMetadata> map, Map<TaskId, Set<TopicPartition>> map2, Map<HostInfo, Set<TopicPartition>> map3, Map<HostInfo, Set<TopicPartition>> map4, Set<TopicPartition> set, int i, int i2) {
        final Map<String, ConsumerPartitionAssignor.Assignment> assignment = new HashMap<>();
        for (final ClientMetadata clientMetadata : map.values()) {
            final ClientState state = clientMetadata.state;
            final Map<String, List<TaskId>> activeTasksByConsumer =
                interleaveConsumerTasksByGroupId(state.activeTasks(), clientMetadata.consumers);
            final Map<String, List<TaskId>> standbyTasksByConsumer =
                interleaveConsumerTasksByGroupId(state.standbyTasks(), clientMetadata.consumers);
            addClientAssignments(assignment, clientMetadata, map2, map3, map4, set, activeTasksByConsumer, standbyTasksByConsumer, i, i2);
        }
        return assignment;
    }

    /**
     * Adds one {@code Assignment} per consumer of the given client to {@code map}.
     *
     * @param map            the result map to fill, keyed by consumer id
     * @param clientMetadata the client whose consumers are processed
     * @param map2           partitions per task
     * @param map3           active partitions per host, encoded into every assignment info
     * @param map4           standby partitions per host, encoded into every assignment info
     * @param set            all partitions owned by any group member
     * @param map5           active tasks per consumer id
     * @param map6           standby tasks per consumer id
     * @param i              the metadata version of the assignment info
     * @param i2             the latest commonly supported version of the assignment info
     */
    private void addClientAssignments(Map<String, ConsumerPartitionAssignor.Assignment> map, ClientMetadata clientMetadata, Map<TaskId, Set<TopicPartition>> map2, Map<HostInfo, Set<TopicPartition>> map3, Map<HostInfo, Set<TopicPartition>> map4, Set<TopicPartition> set, Map<String, List<TaskId>> map5, Map<String, List<TaskId>> map6, int i, int i2) {
        for (final String consumerId : clientMetadata.consumers) {
            final List<TaskId> activeTasksOfConsumer = map5.get(consumerId);

            // Both lists are filled in step by the helper: entry n of one belongs to entry n of the other.
            final List<TopicPartition> assignedPartitions = new ArrayList<>();
            final List<TaskId> assignedActiveTasks = new ArrayList<>();
            buildAssignedActiveTaskAndPartitionsList(consumerId, clientMetadata.state, activeTasksOfConsumer, map2, set, assignedPartitions, assignedActiveTasks);

            final AssignmentInfo info = new AssignmentInfo(
                i,
                i2,
                assignedActiveTasks,
                buildStandbyTaskMap(map6.get(consumerId), map2),
                map3,
                map4,
                AssignorError.NONE.code());
            map.put(consumerId, new ConsumerPartitionAssignor.Assignment(assignedPartitions, info.encode()));
        }
    }

    /**
     * Fills the partition list and the parallel task list for one consumer.
     *
     * <p>A task is dropped completely, and removed from the client state's assignment, as soon as
     * one of its partitions is currently owned by some group member but not by this consumer. The
     * remaining partitions are sorted before they are appended to the output lists.
     *
     * @param str         the consumer id
     * @param clientState the state of the client the consumer belongs to
     * @param list        the active tasks intended for this consumer
     * @param map         partitions per task
     * @param set         all partitions owned by any group member
     * @param list2       output: the assigned partitions, in sorted order
     * @param list3       output: the task of each partition in {@code list2}, at the same index
     */
    private void buildAssignedActiveTaskAndPartitionsList(String str, ClientState clientState, List<TaskId> list, Map<TaskId, Set<TopicPartition>> map, Set<TopicPartition> set, List<TopicPartition> list2, List<TaskId> list3) {
        final List<AssignedPartition> assignedPartitions = new ArrayList<>();

        for (final TaskId taskId : list) {
            final List<AssignedPartition> partitionsOfTask = new ArrayList<>();
            for (final TopicPartition partition : map.get(taskId)) {
                final String previousOwner = clientState.ownedPartitions().get(partition);
                final boolean ownedByThisConsumer = previousOwner != null && previousOwner.equals(str);
                if (!ownedByThisConsumer && set.contains(partition)) {
                    this.log.debug("Removing task {} from assignment until it is safely revoked", taskId);
                    clientState.removeFromAssignment(taskId);
                    // Discard what was collected for this task; none of its partitions are handed out.
                    partitionsOfTask.clear();
                    break;
                }
                partitionsOfTask.add(new AssignedPartition(taskId, partition));
            }
            assignedPartitions.addAll(partitionsOfTask);
        }

        Collections.sort(assignedPartitions);
        for (final AssignedPartition assigned : assignedPartitions) {
            list3.add(assigned.taskId);
            list2.add(assigned.partition);
        }
    }

    /**
     * Looks up the partitions of every given task.
     *
     * @param collection the task ids to look up
     * @param map        partitions per task
     * @return a new map from each given task id to whatever {@code map} holds for it (possibly {@code null})
     */
    private static Map<TaskId, Set<TopicPartition>> buildStandbyTaskMap(Collection<TaskId> collection, Map<TaskId, Set<TopicPartition>> map) {
        final Map<TaskId, Set<TopicPartition>> standbyTaskMap = new HashMap<>();
        collection.forEach(taskId -> standbyTaskMap.put(taskId, map.get(taskId)));
        return standbyTaskMap;
    }

    /**
     * Tries to give every active task of a client back to the consumer that previously owned its
     * partitions, without putting more than {@code ceil(activeTasks / consumers)} tasks on any
     * single consumer. Tasks without a previous owner are handed out round-robin, in sorted order,
     * to the consumers that still have room.
     *
     * <p>The per-consumer cap is computed with floating-point division. Dividing the two ints first
     * would truncate before {@code Math.ceil} is applied, turning the cap into a floor: with 5 tasks
     * and 2 consumers the cap would be 2, so only 4 of the 5 tasks could ever be placed.
     *
     * @param clientState the client whose active tasks are distributed
     * @param set         the ids of the client's consumers
     * @param map         partitions per task
     * @param set2        all partitions owned by any group member
     * @return the tasks per consumer id, or an empty map if no sticky and balanced distribution is
     *         possible (the caller then falls back to an interleaved distribution)
     * @throws IllegalStateException if unowned tasks remain after every consumer reached the cap
     */
    Map<String, List<TaskId>> tryStickyAndBalancedTaskAssignmentWithinClient(ClientState clientState, Set<String> set, Map<TaskId, Set<TopicPartition>> map, Set<TopicPartition> set2) {
        final Map<String, List<TaskId>> assignments = new HashMap<>();
        final LinkedList<TaskId> newTasks = new LinkedList<>();
        // Consumers that have not reached the cap yet.
        final Set<String> unfilledConsumers = new HashSet<>(set);
        // Divide as double so that ceil really rounds up (see method comment).
        final int maxTasksPerConsumer = (int) Math.ceil(((double) clientState.activeTaskCount()) / set.size());

        for (final String consumer : set) {
            assignments.put(consumer, new ArrayList<>());
        }

        // First pass: hand tasks back to their previous owner where that is possible.
        for (final TaskId taskId : clientState.activeTasks()) {
            final Set<String> previousOwners = previousConsumersOfTaskPartitions(map.get(taskId), clientState.ownedPartitions(), set2);
            if (previousOwners.size() > 1) {
                this.log.warn("The partitions of task {} were claimed as owned by different StreamThreads. This indicates the mapping from partitions to tasks has changed!", taskId);
                return Collections.emptyMap();
            }
            if (previousOwners.isEmpty()) {
                this.log.debug("Task {} was not previously owned by any consumers still in the group. It's owner may have died or it may be a new task", taskId);
                newTasks.add(taskId);
                continue;
            }

            final String owner = previousOwners.iterator().next();
            // The owner may be the "" marker for a consumer of another client; it is never in 'set'.
            if (!set.contains(owner)) {
                this.log.debug("This client was assigned a task {} whose partition(s) were previously owned by another client, falling back to an interleaved assignment since a rebalance is inevitable.", taskId);
                return Collections.emptyMap();
            }
            final List<TaskId> tasksOfOwner = assignments.get(owner);
            if (tasksOfOwner.size() >= maxTasksPerConsumer) {
                this.log.debug("Cannot create a sticky and balanced assignment as this client's consumers owned more previous tasks than it has capacity for during this assignment, falling back to interleaved assignment since a realance is inevitable.");
                return Collections.emptyMap();
            }
            tasksOfOwner.add(taskId);
            if (tasksOfOwner.size() == maxTasksPerConsumer) {
                unfilledConsumers.remove(owner);
            }
        }

        // Second pass: distribute the unowned tasks round-robin over the consumers with room left.
        Collections.sort(newTasks);
        while (!newTasks.isEmpty()) {
            if (unfilledConsumers.isEmpty()) {
                throw new IllegalStateException("Some tasks could not be distributed");
            }
            final Iterator<String> consumerIt = unfilledConsumers.iterator();
            while (consumerIt.hasNext()) {
                final List<TaskId> tasksOfConsumer = assignments.get(consumerIt.next());
                final TaskId nextTask = newTasks.poll();
                if (nextTask == null) {
                    break;
                }
                tasksOfConsumer.add(nextTask);
                if (tasksOfConsumer.size() == maxTasksPerConsumer) {
                    consumerIt.remove();
                }
            }
        }
        return assignments;
    }

    /**
     * Collects the previous owners of a task's partitions.
     *
     * @param set  the partitions of the task
     * @param map  the partitions owned within this client, mapped to the owning consumer id
     * @param set2 all partitions owned by any group member
     * @return the owning consumer ids found in {@code map}; the empty string is added as a marker
     *         for every partition that has no owner in {@code map} but is contained in {@code set2}
     */
    Set<String> previousConsumersOfTaskPartitions(Set<TopicPartition> set, Map<TopicPartition, String> map, Set<TopicPartition> set2) {
        final Set<String> previousOwners = new HashSet<>();
        for (final TopicPartition partition : set) {
            final String ownerInThisClient = map.get(partition);
            if (ownerInThisClient == null) {
                if (set2.contains(partition)) {
                    previousOwners.add("");
                }
            } else {
                previousOwners.add(ownerInThisClient);
            }
        }
        return previousOwners;
    }

    /**
     * Deals the sorted tasks out to the consumers one at a time, visiting the consumers in sorted
     * order of their ids and starting over with the first consumer after the last one.
     *
     * @param collection the tasks to distribute
     * @param set        the consumer ids
     * @return a sorted map from every consumer id to its (possibly empty) list of tasks
     */
    static Map<String, List<TaskId>> interleaveConsumerTasksByGroupId(Collection<TaskId> collection, Set<String> set) {
        final List<TaskId> sortedTasks = new ArrayList<>(collection);
        Collections.sort(sortedTasks);

        final Map<String, List<TaskId>> tasksByConsumer = new TreeMap<>();
        for (final String consumer : set) {
            tasksByConsumer.put(consumer, new ArrayList<>());
        }

        final Iterator<TaskId> remaining = sortedTasks.iterator();
        while (remaining.hasNext()) {
            // One round: at most one task per consumer, in key order.
            for (final List<TaskId> tasksOfConsumer : tasksByConsumer.values()) {
                if (!remaining.hasNext()) {
                    break;
                }
                tasksOfConsumer.add(remaining.next());
            }
        }
        return tasksByConsumer;
    }

    /**
     * Rejects an assignment whose versions are newer than what this member can handle.
     *
     * @param i  the metadata version of the received assignment
     * @param i2 the latest commonly supported version reported with the assignment
     * @throws TaskAssignmentException if {@code i} is greater than the used subscription version,
     *                                 or if {@code i2} is greater than 6
     */
    private void validateMetadataVersions(int i, int i2) {
        final int usedVersion = this.usedSubscriptionMetadataVersion;
        if (i > usedVersion) {
            this.log.error("Leader sent back an assignment with version {} which was greater than our used version {}", i, usedVersion);
            throw new TaskAssignmentException("Sent a version " + usedVersion + " subscription but got an assignment with higher version " + i + ".");
        }

        final boolean beyondLatestSupported = i2 > 6;
        if (beyondLatestSupported) {
            this.log.error("Leader sent back assignment with commonly supported version {} that is greater than our actual latest supported version {}", i2, 6);
            throw new TaskAssignmentException("Can't upgrade to metadata version greater than we support");
        }
    }

    /**
     * Adjusts {@code usedSubscriptionMetadataVersion} based on the versions carried by a received
     * assignment, and reports whether it was changed.
     *
     * <ul>
     *   <li>Assignment version below 3: nothing changes, returns {@code false}.</li>
     *   <li>Commonly supported version above the used version: the used version is raised to it,
     *       returns {@code true}.</li>
     *   <li>Assignment version below the used version: the used version is set to the commonly
     *       supported version, returns {@code true}.</li>
     *   <li>Otherwise: nothing changes, returns {@code false}.</li>
     * </ul>
     *
     * @param i  version of the received assignment
     * @param i2 commonly supported version reported in the assignment
     * @return {@code true} if {@code usedSubscriptionMetadataVersion} was updated
     */
    protected boolean maybeUpdateSubscriptionVersion(int i, int i2) {
        // Literal from the original code; the log text calls it the earliest version that allows version probing.
        final int earliestProbeableVersion = 3;

        if (i < earliestProbeableVersion) {
            log.debug("Received an assignment version {} that is less than the earliest version that allows version probing {}. If this is not during a rolling upgrade from version 2.0 or below, this is an error.", i, earliestProbeableVersion);
            return false;
        }

        if (i2 > usedSubscriptionMetadataVersion) {
            log.info("Sent a version {} subscription and group's latest commonly supported version is {} (successful version probing and end of rolling upgrade). Upgrading subscription metadata version to {} for next rebalance.", usedSubscriptionMetadataVersion, i2, i2);
            usedSubscriptionMetadataVersion = i2;
            return true;
        }

        if (i < usedSubscriptionMetadataVersion) {
            log.info("Sent a version {} subscription and got version {} assignment back (successful version probing). Downgrade subscription metadata to commonly supported version {} and trigger new rebalance.", usedSubscriptionMetadataVersion, i, i2);
            usedSubscriptionMetadataVersion = i2;
            return true;
        }

        return false;
    }

    /**
     * Applies an assignment received for this member to the task manager.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Copy and sort the assigned partitions with {@code PARTITION_COMPARATOR}.</li>
     *   <li>Decode the assignment user data; if it carries an error code other than
     *       {@code AssignorError.NONE}, record that code and return without touching the task manager.</li>
     *   <li>Validate the assignment's versions, and record {@code AssignorError.VERSION_PROBING}
     *       if the used subscription version had to be changed.</li>
     *   <li>Build the task/partition mappings according to the assignment version
     *       (1, 2-5, or 6) and hand them to the task manager.</li>
     *   <li>Mark the rebalance as no longer in progress.</li>
     * </ol>
     *
     * @param assignment            the assignment (partitions plus encoded user data) for this member
     * @param consumerGroupMetadata not used by this implementation
     * @throws TaskAssignmentException if the versions fail validation or the number of partitions
     *                                 does not match the number of active tasks
     * @throws IllegalStateException   if the assignment version is none of 1 through 6
     */
    @Override // org.apache.kafka.clients.consumer.ConsumerPartitionAssignor
    public void onAssignment(ConsumerPartitionAssignor.Assignment assignment, ConsumerGroupMetadata consumerGroupMetadata) {
        List<TopicPartition> partitions = new ArrayList<>(assignment.partitions());
        partitions.sort(PARTITION_COMPARATOR);

        AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
        if (info.errCode() != AssignorError.NONE.code()) {
            // The assignment carries an error: surface it and leave the task manager untouched.
            setAssignmentErrorCode(Integer.valueOf(info.errCode()));
            return;
        }

        int receivedVersion = info.version();
        int commonlySupportedVersion = info.commonlySupportedVersion();
        validateMetadataVersions(receivedVersion, commonlySupportedVersion);
        if (maybeUpdateSubscriptionVersion(receivedVersion, commonlySupportedVersion)) {
            setAssignmentErrorCode(Integer.valueOf(AssignorError.VERSION_PROBING.code()));
        }

        Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        Map<TopicPartition, PartitionInfo> topicToPartitionInfo = new HashMap<>();
        Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
        Map<HostInfo, Set<TopicPartition>> partitionsByHost;
        Map<HostInfo, Set<TopicPartition>> standbyPartitionByHost;

        switch (receivedVersion) {
            case 1:
                // Version 1 carries no host information.
                processVersionOneAssignment(logPrefix, info, partitions, activeTasks, partitionsToTaskId);
                partitionsByHost = Collections.emptyMap();
                standbyPartitionByHost = Collections.emptyMap();
                break;
            case 2:
            case 3:
            case 4:
            case 5:
                // Versions 2-5: active partitions by host are read, standby partitions by host are not.
                processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionByHost = Collections.emptyMap();
                break;
            case 6:
                // Version 6 additionally reads standby partitions by host.
                processVersionTwoAssignment(logPrefix, info, partitions, activeTasks, topicToPartitionInfo, partitionsToTaskId);
                partitionsByHost = info.partitionsByHost();
                standbyPartitionByHost = info.standbyPartitionByHost();
                break;
            default:
                throw new IllegalStateException("This code should never be reached. Please file a bug report at https://issues.apache.org/jira/projects/KAFKA/");
        }

        taskManager.setClusterMetadata(Cluster.empty().withPartitions(topicToPartitionInfo));
        taskManager.setHostPartitionMappings(partitionsByHost, standbyPartitionByHost);
        taskManager.setPartitionsToTaskId(partitionsToTaskId);
        taskManager.setAssignmentMetadata(activeTasks, info.standbyTasks());
        taskManager.updateSubscriptionsFromAssignment(partitions);
        taskManager.setRebalanceInProgress(false);
    }

    /**
     * Pairs each assigned partition with the active task at the same index and records the
     * pairing in both output maps.
     *
     * @param str            prefix prepended to the exception message
     * @param assignmentInfo decoded assignment whose {@code activeTasks()} list is matched index-by-index
     *                       against {@code list}
     * @param list           assigned partitions, in the order that lines up with the active tasks
     * @param map            output: task to the set of partitions paired with it (entries are added, sets created on demand)
     * @param map2           output: partition to the task it was paired with
     * @throws TaskAssignmentException if the number of partitions differs from the number of active tasks
     */
    private static void processVersionOneAssignment(String str, AssignmentInfo assignmentInfo, List<TopicPartition> list, Map<TaskId, Set<TopicPartition>> map, Map<TopicPartition, TaskId> map2) {
        // Fetched once; the original re-invoked activeTasks() on every use.
        List<TaskId> activeTasks = assignmentInfo.activeTasks();

        if (list.size() != activeTasks.size()) {
            throw new TaskAssignmentException(String.format("%sNumber of assigned partitions %d is not equal to the number of active taskIds %d, assignmentInfo=%s", str, list.size(), activeTasks.size(), assignmentInfo.toString()));
        }

        for (int index = 0; index < list.size(); index++) {
            TopicPartition partition = list.get(index);
            TaskId task = activeTasks.get(index);
            map.computeIfAbsent(task, ignored -> new HashSet<>()).add(partition);
            map2.put(partition, task);
        }
    }

    /**
     * Performs the version-one processing and then registers a {@link PartitionInfo} for every
     * partition listed in {@code assignmentInfo.partitionsByHost()}.
     *
     * <p>Each registered {@code PartitionInfo} carries only the topic name and partition number;
     * its third argument is {@code null} and both node arrays are empty.
     *
     * @param str            prefix for exception messages, forwarded to the version-one processing
     * @param assignmentInfo decoded assignment
     * @param list           assigned partitions, forwarded to the version-one processing
     * @param map            output: task to partitions, filled by the version-one processing
     * @param map2           output: partition to its placeholder {@code PartitionInfo}
     * @param map3           output: partition to task, filled by the version-one processing
     * @throws TaskAssignmentException if the version-one processing rejects the assignment
     */
    public static void processVersionTwoAssignment(String str, AssignmentInfo assignmentInfo, List<TopicPartition> list, Map<TaskId, Set<TopicPartition>> map, Map<TopicPartition, PartitionInfo> map2, Map<TopicPartition, TaskId> map3) {
        processVersionOneAssignment(str, assignmentInfo, list, map, map3);

        for (Set<TopicPartition> hostPartitions : assignmentInfo.partitionsByHost().values()) {
            for (TopicPartition partition : hostPartitions) {
                PartitionInfo placeholder =
                    new PartitionInfo(partition.topic(), partition.partition(), null, new Node[0], new Node[0]);
                map2.put(partition, placeholder);
            }
        }
    }

    /**
     * Checks that every given internal topic has a partition count and then asks the internal
     * topic manager to make the topics ready.
     *
     * <p>Each config is re-keyed by its own {@code name()} before being passed to
     * {@code internalTopicManager.makeReady}; the manager is not called when there are no topics.
     *
     * @param map internal topic configs to validate and prepare
     * @throws StreamsException if any topic config has no partition count defined
     */
    private void prepareTopic(Map<String, InternalTopicConfig> map) {
        log.debug("Starting to validate internal topics {} in partition assignor.", map);

        Map<String, InternalTopicConfig> topicsToMakeReady = new HashMap<>();
        for (InternalTopicConfig topic : map.values()) {
            Optional<Integer> numberOfPartitions = topic.numberOfPartitions();
            if (!numberOfPartitions.isPresent()) {
                throw new StreamsException(String.format("%sTopic [%s] number of partitions not defined", logPrefix, topic.name()));
            }
            // Kept from the original: the count just read is written back onto the same config.
            int partitionCount = numberOfPartitions.get().intValue();
            topic.setNumberOfPartitions(partitionCount);
            topicsToMakeReady.put(topic.name(), topic);
        }

        if (!topicsToMakeReady.isEmpty()) {
            internalTopicManager.makeReady(topicsToMakeReady);
        }

        log.debug("Completed validating internal topics {} in partition assignor.", map);
    }

    /**
     * Runs the co-partitioning enforcer once for every group of topic names.
     *
     * @param collection groups of topic names; each group is passed to the enforcer on its own
     * @param map        internal topic configs, forwarded unchanged to the enforcer
     * @param cluster    cluster metadata, forwarded unchanged to the enforcer
     */
    private void ensureCopartitioning(Collection<Set<String>> collection, Map<String, InternalTopicConfig> map, Cluster cluster) {
        for (Set<String> copartitionGroup : collection) {
            copartitionedTopicsEnforcer.enforce(copartitionGroup, map, cluster);
        }
    }

    /**
     * Returns the smaller of the two given versions.
     *
     * <p>NOTE(review): going by the method name, callers presumably pass a newly received version
     * and the running minimum, in some order -- the result is the same either way.
     *
     * @param i  first version
     * @param i2 second version
     * @return the lesser of {@code i} and {@code i2}
     */
    private int updateMinReceivedVersion(int i, int i2) {
        return Math.min(i, i2);
    }

    /**
     * Returns the smaller of a newly seen supported version and the current minimum, logging at
     * debug level which of the two outcomes applied.
     *
     * @param i  the supported version just seen
     * @param i2 the current minimum supported version
     * @return {@code i} if it is strictly smaller than {@code i2}, otherwise {@code i2}
     */
    private int updateMinSupportedVersion(int i, int i2) {
        if (i >= i2) {
            // The seen version does not lower the minimum.
            log.debug("Current minimum supported version remains at {}, last seen supported version was {}", i2, i);
            return i2;
        }
        log.debug("Downgrade the current minimum supported version {} to the smaller seen supported version {}", i2, i);
        return i;
    }

    /**
     * Stores the given error code in {@code assignmentErrorCode}.
     *
     * @param num the error code to store; must not be {@code null} (it is unboxed)
     */
    protected void setAssignmentErrorCode(Integer num) {
        final int errorCode = num.intValue();
        assignmentErrorCode.set(errorCode);
    }

    /**
     * Replaces the rebalance protocol held by this assignor.
     *
     * <p>NOTE(review): package-private visibility suggests this is a hook for tests or
     * collaborators in the same package -- confirm against callers.
     *
     * @param rebalanceProtocol the protocol to store in the {@code rebalanceProtocol} field
     */
    void setRebalanceProtocol(ConsumerPartitionAssignor.RebalanceProtocol rebalanceProtocol) {
        this.rebalanceProtocol = rebalanceProtocol;
    }

    /**
     * Replaces the internal topic manager held by this assignor.
     *
     * <p>NOTE(review): package-private visibility suggests this is a hook for tests or
     * collaborators in the same package -- confirm against callers.
     *
     * @param internalTopicManager the manager to store in the {@code internalTopicManager} field
     */
    void setInternalTopicManager(InternalTopicManager internalTopicManager) {
        this.internalTopicManager = internalTopicManager;
    }
}
