package org.apache.kafka.streams.processor.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StoreChangelogReader.class */
/**
 * Drives restoration of state stores from their changelog partitions through a dedicated
 * restore consumer.
 *
 * <p>Book-keeping is split over several collections, all keyed by changelog partition:
 * <ul>
 *   <li>{@code stateRestorers} - every registered restorer;</li>
 *   <li>{@code needsInitializing} - partitions whose end offset has not been resolved yet;</li>
 *   <li>{@code needsRestoring} - partitions currently being read from the restore consumer;</li>
 *   <li>{@code restoreToOffsets} - the offset each partition has to be restored up to;</li>
 *   <li>{@code completedRestorers} - partitions whose restoration has finished.</li>
 * </ul>
 */
public class StoreChangelogReader implements ChangelogReader {
    /**
     * Checkpoint value treated as "no checkpoint": {@link #startRestoration} rewinds such
     * partitions to the beginning of the changelog.
     */
    private static final long NO_CHECKPOINT = -1L;

    private final Logger log;
    private final Consumer<byte[], byte[]> restoreConsumer;
    private final StateRestoreListener userStateRestoreListener;
    private final Map<TopicPartition, Long> restoreToOffsets = new HashMap<>();
    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
    private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
    private final Set<TopicPartition> needsRestoring = new HashSet<>();
    private final Set<TopicPartition> needsInitializing = new HashSet<>();
    private final Set<TopicPartition> completedRestorers = new HashSet<>();
    private final Duration pollTime;

    /**
     * @param consumer             consumer used exclusively for reading changelog partitions
     * @param duration             timeout passed to every {@code poll} of the restore consumer
     * @param stateRestoreListener listener installed on every restorer that gets registered
     * @param logContext           context used to create this reader's logger
     */
    public StoreChangelogReader(Consumer<byte[], byte[]> consumer, Duration duration, StateRestoreListener stateRestoreListener, LogContext logContext) {
        this.restoreConsumer = consumer;
        this.pollTime = duration;
        this.log = logContext.logger(getClass());
        this.userStateRestoreListener = stateRestoreListener;
    }

    /**
     * Registers a restorer for its changelog partition. A restorer whose partition is already
     * known is not replaced, but the partition is (re-)queued for initialization either way.
     *
     * @param stateRestorer the restorer to register
     */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public void register(StateRestorer stateRestorer) {
        final TopicPartition partition = stateRestorer.partition();
        if (this.stateRestorers.containsKey(partition)) {
            this.log.debug("Skip re-adding restorer for changelog {}", partition);
        } else {
            stateRestorer.setUserRestoreListener(this.userStateRestoreListener);
            this.stateRestorers.put(partition, stateRestorer);
            this.log.trace("Added restorer for changelog {}", partition);
        }
        this.needsInitializing.add(partition);
    }

    /**
     * Runs one restoration round: initializes pending partitions, polls the restore consumer once
     * and feeds the fetched records to the restorers that are still in progress.
     *
     * <p>If the poll fails with an {@link InvalidOffsetException}, the affected partitions are
     * taken out of the current round, their checkpoint is reset, the owning tasks are asked to
     * reinitialize their stores, and the consumer is rewound to the beginning of those partitions.
     *
     * @param restoringTasks lookup for the task that owns a given changelog partition
     * @return the internal (live) set of partitions whose restoration has completed so far
     */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public Collection<TopicPartition> restore(RestoringTasks restoringTasks) {
        if (!this.needsInitializing.isEmpty()) {
            initialize(restoringTasks);
        }
        if (checkForCompletedRestoration()) {
            return this.completedRestorers;
        }
        try {
            final ConsumerRecords<byte[], byte[]> records = this.restoreConsumer.poll(this.pollTime);
            for (final TopicPartition partition : this.needsRestoring) {
                final StateRestorer restorer = this.stateRestorers.get(partition);
                final Long restoreTo = this.restoreToOffsets.get(partition);
                final long position = processNext(records.records(partition), restorer, restoreTo);
                restorer.setRestoredOffset(position);
                if (restorer.hasCompleted(position, restoreTo)) {
                    restorer.restoreDone();
                    this.restoreToOffsets.remove(partition);
                    // needsRestoring is being iterated; completed partitions are pruned after the loop.
                    this.completedRestorers.add(partition);
                }
            }
        } catch (final InvalidOffsetException e) {
            this.log.warn("Restoring StreamTasks failed. Deleting StreamTasks stores to recreate from scratch.", e);
            final Set<TopicPartition> partitions = e.partitions();
            for (final TopicPartition partition : partitions) {
                final StreamTask task = restoringTasks.restoringTaskFor(partition);
                this.log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
                this.needsInitializing.remove(partition);
                this.needsRestoring.remove(partition);
                this.stateRestorers.get(partition).setCheckpointOffset(NO_CHECKPOINT);
                task.reinitializeStateStoresForPartitions(e.partitions());
            }
            this.restoreConsumer.seekToBeginning(partitions);
        }
        this.needsRestoring.removeAll(this.completedRestorers);
        checkForCompletedRestoration();
        return this.completedRestorers;
    }

    /**
     * Resolves the restore target offset for every pending partition that is present in the
     * fetched topic metadata, marks partitions that need no work as completed and starts
     * restoration for the rest.
     *
     * <p>Only the end-offset lookup is guarded against {@link TimeoutException}: on timeout the
     * method returns without touching any state, so the same partitions are retried on the next
     * call. Timeouts raised later (e.g. while positioning the consumer in
     * {@link #startRestoration}) are deliberately not swallowed here, because by then partitions
     * have already been removed from {@code needsInitializing} and silently dropping the error
     * would leave them neither pending nor restoring.
     *
     * @param restoringTasks lookup for the task that owns a given changelog partition
     * @throws StreamsException if the restore consumer is subscribed to any topic
     */
    private void initialize(RestoringTasks restoringTasks) {
        if (!this.restoreConsumer.subscription().isEmpty()) {
            throw new StreamsException("Restore consumer should not be subscribed to any topics (" + this.restoreConsumer.subscription() + ")");
        }
        refreshChangelogInfo();

        // Only partitions that show up in the refreshed metadata can be initialized in this round.
        final Set<TopicPartition> initializable = new HashSet<>();
        for (final TopicPartition partition : this.needsInitializing) {
            if (hasPartition(partition)) {
                initializable.add(partition);
            }
        }

        final Map<TopicPartition, Long> endOffsets;
        try {
            endOffsets = this.restoreConsumer.endOffsets(initializable);
        } catch (final TimeoutException e) {
            // Nothing has been modified yet, so giving up here simply retries on the next call.
            this.log.debug("Could not fetch end offset for {}; will fall back to partition by partition fetching", initializable);
            return;
        }

        endOffsets.forEach((partition, endOffset) -> {
            if (endOffset != null) {
                // Never restore beyond the restorer's own offset limit.
                this.restoreToOffsets.put(partition, Long.valueOf(Math.min(endOffset.longValue(), this.stateRestorers.get(partition).offsetLimit())));
            } else {
                this.log.info("End offset cannot be found form the returned metadata; removing this partition from the current run loop");
                initializable.remove(partition);
            }
        });

        final Iterator<TopicPartition> it = initializable.iterator();
        while (it.hasNext()) {
            final TopicPartition partition = it.next();
            final Long restoreTo = this.restoreToOffsets.get(partition);
            final StateRestorer restorer = this.stateRestorers.get(partition);
            if (restorer.checkpoint() >= restoreTo.longValue()) {
                // The checkpoint already covers everything up to the target offset.
                restorer.setRestoredOffset(restorer.checkpoint());
                it.remove();
                this.completedRestorers.add(partition);
            } else if (restoreTo.longValue() == 0) {
                // Target offset 0: there is nothing to read.
                restorer.setRestoredOffset(0L);
                it.remove();
                this.completedRestorers.add(partition);
            } else {
                restorer.setEndingOffset(restoreTo.longValue());
            }
            this.needsInitializing.remove(partition);
        }

        if (!initializable.isEmpty()) {
            startRestoration(initializable, restoringTasks);
        }
    }

    /**
     * Assigns the given partitions to the restore consumer (in addition to its current
     * assignment), positions the consumer for each of them and notifies the restorers that
     * restoration has started.
     *
     * <p>Partitions with a checkpoint are sought to it. Partitions without one are rewound to the
     * beginning; if the owning task has EOS enabled, the partition is removed from {@code set} and
     * the task is asked to reinitialize its stores instead of starting restoration now.
     *
     * @param set            partitions to start restoring; may be shrunk by this method
     * @param restoringTasks lookup for the task that owns a given changelog partition
     */
    private void startRestoration(Set<TopicPartition> set, RestoringTasks restoringTasks) {
        this.log.debug("Start restoring state stores from changelog topics {}", set);
        final Set<TopicPartition> assignment = new HashSet<>(this.restoreConsumer.assignment());
        assignment.addAll(set);
        this.restoreConsumer.assign(assignment);

        final List<StateRestorer> withoutCheckpoint = new ArrayList<>();
        for (final TopicPartition partition : set) {
            final StateRestorer restorer = this.stateRestorers.get(partition);
            if (restorer.checkpoint() != NO_CHECKPOINT) {
                this.log.trace("Found checkpoint {} from changelog {} for store {}.", restorer.checkpoint(), partition, restorer.storeName());
                this.restoreConsumer.seek(partition, restorer.checkpoint());
                logRestoreOffsets(partition, restorer.checkpoint(), this.restoreToOffsets.get(partition));
                restorer.setStartingOffset(this.restoreConsumer.position(partition));
                this.log.debug("Calling restorer for partition {}", partition);
                restorer.restoreStarted();
            } else {
                this.log.trace("Did not find checkpoint from changelog {} for store {}, rewinding to beginning.", partition, restorer.storeName());
                this.restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                withoutCheckpoint.add(restorer);
            }
        }

        // Handled in a second pass so that `set` is not modified while it is being iterated above.
        for (final StateRestorer restorer : withoutCheckpoint) {
            final TopicPartition partition = restorer.partition();
            final StreamTask task = restoringTasks.restoringTaskFor(partition);
            if (task.isEosEnabled()) {
                this.log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), partition);
                this.needsInitializing.remove(partition);
                set.remove(partition);
                restorer.setCheckpointOffset(this.restoreConsumer.position(partition));
                task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
            } else {
                this.log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), partition);
                final long position = this.restoreConsumer.position(partition);
                logRestoreOffsets(partition, position, this.restoreToOffsets.get(partition));
                restorer.setStartingOffset(position);
                restorer.restoreStarted();
            }
        }
        this.needsRestoring.addAll(set);
    }

    /** Logs, at debug level, the offset range a partition is about to be restored over. */
    private void logRestoreOffsets(TopicPartition topicPartition, long j, Long l) {
        this.log.debug("Restoring partition {} from offset {} to endOffset {}", topicPartition, j, l);
    }

    /**
     * Merges the consumer's current topic listing into the cached partition metadata. A timeout
     * is only logged; the cache is left as it was and the lookup is attempted again next time.
     */
    private void refreshChangelogInfo() {
        try {
            this.partitionInfo.putAll(this.restoreConsumer.listTopics());
        } catch (final TimeoutException e) {
            this.log.debug("Could not fetch topic metadata within the timeout, will retry in the next run loop");
        }
    }

    /**
     * @return a fresh map from changelog partition to restored offset, containing only the
     *         restorers that report themselves as persistent
     */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public Map<TopicPartition, Long> restoredOffsets() {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final Map.Entry<TopicPartition, StateRestorer> entry : this.stateRestorers.entrySet()) {
            final StateRestorer restorer = entry.getValue();
            if (restorer.isPersistent()) {
                offsets.put(entry.getKey(), restorer.restoredOffset());
            }
        }
        return offsets;
    }

    /**
     * Drops all book-keeping for the given partitions, including the cached metadata of the
     * topics they belong to.
     *
     * @param list changelog partitions to forget
     */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public void remove(List<TopicPartition> list) {
        for (final TopicPartition partition : list) {
            this.partitionInfo.remove(partition.topic());
            this.stateRestorers.remove(partition);
            this.needsRestoring.remove(partition);
            this.restoreToOffsets.remove(partition);
            this.needsInitializing.remove(partition);
            this.completedRestorers.remove(partition);
        }
    }

    /** Drops all book-keeping and cached topic metadata. */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public void clear() {
        this.partitionInfo.clear();
        this.stateRestorers.clear();
        this.needsRestoring.clear();
        this.restoreToOffsets.clear();
        this.needsInitializing.clear();
        this.completedRestorers.clear();
    }

    /**
     * @return {@code true} if no restorer is registered and no partition is tracked in any state;
     *         the cached topic metadata is not taken into account
     */
    @Override // org.apache.kafka.streams.processor.internals.ChangelogReader
    public boolean isEmpty() {
        return this.stateRestorers.isEmpty() && this.needsRestoring.isEmpty() && this.restoreToOffsets.isEmpty() && this.needsInitializing.isEmpty() && this.completedRestorers.isEmpty();
    }

    @Override
    public String toString() {
        return "RestoreToOffset: " + this.restoreToOffsets + "\nStateRestorers: " + this.stateRestorers + "\nNeedsRestoring: " + this.needsRestoring + "\nNeedsInitializing: " + this.needsInitializing + "\nCompletedRestorers: " + this.completedRestorers + "\n";
    }

    /**
     * Applies one batch of fetched records to a restorer, stopping at the first record for which
     * the restorer reports completion against the target offset.
     *
     * @param list          records fetched for the restorer's partition (may be empty)
     * @param stateRestorer restorer that receives the records
     * @param l             offset the partition has to be restored up to
     * @return the offset of the record at which restoration stopped, or the restore consumer's
     *         current position for the partition when no such record was seen (or, for restorers
     *         with an unbounded offset limit, when part of the batch was left unprocessed)
     */
    private long processNext(List<ConsumerRecord<byte[], byte[]>> list, StateRestorer stateRestorer, Long l) {
        final List<ConsumerRecord<byte[], byte[]>> toRestore = new ArrayList<>();
        long nextPosition = -1;
        final int fetched = list.size();
        int consumed = 0;
        long lastRestoredOffset = -1;
        for (final ConsumerRecord<byte[], byte[]> record : list) {
            final long offset = record.offset();
            if (stateRestorer.hasCompleted(offset, l)) {
                nextPosition = offset;
                break;
            }
            lastRestoredOffset = offset;
            consumed++;
            // Records without a key are counted as consumed but never handed to the restorer.
            if (record.key() != null) {
                toRestore.add(record);
            }
        }
        if (nextPosition == -1 || (stateRestorer.offsetLimit() == Long.MAX_VALUE && fetched != consumed)) {
            nextPosition = this.restoreConsumer.position(stateRestorer.partition());
        }
        if (!toRestore.isEmpty()) {
            stateRestorer.restore(toRestore);
            stateRestorer.restoreBatchCompleted(lastRestoredOffset, list.size());
            this.log.trace("Restored from {} to {} with {} records, ending offset is {}, next starting position is {}", stateRestorer.partition(), stateRestorer.storeName(), list.size(), lastRestoredOffset, nextPosition);
        }
        return nextPosition;
    }

    /**
     * Checks whether any partition is still being restored; if none is, unsubscribes the restore
     * consumer.
     *
     * @return {@code true} if no partition is left in {@code needsRestoring}
     */
    private boolean checkForCompletedRestoration() {
        if (!this.needsRestoring.isEmpty()) {
            return false;
        }
        this.log.info("Finished restoring all active tasks");
        this.restoreConsumer.unsubscribe();
        return true;
    }

    /**
     * @return {@code true} if the cached topic metadata lists the given partition
     */
    private boolean hasPartition(TopicPartition topicPartition) {
        final List<PartitionInfo> infos = this.partitionInfo.get(topicPartition.topic());
        if (infos == null) {
            return false;
        }
        for (final PartitionInfo info : infos) {
            if (info.partition() == topicPartition.partition()) {
                return true;
            }
        }
        return false;
    }
}
