package com.redis.spring.batch;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.common.BatchOperation;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.FilteringItemProcessor;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.Operation;
import com.redis.spring.batch.common.OperationItemStreamSupport;
import com.redis.spring.batch.common.PoolOptions;
import com.redis.spring.batch.common.ProcessingItemWriter;
import com.redis.spring.batch.common.QueueItemWriter;
import com.redis.spring.batch.common.SimpleBatchOperation;
import com.redis.spring.batch.common.SimpleStepRunner;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.DataStructureCodecReadOperation;
import com.redis.spring.batch.reader.DataStructureStringReadOperation;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparisonReadOperation;
import com.redis.spring.batch.reader.KeyDumpReadOperation;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.QueueOptions;
import com.redis.spring.batch.reader.ReaderOptions;
import com.redis.spring.batch.reader.ScanKeyItemReader;
import com.redis.spring.batch.reader.ScanOptions;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, V, T> extends AbstractItemStreamItemReader<T> implements PollableItemReader<T> {
    public static final Duration DEFAULT_RUNNING_TIMEOUT = Duration.ofSeconds(5);
    public static final Duration DEFAULT_FLUSHING_INTERVAL = Duration.ofMillis(50);
    private final JobRunner jobRunner;
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final ItemReader<K> reader;
    private final ItemProcessor<K, K> processor;
    private final BatchOperation<K, V, K, T> operation;
    private final ReaderOptions options;
    private String name;
    private BlockingQueue<T> queue;
    private SimpleStepRunner<K, K> stepRunner;

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$AbstractBuilder.class */
    public static abstract class AbstractBuilder<K, V, T, B extends AbstractBuilder<K, V, T, B>> {
        protected final AbstractRedisClient client;
        protected final RedisCodec<K, V> codec;
        private Optional<JobRunner> jobRunner = Optional.empty();
        protected ReaderOptions options = ReaderOptions.builder().build();

        protected AbstractBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            this.client = abstractRedisClient;
            this.codec = redisCodec;
        }

        public B jobRunner(JobRunner jobRunner) {
            this.jobRunner = Optional.of(jobRunner);
            return this;
        }

        protected B jobRunner(Optional<JobRunner> optional) {
            this.jobRunner = optional;
            return this;
        }

        public B options(ReaderOptions readerOptions) {
            this.options = readerOptions;
            return this;
        }

        protected <B1 extends AbstractBuilder<?, ?, ?, ?>> B1 toBuilder(B1 b1) {
            b1.jobRunner(this.jobRunner);
            b1.options(this.options);
            return b1;
        }

        private JobRunner jobRunner() {
            return this.jobRunner.orElseGet(JobRunner::getInMemoryInstance);
        }

        public RedisItemReader<K, V, T> build() {
            return new RedisItemReader<>(jobRunner(), this.client, this.codec, keyReader(), keyProcessor(), operation(), this.options);
        }

        protected abstract BatchOperation<K, V, K, T> operation();

        protected ItemProcessor<K, K> keyProcessor() {
            return null;
        }

        protected abstract ItemReader<K> keyReader();
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ComparatorBuilder.class */
    public static class ComparatorBuilder extends AbstractBuilder<String, String, KeyComparison, ComparatorBuilder> {
        public static final Duration DEFAULT_TTL_TOLERANCE = Duration.ofMillis(100);
        private final AbstractRedisClient right;
        private Duration ttlTolerance;
        private ScanOptions scanOptions;
        private PoolOptions rightPoolOptions;

        public ComparatorBuilder(AbstractRedisClient abstractRedisClient, AbstractRedisClient abstractRedisClient2) {
            super(abstractRedisClient, StringCodec.UTF8);
            this.ttlTolerance = DEFAULT_TTL_TOLERANCE;
            this.scanOptions = ScanOptions.builder().build();
            this.rightPoolOptions = PoolOptions.builder().build();
            this.right = abstractRedisClient2;
        }

        public ComparatorBuilder ttlTolerance(Duration duration) {
            this.ttlTolerance = duration;
            return this;
        }

        public ComparatorBuilder scanOptions(ScanOptions scanOptions) {
            this.scanOptions = scanOptions;
            return this;
        }

        public ComparatorBuilder rightPoolOptions(PoolOptions poolOptions) {
            this.rightPoolOptions = poolOptions;
            return this;
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected ItemReader<String> keyReader() {
            return new ScanKeyItemReader(this.client, this.codec, this.scanOptions);
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected BatchOperation<String, String, String, KeyComparison> operation() {
            return new KeyComparisonReadOperation(this.client, this.right, this.rightPoolOptions, this.ttlTolerance);
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$LiveBuilder.class */
    public static class LiveBuilder<K, V, T> extends AbstractBuilder<K, V, T, LiveBuilder<K, V, T>> {
        public static final int DEFAULT_DATABASE = 0;
        public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
        private final Operation<K, V, K, T> operation;
        private int database;
        private String[] keyPatterns;
        private QueueOptions eventQueueOptions;
        private Predicate<K> keyFilter;
        public static final Duration DEFAULT_FLUSHING_INTERVAL = Duration.ofMillis(50);
        protected static final String[] DEFAULT_KEY_PATTERNS = {"*"};

        public LiveBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, Operation<K, V, K, T> operation) {
            super(abstractRedisClient, redisCodec);
            this.database = 0;
            this.keyPatterns = DEFAULT_KEY_PATTERNS;
            this.eventQueueOptions = QueueOptions.builder().build();
            this.operation = operation;
            this.options.getStepOptions().setFlushingInterval(DEFAULT_FLUSHING_INTERVAL);
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected BatchOperation<K, V, K, T> operation() {
            return new SimpleBatchOperation(this.operation);
        }

        public LiveBuilder<K, V, T> keyFilter(Predicate<K> predicate) {
            this.keyFilter = predicate;
            return this;
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected ItemProcessor<K, K> keyProcessor() {
            if (this.keyFilter == null) {
                return null;
            }
            return new FilteringItemProcessor(this.keyFilter);
        }

        public static String[] defaultKeyPatterns() {
            return DEFAULT_KEY_PATTERNS;
        }

        public static String[] patterns(int i, String... strArr) {
            return (String[]) ((List) Stream.of((Object[]) strArr).map(str -> {
                return String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(i), str);
            }).collect(Collectors.toList())).toArray(new String[0]);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        public KeyspaceNotificationItemReader<K, V> keyReader() {
            return new KeyspaceNotificationItemReader<>(this.client, this.codec, this.eventQueueOptions, patterns(this.database, this.keyPatterns));
        }

        public LiveBuilder<K, V, T> database(int i) {
            this.database = i;
            return this;
        }

        public LiveBuilder<K, V, T> keyPatterns(String... strArr) {
            this.keyPatterns = strArr;
            return this;
        }

        public LiveBuilder<K, V, T> eventQueueOptions(QueueOptions queueOptions) {
            this.eventQueueOptions = queueOptions;
            return this;
        }

        public static String[] defaultNotificationPatterns() {
            return patterns(0, defaultKeyPatterns());
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ScanBuilder.class */
    public static class ScanBuilder<K, V, T> extends AbstractBuilder<K, V, T, ScanBuilder<K, V, T>> {
        private final Operation<K, V, K, T> operation;
        private ScanOptions scanOptions;

        public ScanBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, Operation<K, V, K, T> operation) {
            super(abstractRedisClient, redisCodec);
            this.scanOptions = ScanOptions.builder().build();
            this.operation = operation;
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected BatchOperation<K, V, K, T> operation() {
            return new SimpleBatchOperation(this.operation);
        }

        public ScanBuilder<K, V, T> scanOptions(ScanOptions scanOptions) {
            this.scanOptions = scanOptions;
            return this;
        }

        public LiveBuilder<K, V, T> live() {
            return ((LiveBuilder) toBuilder(new LiveBuilder(this.client, this.codec, this.operation))).keyPatterns(this.scanOptions.getMatch());
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        protected ItemReader<K> keyReader() {
            return new ScanKeyItemReader(this.client, this.codec, this.scanOptions);
        }
    }

    public RedisItemReader(JobRunner jobRunner, AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, ItemReader<K> itemReader, ItemProcessor<K, K> itemProcessor, BatchOperation<K, V, K, T> batchOperation, ReaderOptions readerOptions) {
        setName(ClassUtils.getShortName(getClass()));
        this.jobRunner = jobRunner;
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.reader = itemReader;
        this.processor = itemProcessor;
        this.operation = batchOperation;
        this.options = readerOptions;
    }

    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.stepRunner == null) {
            this.queue = queue();
            this.stepRunner = new SimpleStepRunner<>(this.jobRunner, this.reader, this.processor, new ProcessingItemWriter(operationProcessor(), new QueueItemWriter(this.queue)), this.options.getStepOptions());
            this.stepRunner.setName(this.name);
            this.stepRunner.open(executionContext);
        }
    }

    private OperationItemStreamSupport<K, V, K, T> operationProcessor() {
        return new OperationItemStreamSupport<>(this.client, this.codec, this.options.getPoolOptions(), this.operation);
    }

    private BlockingQueue<T> queue() {
        LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue(this.options.getQueueOptions().getCapacity());
        Utils.createGaugeCollectionSize("reader.queue.size", linkedBlockingQueue, new Tag[0]);
        return linkedBlockingQueue;
    }

    public void update(ExecutionContext executionContext) {
        super.update(executionContext);
        if (this.stepRunner != null) {
            this.stepRunner.update(executionContext);
        }
    }

    public synchronized void close() {
        if (this.stepRunner != null) {
            this.stepRunner.close();
            this.stepRunner = null;
        }
        this.queue = null;
        super.close();
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public synchronized T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        T poll;
        do {
            poll = this.queue.poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                break;
            }
        } while (this.stepRunner.isRunning());
        if (this.stepRunner.isJobFailed()) {
            throw new ItemStreamException("Reader job failed");
        }
        return poll;
    }

    public synchronized List<T> read(int i) {
        ArrayList arrayList = new ArrayList(i);
        this.queue.drainTo(arrayList, i);
        return arrayList;
    }

    public static ComparatorBuilder compare(AbstractRedisClient abstractRedisClient, AbstractRedisClient abstractRedisClient2) {
        return new ComparatorBuilder(abstractRedisClient, abstractRedisClient2);
    }

    public static ScanBuilder<String, String, DataStructure<String>> dataStructure(RedisModulesClusterClient redisModulesClusterClient) {
        return new ScanBuilder<>(redisModulesClusterClient, StringCodec.UTF8, new DataStructureStringReadOperation(redisModulesClusterClient));
    }

    public static ScanBuilder<String, String, DataStructure<String>> dataStructure(RedisModulesClient redisModulesClient) {
        return new ScanBuilder<>(redisModulesClient, StringCodec.UTF8, new DataStructureStringReadOperation(redisModulesClient));
    }

    public static <K, V> ScanBuilder<K, V, DataStructure<K>> dataStructure(RedisModulesClient redisModulesClient, RedisCodec<K, V> redisCodec) {
        return new ScanBuilder<>(redisModulesClient, redisCodec, new DataStructureCodecReadOperation(redisModulesClient, redisCodec));
    }

    public static <K, V> ScanBuilder<K, V, DataStructure<K>> dataStructure(RedisModulesClusterClient redisModulesClusterClient, RedisCodec<K, V> redisCodec) {
        return new ScanBuilder<>(redisModulesClusterClient, redisCodec, new DataStructureCodecReadOperation(redisModulesClusterClient, redisCodec));
    }

    public static ScanBuilder<byte[], byte[], KeyDump<byte[]>> keyDump(RedisModulesClient redisModulesClient) {
        return new ScanBuilder<>(redisModulesClient, ByteArrayCodec.INSTANCE, new KeyDumpReadOperation(redisModulesClient));
    }

    public static ScanBuilder<byte[], byte[], KeyDump<byte[]>> keyDump(RedisModulesClusterClient redisModulesClusterClient) {
        return new ScanBuilder<>(redisModulesClusterClient, ByteArrayCodec.INSTANCE, new KeyDumpReadOperation(redisModulesClusterClient));
    }
}
