package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.DeleteRecordsResult;
import org.apache.kafka.clients.admin.RecordsToDelete;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskIdFormatException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.HostInfo;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/TaskManager.class */
public class TaskManager {
    /** Logger obtained from a {@link LogContext} built with the log prefix. */
    private final Logger log;
    /** Identifier handed out by {@link #processId()}. */
    private final UUID processId;
    /** Container of the active (stream) tasks; active-task operations delegate to it. */
    private final AssignedStreamsTasks active;
    /** Container of the standby tasks; standby-task operations delegate to it. */
    private final AssignedStandbyTasks standby;
    /** Changelog reader driven from {@link #updateNewAndRestoringTasks()}. */
    private final ChangelogReader changelogReader;
    /** Prefix used for the logger and prepended to exception messages. */
    private final String logPrefix;
    /** Consumer whose assignment is managed by this class for changelog partitions. */
    private final Consumer<byte[], byte[]> restoreConsumer;
    /** Factory for active tasks; also the source of the state directory and topology builder. */
    private final StreamThread.AbstractTaskCreator<StreamTask> taskCreator;
    /** Factory for standby tasks. */
    private final StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator;
    /** Receives host-to-partition mappings in {@link #setHostPartitionMappings(Map, Map)}. */
    private final StreamsMetadataState streamsMetadataState;
    /** Admin client used to send delete-records requests. */
    private final Admin adminClient;
    /** Result of the most recent delete-records request; {@code null} until one has been sent. */
    private DeleteRecordsResult deleteRecordsResult;
    /** Cluster metadata supplied through {@link #setClusterMetadata(Cluster)}. */
    private Cluster cluster;
    /** Main consumer; must be supplied through {@link #setConsumer(Consumer)} before tasks are created. */
    private Consumer<byte[], byte[]> consumer;
    /** While {@code true}, the commit methods return -1 without committing. */
    private boolean rebalanceInProgress = false;
    /** {@code true} once the restore consumer has been assigned the standby changelog partitions. */
    private boolean restoreConsumerAssignedStandbys = false;
    /** Lookup from partition to owning task; replaced wholesale by its setter. */
    private Map<TopicPartition, TaskId> partitionsToTaskId = new HashMap<>();
    /** Active assignment from the latest {@link #setAssignmentMetadata(Map, Map)} call. */
    private Map<TaskId, Set<TopicPartition>> assignedActiveTasks = new HashMap<>();
    /** Standby assignment from the latest {@link #setAssignmentMetadata(Map, Map)} call. */
    private Map<TaskId, Set<TopicPartition>> assignedStandbyTasks = new HashMap<>();
    /** Active tasks in the latest assignment that were not in the previous one. */
    private final Map<TaskId, Set<TopicPartition>> addedActiveTasks = new HashMap<>();
    /** Standby tasks in the latest assignment that were not in the previous one. */
    private final Map<TaskId, Set<TopicPartition>> addedStandbyTasks = new HashMap<>();
    /** Active tasks in the previous assignment that are absent from the latest one. */
    private final Map<TaskId, Set<TopicPartition>> revokedActiveTasks = new HashMap<>();
    /** Standby tasks in the previous assignment that are absent from the latest one. */
    private final Map<TaskId, Set<TopicPartition>> revokedStandbyTasks = new HashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public TaskManager(ChangelogReader changelogReader, UUID uuid, String str, Consumer<byte[], byte[]> consumer, StreamsMetadataState streamsMetadataState, StreamThread.AbstractTaskCreator<StreamTask> abstractTaskCreator, StreamThread.AbstractTaskCreator<StandbyTask> abstractTaskCreator2, Admin admin, AssignedStreamsTasks assignedStreamsTasks, AssignedStandbyTasks assignedStandbyTasks) {
        this.changelogReader = changelogReader;
        this.processId = uuid;
        this.logPrefix = str;
        this.streamsMetadataState = streamsMetadataState;
        this.restoreConsumer = consumer;
        this.taskCreator = abstractTaskCreator;
        this.standbyTaskCreator = abstractTaskCreator2;
        this.active = assignedStreamsTasks;
        this.standby = assignedStandbyTasks;
        this.log = new LogContext(str).logger(getClass());
        this.adminClient = admin;
    }

    /**
     * @return the admin client passed to the constructor
     */
    public Admin adminClient() {
        return adminClient;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Resumes suspended tasks and creates newly added active and standby tasks,
     * then pauses every partition assigned to the main consumer.
     *
     * <p>If new active tasks exist while the restore consumer is flagged as assigned
     * to standbys, the restore consumer is unsubscribed and the flag is cleared.
     *
     * @param collection partitions of the new assignment; resumption is attempted only
     *                   when this and the current active assignment are both non-empty
     * @throws IllegalStateException if no main consumer has been set
     */
    public void createTasks(Collection<TopicPartition> collection) {
        if (consumer == null) {
            throw new IllegalStateException(logPrefix + "consumer has not been initialized while adding stream tasks. This should not happen.");
        }

        final boolean nothingToResume = collection.isEmpty() || assignedActiveTasks.isEmpty();
        if (!nothingToResume) {
            // May move tasks that could not be resumed into addedActiveTasks.
            resumeSuspended(collection);
        }

        if (!addedActiveTasks.isEmpty()) {
            addNewActiveTasks(addedActiveTasks);
        }

        if (!addedStandbyTasks.isEmpty()) {
            addNewStandbyTasks(addedStandbyTasks);
        }

        if (restoreConsumerAssignedStandbys && !addedActiveTasks.isEmpty()) {
            restoreConsumer.unsubscribe();
            restoreConsumerAssignedStandbys = false;
        }

        log.debug("Pausing all active task partitions until the underlying state stores are ready");
        pausePartitions();
    }

    /**
     * Attempts to resume the suspended task behind each given partition, skipping tasks
     * already recorded as newly added. A task that cannot be resumed is recorded in
     * {@code addedActiveTasks} so that it is created from scratch.
     *
     * @param collection partitions whose owning tasks should be resumed
     * @throws StreamsException if a partition has no known task, or if resuming fails
     *                          (the failure is logged and rethrown)
     */
    private void resumeSuspended(Collection<TopicPartition> collection) {
        final Set<TaskId> candidates = partitionsToTaskSet(collection);
        candidates.removeAll(addedActiveTasks.keySet());
        log.debug("Suspended tasks to be resumed: {}", candidates);

        for (final TaskId id : candidates) {
            final Set<TopicPartition> partitions = assignedActiveTasks.get(id);
            final boolean resumed;
            try {
                resumed = active.maybeResumeSuspendedTask(id, partitions);
            } catch (StreamsException e) {
                log.error("Failed to resume a suspended active task {} due to the following error:", id, e);
                throw e;
            }
            if (!resumed) {
                addedActiveTasks.put(id, partitions);
            }
        }
    }

    /**
     * Creates an active task for every entry of the given map using the main consumer
     * and registers each one with the active task container.
     *
     * @param map task id to partitions for the tasks to create
     */
    private void addNewActiveTasks(Map<TaskId, Set<TopicPartition>> map) {
        log.debug("New active tasks to be created: {}", map);
        for (final StreamTask created : taskCreator.createTasks(consumer, map)) {
            active.addNewTask(created);
        }
    }

    /**
     * Creates a standby task for every entry of the given map using the main consumer
     * and registers each one with the standby task container.
     *
     * @param map task id to partitions for the tasks to create
     */
    private void addNewStandbyTasks(Map<TaskId, Set<TopicPartition>> map) {
        log.debug("New standby tasks to be created: {}", map);
        for (final StandbyTask created : standbyTaskCreator.createTasks(consumer, map)) {
            standby.addNewTask(created);
        }
    }

    /**
     * Collects the ids of tasks that have a non-empty directory containing a
     * {@code .checkpoint} file under the state directory.
     *
     * <p>Directories whose name does not parse as a task id are skipped rather than
     * treated as an error.
     *
     * @return a new mutable set of task ids; empty if the directory listing is {@code null}
     */
    public Set<TaskId> cachedTasksIds() {
        final Set<TaskId> cached = new HashSet<>();
        final File[] taskDirs = this.taskCreator.stateDirectory().listNonEmptyTaskDirectories();
        if (taskDirs == null) {
            return cached;
        }
        for (final File dir : taskDirs) {
            try {
                final TaskId id = TaskId.parse(dir.getName());
                if (new File(dir, ".checkpoint").exists()) {
                    cached.add(id);
                }
            } catch (TaskIdFormatException ignored) {
                // Deliberate best effort: an entry whose name is not a task id is not
                // a cached task, so it is skipped instead of failing the whole scan.
                this.log.trace("Skipping state directory entry {} because its name is not a valid task id", dir.getName());
            }
        }
        return cached;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the standby tasks recorded as revoked and removes the partitions reported
     * by that call from the restore consumer's assignment. The removal only happens
     * when the restore consumer is currently flagged as assigned to standbys.
     *
     * @return the partitions returned by the standby container's close call
     */
    public List<TopicPartition> closeRevokedStandbyTasks() {
        final List<TopicPartition> closedPartitions = standby.closeRevokedStandbyTasks(revokedStandbyTasks);
        removeChangelogsFromRestoreConsumer(closedPartitions, true);
        return closedPartitions;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Asks the active task container to close the suspended tasks whose ids are
     * recorded as revoked.
     *
     * @throws RuntimeException the exception returned by the container, if any
     */
    public void closeRevokedSuspendedTasks() {
        final RuntimeException failure = active.closeNotAssignedSuspendedTasks(revokedActiveTasks.keySet());
        if (failure == null) {
            return;
        }
        throw failure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Suspends or closes the active tasks that own the given partitions, then removes
     * the changelog partitions collected during that call from the changelog reader
     * and, unless the restore consumer is flagged as assigned to standbys, from the
     * restore consumer.
     *
     * @param collection partitions whose tasks should be suspended
     * @return the suspended task ids reported by the active task container
     * @throws StreamsException if a partition has no known task, or wrapping the
     *                          exception returned by the suspend call; cleanup still
     *                          runs before the latter is thrown
     */
    public Set<TaskId> suspendActiveTasksAndState(Collection<TopicPartition> collection) {
        final List<TopicPartition> revokedChangelogs = new ArrayList<>();
        final Set<TaskId> tasksToSuspend = partitionsToTaskSet(collection);
        final Exception failure = active.suspendOrCloseTasks(tasksToSuspend, revokedChangelogs);

        // Clean up before surfacing any failure.
        changelogReader.remove(revokedChangelogs);
        removeChangelogsFromRestoreConsumer(revokedChangelogs, false);

        if (failure != null) {
            throw new StreamsException(logPrefix + "failed to suspend stream tasks", failure);
        }
        return active.suspendedTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes all active tasks as zombies and clears the related bookkeeping: the
     * assigned-active map, the changelog reader and, unless the restore consumer is
     * flagged as assigned to standbys, the restore consumer's subscription.
     *
     * @return a snapshot of the active task ids that were assigned before clearing
     * @throws RuntimeException the exception returned by the close call, if any;
     *                          it is thrown only after all clearing has been done
     */
    public Set<TaskId> closeLostTasks() {
        final Set<TaskId> lostTaskIds = new HashSet<>(assignedActiveTasks.keySet());
        log.debug("Closing lost active tasks as zombies: {}", lostTaskIds);
        final RuntimeException closeFailure = active.closeAllTasksAsZombies();

        log.debug("Clearing assigned active tasks: {}", assignedActiveTasks);
        assignedActiveTasks.clear();

        log.debug("Clearing the store changelog reader: {}", changelogReader);
        changelogReader.clear();

        if (!restoreConsumerAssignedStandbys) {
            log.debug("Clearing the restore consumer's assignment: {}", restoreConsumer.assignment());
            restoreConsumer.unsubscribe();
        }

        if (closeFailure == null) {
            return lostTaskIds;
        }
        throw closeFailure;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Shuts down the active and standby task containers, unsubscribes the restore
     * consumer and closes both task creators.
     *
     * <p>Every step is attempted even if an earlier one fails, so a failure while
     * shutting down one component cannot leave the remaining ones open. The first
     * failure is rethrown at the end with any later failures attached as suppressed
     * exceptions.
     *
     * @param z flag forwarded to both containers' {@code shutdown} calls
     *          (presumably "clean shutdown" - confirm against the containers)
     * @throws RuntimeException the first failure raised by any shutdown step
     */
    public void shutdown(boolean z) {
        RuntimeException firstFailure = null;
        try {
            this.active.shutdown(z);
        } catch (RuntimeException e) {
            firstFailure = keepFirstFailure(firstFailure, e);
        }
        try {
            this.standby.shutdown(z);
        } catch (RuntimeException e) {
            firstFailure = keepFirstFailure(firstFailure, e);
        }
        try {
            this.restoreConsumer.unsubscribe();
        } catch (RuntimeException e) {
            firstFailure = keepFirstFailure(firstFailure, e);
        }
        try {
            this.taskCreator.close();
        } catch (RuntimeException e) {
            firstFailure = keepFirstFailure(firstFailure, e);
        }
        try {
            this.standbyTaskCreator.close();
        } catch (RuntimeException e) {
            firstFailure = keepFirstFailure(firstFailure, e);
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /** Returns the earliest failure, recording {@code later} as suppressed when one already exists. */
    private static RuntimeException keepFirstFailure(RuntimeException first, RuntimeException later) {
        if (first == null) {
            return later;
        }
        // addSuppressed rejects self-suppression, so skip it for the same instance.
        if (later != first) {
            first.addSuppressed(later);
        }
        return first;
    }

    /**
     * @return the previously running task ids as reported by the active task container
     */
    public Set<TaskId> previousRunningTaskIds() {
        return active.previousRunningTaskIds();
    }

    /**
     * @return all assigned task ids as reported by the active task container
     */
    public Set<TaskId> activeTaskIds() {
        return active.allAssignedTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return all assigned task ids as reported by the standby task container
     */
    public Set<TaskId> standbyTaskIds() {
        return standby.allAssignedTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the key-set view of the revoked active tasks map (a live view, not a copy)
     */
    public Set<TaskId> revokedActiveTaskIds() {
        return revokedActiveTasks.keySet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the key-set view of the revoked standby tasks map (a live view, not a copy)
     */
    public Set<TaskId> revokedStandbyTaskIds() {
        return revokedStandbyTasks.keySet();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Computes the active task ids held before the latest assignment: currently
     * assigned plus revoked, minus newly added.
     *
     * @return a new mutable set of task ids
     */
    public Set<TaskId> previousActiveTaskIds() {
        final Set<TaskId> previous = new HashSet<>(assignedActiveTasks.keySet());
        previous.addAll(revokedActiveTasks.keySet());
        previous.removeAll(addedActiveTasks.keySet());
        return previous;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Computes the standby task ids held before the latest assignment: currently
     * assigned plus revoked, minus newly added.
     *
     * @return a new mutable set of task ids
     */
    public Set<TaskId> previousStandbyTaskIds() {
        final Set<TaskId> previous = new HashSet<>(assignedStandbyTasks.keySet());
        previous.addAll(revokedStandbyTasks.keySet());
        previous.removeAll(addedStandbyTasks.keySet());
        return previous;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param topicPartition partition to look up
     * @return the running active task the container reports for that partition
     */
    public StreamTask activeTask(TopicPartition topicPartition) {
        return active.runningTaskFor(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @param topicPartition partition to look up
     * @return the running standby task the container reports for that partition
     */
    public StandbyTask standbyTask(TopicPartition topicPartition) {
        return standby.runningTaskFor(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the running task map of the active task container
     */
    public Map<TaskId, StreamTask> activeTasks() {
        return active.runningTaskMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the running task map of the standby task container
     */
    public Map<TaskId, StandbyTask> standbyTasks() {
        return standby.runningTaskMap();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sets the main consumer. {@code createTasks} throws an
     * {@link IllegalStateException} if this has not been called with a non-null value.
     *
     * @param consumer the main consumer used for task creation, pausing and resuming
     */
    public void setConsumer(Consumer<byte[], byte[]> consumer) {
        this.consumer = consumer;
    }

    /**
     * @return the process id passed to the constructor
     */
    public UUID processId() {
        return processId;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the topology builder exposed by the active task creator
     */
    public InternalTopologyBuilder builder() {
        return taskCreator.builder();
    }

    /**
     * Pauses every partition currently assigned to the main consumer.
     */
    void pausePartitions() {
        log.trace("Pausing partitions: {}", consumer.assignment());
        consumer.pause(consumer.assignment());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return all tasks held by the active task container
     */
    public List<StreamTask> allStreamsTasks() {
        return active.allTasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the restoring task ids reported by the active task container
     */
    public Set<TaskId> restoringTaskIds() {
        return active.restoringTaskIds();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return all tasks held by the standby task container
     */
    public List<StandbyTask> allStandbyTasks() {
        return standby.allTasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Initializes new active and standby tasks and advances restoration of active
     * tasks. Once every active task is running, the main consumer's partitions are
     * resumed and the standby changelog partitions are assigned to the restore consumer.
     *
     * @return {@code false} while some active task is not yet running; otherwise
     *         whether all standby tasks are running
     */
    public boolean updateNewAndRestoringTasks() {
        active.initializeNewTasks();
        standby.initializeNewTasks();

        if (!active.hasRestoringTasks()) {
            active.clearRestoringPartitions();
        } else {
            final Collection<TopicPartition> restored = changelogReader.restore(active);
            active.updateRestored(restored);
            removeChangelogsFromRestoreConsumer(restored, false);
        }

        if (active.allTasksRunning()) {
            final Set<TopicPartition> assigned = consumer.assignment();
            log.trace("Resuming partitions {}", assigned);
            consumer.resume(assigned);
            assignStandbyPartitions();
            return standby.allTasksRunning();
        }
        return false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return whether the active task container reports running tasks
     */
    public boolean hasActiveRunningTasks() {
        return active.hasRunningTasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return whether the standby task container reports running tasks
     */
    public boolean hasStandbyRunningTasks() {
        return standby.hasRunningTasks();
    }

    /**
     * Assigns the restore consumer to the checkpointed partitions of all running
     * standby tasks and positions it on each one: at the checkpointed offset when that
     * offset is non-negative, otherwise at the beginning of the partition.
     */
    private void assignStandbyPartitions() {
        final Map<TopicPartition, Long> checkpoints = new HashMap<>();
        for (final StandbyTask task : standby.running()) {
            checkpoints.putAll(task.checkpointedOffsets());
        }

        log.debug("Assigning and seeking restoreConsumer to {}", checkpoints);
        restoreConsumerAssignedStandbys = true;
        restoreConsumer.assign(checkpoints.keySet());

        for (final Map.Entry<TopicPartition, Long> checkpoint : checkpoints.entrySet()) {
            final TopicPartition partition = checkpoint.getKey();
            final long offset = checkpoint.getValue();
            if (offset < 0) {
                // A negative offset means there is no usable position to seek to.
                restoreConsumer.seekToBeginning(Collections.singleton(partition));
            } else {
                restoreConsumer.seek(partition, offset);
            }
        }
    }

    /**
     * Records whether a rebalance is in progress. While the flag is set, the commit
     * methods of this class return -1 without committing.
     *
     * @param z the new value of the flag
     */
    public void setRebalanceInProgress(boolean z) {
        rebalanceInProgress = z;
    }

    /**
     * Stores the cluster metadata that {@code setHostPartitionMappings} forwards to
     * the streams metadata state.
     *
     * @param cluster the cluster metadata to remember
     */
    public void setClusterMetadata(Cluster cluster) {
        this.cluster = cluster;
    }

    /**
     * Forwards both host-to-partitions mappings, together with the stored cluster
     * metadata, to the streams metadata state.
     *
     * @param map  first mapping, passed through unchanged
     * @param map2 second mapping, passed through unchanged
     */
    public void setHostPartitionMappings(Map<HostInfo, Set<TopicPartition>> map, Map<HostInfo, Set<TopicPartition>> map2) {
        streamsMetadataState.onChange(map, map2, cluster);
    }

    /**
     * Replaces the partition-to-task lookup. The given map is stored as-is, not copied.
     *
     * @param map the new partition-to-task mapping
     */
    public void setPartitionsToTaskId(Map<TopicPartition, TaskId> map) {
        partitionsToTaskId = map;
    }

    /**
     * Installs a new assignment and recomputes the added and revoked task maps by
     * comparing it with the assignment held so far.
     *
     * <p>The given maps are stored as-is (not copied) as the new assigned maps.
     *
     * @param map  the new active assignment, task id to partitions
     * @param map2 the new standby assignment, task id to partitions
     */
    public void setAssignmentMetadata(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
        // Added: in the new assignment but not in the current one.
        refillWithEntriesAbsentFrom(addedActiveTasks, map, assignedActiveTasks);
        refillWithEntriesAbsentFrom(addedStandbyTasks, map2, assignedStandbyTasks);
        // Revoked: in the current assignment but not in the new one.
        refillWithEntriesAbsentFrom(revokedActiveTasks, assignedActiveTasks, map);
        refillWithEntriesAbsentFrom(revokedStandbyTasks, assignedStandbyTasks, map2);

        log.debug(
            "Assigning metadata with: "
                + "\tpreviousAssignedActiveTasks: {},\n"
                + "\tpreviousAssignedStandbyTasks: {}\n"
                + "The updated task states are: \n"
                + "\tassignedActiveTasks {},\n"
                + "\tassignedStandbyTasks {},\n"
                + "\taddedActiveTasks {},\n"
                + "\taddedStandbyTasks {},\n"
                + "\trevokedActiveTasks {},\n"
                + "\trevokedStandbyTasks {}",
            assignedActiveTasks, assignedStandbyTasks, map, map2,
            addedActiveTasks, addedStandbyTasks, revokedActiveTasks, revokedStandbyTasks);

        assignedActiveTasks = map;
        assignedStandbyTasks = map2;
    }

    /** Clears {@code target}, then copies into it every entry of {@code source} whose key is not in {@code reference}. */
    private static void refillWithEntriesAbsentFrom(Map<TaskId, Set<TopicPartition>> target, Map<TaskId, Set<TopicPartition>> source, Map<TaskId, Set<TopicPartition>> reference) {
        target.clear();
        for (final Map.Entry<TaskId, Set<TopicPartition>> candidate : source.entrySet()) {
            final TaskId id = candidate.getKey();
            if (!reference.containsKey(id)) {
                target.put(id, candidate.getValue());
            }
        }
    }

    /**
     * When the topology has a source topic pattern, makes sure the topics of the given
     * partitions are part of the builder's subscribed topics. If some are missing, the
     * builder is updated with the union of those topics and its current subscription
     * updates.
     *
     * @param list the assigned partitions
     */
    public void updateSubscriptionsFromAssignment(List<TopicPartition> list) {
        if (builder().sourceTopicPattern() == null) {
            return;
        }

        final Set<String> assignedTopics = new HashSet<>();
        for (final TopicPartition partition : list) {
            assignedTopics.add(partition.topic());
        }

        final Set<String> known = builder().subscriptionUpdates();
        if (!known.containsAll(assignedTopics)) {
            assignedTopics.addAll(known);
            builder().updateSubscribedTopics(assignedTopics, logPrefix);
        }
    }

    /**
     * When the topology has a source topic pattern and the given topics differ from
     * the builder's current subscription updates, replaces the builder's subscribed
     * topics with the given set.
     *
     * @param set the topic names to subscribe to
     */
    public void updateSubscriptionsFromMetadata(Set<String> set) {
        if (builder().sourceTopicPattern() != null && !builder().subscriptionUpdates().equals(set)) {
            builder().updateSubscribedTopics(set, logPrefix);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Commits the active and standby tasks unless a rebalance is in progress.
     *
     * @return -1 if a rebalance is in progress; otherwise the sum of the values
     *         returned by the active and standby commit calls
     */
    public int commitAll() {
        return rebalanceInProgress ? -1 : active.commit() + standby.commit();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates processing to the active task container.
     *
     * @param j value forwarded unchanged to the container's {@code process} call
     *          (presumably a timestamp - confirm against the container)
     * @return the value returned by the container
     */
    public int process(long j) {
        return active.process(j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates punctuation to the active task container.
     *
     * @return the value returned by the container
     */
    public int punctuate() {
        return active.punctuate();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Delegates user-requested commits to the active task container unless a
     * rebalance is in progress.
     *
     * @return -1 if a rebalance is in progress; otherwise the value returned by the
     *         container
     */
    public int maybeCommitActiveTasksPerUserRequested() {
        return rebalanceInProgress ? -1 : active.maybeCommitPerUserRequested();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Sends a delete-records request for the offsets reported by the active task
     * container. Nothing is sent while a previous request is still pending, or when
     * there are no records to delete. A failed previous request is logged at debug
     * level before the new one is sent.
     */
    public void maybePurgeCommitedRecords() {
        final boolean previousRequestPending = deleteRecordsResult != null && !deleteRecordsResult.all().isDone();
        if (previousRequestPending) {
            return;
        }

        if (deleteRecordsResult != null && deleteRecordsResult.all().isCompletedExceptionally()) {
            log.debug("Previous delete-records request has failed: {}. Try sending the new request now", deleteRecordsResult.lowWatermarks());
        }

        final Map<TopicPartition, RecordsToDelete> deletions = new HashMap<>();
        for (final Map.Entry<TopicPartition, Long> purgeable : active.recordsToDelete().entrySet()) {
            deletions.put(purgeable.getKey(), RecordsToDelete.beforeOffset(purgeable.getValue()));
        }

        if (!deletions.isEmpty()) {
            deleteRecordsResult = adminClient.deleteRecords(deletions);
            log.trace("Sent delete-records request: {}", deletions);
        }
    }

    /**
     * @return the description produced by {@link #toString(String)} with an empty indent
     */
    @Override
    public String toString() {
        return toString("");
    }

    /**
     * Builds a multi-line description of the metadata state, active tasks and
     * standby tasks.
     *
     * @param str indent placed in front of each section header; nested sections
     *            receive this indent followed by two tabs
     * @return the description
     */
    public String toString(String str) {
        final String nestedIndent = str + "\t\t";
        return new StringBuilder()
            .append("TaskManager\n")
            .append(str).append("\tMetadataState:\n")
            .append(streamsMetadataState.toString(nestedIndent))
            .append(str).append("\tActive tasks:\n")
            .append(active.toString(nestedIndent))
            .append(str).append("\tStandby tasks:\n")
            .append(standby.toString(nestedIndent))
            .toString();
    }

    /**
     * Removes the given partitions from the restore consumer's assignment by
     * re-assigning it to the remainder. Nothing happens when the collection is empty
     * or when {@code z} does not match the current "assigned to standbys" flag.
     *
     * @param collection partitions to drop from the assignment
     * @param z          the value the "assigned to standbys" flag must have for the
     *                   removal to take place
     */
    private void removeChangelogsFromRestoreConsumer(Collection<TopicPartition> collection, boolean z) {
        if (!collection.isEmpty() && z == restoreConsumerAssignedStandbys) {
            final Set<TopicPartition> remaining = new HashSet<>(restoreConsumer.assignment());
            remaining.removeAll(collection);
            restoreConsumer.assign(remaining);
        }
    }

    /**
     * Maps partitions to the ids of the tasks that own them, using the
     * partition-to-task lookup.
     *
     * @param collection partitions to translate
     * @return a new mutable set of the owning task ids
     * @throws StreamsException if a partition has no entry in the lookup
     */
    private Set<TaskId> partitionsToTaskSet(Collection<TopicPartition> collection) {
        final Set<TaskId> owners = new HashSet<>();
        for (final TopicPartition partition : collection) {
            final TaskId owner = partitionsToTaskId.get(partition);
            if (owner != null) {
                owners.add(owner);
                continue;
            }
            log.error("Failed to lookup taskId for partition {}", partition);
            throw new StreamsException("Found partition in assignment with no corresponding task");
        }
        return owners;
    }

    /**
     * @return the internal assigned-active map itself, not a copy
     */
    Map<TaskId, Set<TopicPartition>> assignedActiveTasks() {
        return assignedActiveTasks;
    }

    /**
     * @return the internal assigned-standby map itself, not a copy
     */
    Map<TaskId, Set<TopicPartition>> assignedStandbyTasks() {
        return assignedStandbyTasks;
    }
}
