package com.redis.spring.batch;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.common.BatchOperation;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.Operation;
import com.redis.spring.batch.common.OperationItemStreamSupport;
import com.redis.spring.batch.common.PoolOptions;
import com.redis.spring.batch.common.ProcessingItemWriter;
import com.redis.spring.batch.common.QueueItemWriter;
import com.redis.spring.batch.common.SimpleBatchOperation;
import com.redis.spring.batch.common.SynchronizedPollableItemReader;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.DataStructureReadOperation;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparisonReadOperation;
import com.redis.spring.batch.reader.KeyDumpReadOperation;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationOrderingStrategy;
import com.redis.spring.batch.reader.LiveRedisItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.QueueOptions;
import com.redis.spring.batch.reader.ScanKeyItemReader;
import com.redis.spring.batch.reader.StringDataStructureReadOperation;
import com.redis.spring.batch.step.FlushingChunkProvider;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, V, T> extends AbstractItemStreamItemReader<T> implements PollableItemReader<T> {
    /** Thread count constant; equal to the initial value of {@link #threads}. */
    public static final int DEFAULT_THREADS = 1;
    /** Chunk size constant; equal to the initial value of {@link #chunkSize}. */
    public static final int DEFAULT_CHUNK_SIZE = 50;
    /** Redis client handed to the operation support created in {@code writer()}. */
    private final AbstractRedisClient client;
    /** Codec handed to the operation support created in {@code writer()}. */
    private final RedisCodec<K, V> codec;
    /** Source of keys for the background step (possibly wrapped by {@code reader()}). */
    private final ItemReader<K> reader;
    /** Optional key processor installed on the background step; may be {@code null}. */
    private ItemProcessor<K, K> processor;
    /** Batch operation that turns keys into items of type {@code T}. */
    private final BatchOperation<K, V, K, T> operation;
    /** Job repository for the background job; lazily replaced by an in-memory one when {@code null}. */
    private JobRepository jobRepository;
    /** Thread count passed to {@code Utils.multiThread} when the step is built. */
    private int threads = 1;
    /** Chunk size of the background step. */
    private int chunkSize = 50;
    /** Pool options applied to the operation support created in {@code writer()}. */
    private PoolOptions poolOptions = PoolOptions.builder().build();
    /** Supplies the queue capacity and the poll timeout used by {@code read()} and {@code sleep()}. */
    private QueueOptions queueOptions = QueueOptions.builder().build();
    /** Local copy of the reader name; used to name the background step and job. */
    private String name;
    /** Execution of the background job; non-null only between {@code open} and {@code close}. */
    private JobExecution jobExecution;
    /** Queue the background step writes items to; created in {@code open}, nulled in {@code close}. */
    private BlockingQueue<T> queue;

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$AbstractReaderBuilder.class */
    /**
     * Base class for builders that create a {@link RedisItemReader} from a single Redis client.
     * Subclasses decide where the keys come from by implementing
     * {@link #reader(RedisCodec, BatchOperation)}.
     *
     * @param <B> concrete builder type
     */
    public static abstract class AbstractReaderBuilder<B extends AbstractReaderBuilder<B>> extends BaseBuilder<B> {
        /** Client passed to the read operations and to the key readers built by subclasses. */
        protected final AbstractRedisClient client;

        /**
         * @param abstractRedisClient client used by every reader created from this builder
         */
        protected AbstractReaderBuilder(AbstractRedisClient abstractRedisClient) {
            this.client = abstractRedisClient;
        }

        /**
         * Builds a reader using {@link ByteArrayCodec} and a {@link KeyDumpReadOperation}.
         *
         * @return configured reader producing {@link KeyDump} items
         */
        public RedisItemReader<byte[], byte[], KeyDump<byte[]>> keyDump() {
            // Raw types are kept on purpose: they select the (RedisCodec, Operation) overload.
            RedisCodec byteCodec = (RedisCodec) ByteArrayCodec.INSTANCE;
            Operation dumpOperation = (Operation) new KeyDumpReadOperation(this.client);
            return reader(byteCodec, dumpOperation);
        }

        /**
         * Builds a reader using {@link StringCodec#UTF8} and a {@link StringDataStructureReadOperation}.
         *
         * @return configured reader producing {@link DataStructure} items with String keys
         */
        public RedisItemReader<String, String, DataStructure<String>> dataStructure() {
            // Raw types are kept on purpose: they select the (RedisCodec, Operation) overload.
            RedisCodec stringCodec = (RedisCodec) StringCodec.UTF8;
            Operation structureOperation = (Operation) new StringDataStructureReadOperation(this.client);
            return reader(stringCodec, structureOperation);
        }

        /**
         * Builds a reader using the given codec and a {@link DataStructureReadOperation}.
         *
         * @param redisCodec codec used for keys and values
         * @return configured reader producing {@link DataStructure} items
         */
        public <K, V> RedisItemReader<K, V, DataStructure<K>> dataStructure(RedisCodec<K, V> redisCodec) {
            return reader(redisCodec, new DataStructureReadOperation(this.client, redisCodec));
        }

        /**
         * Wraps a single-key operation in a {@link SimpleBatchOperation}, creates the reader and
         * applies the shared builder settings via {@code configure}.
         *
         * @param redisCodec codec used for keys and values
         * @param operation  per-key read operation
         * @return configured reader
         */
        protected <K, V, T> RedisItemReader<K, V, T> reader(RedisCodec<K, V> redisCodec, Operation<K, V, K, T> operation) {
            RedisItemReader<K, V, T> created = reader(redisCodec, new SimpleBatchOperation(operation));
            return configure(created);
        }

        /**
         * Creates the (not yet configured) reader for the given codec and batch operation.
         *
         * @param redisCodec     codec used for keys and values
         * @param batchOperation operation that reads values for batches of keys
         * @return new reader instance
         */
        protected abstract <K, V, T> RedisItemReader<K, V, T> reader(RedisCodec<K, V> redisCodec, BatchOperation<K, V, K, T> batchOperation);
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$BaseBuilder.class */
    /**
     * Holds the settings shared by all reader builders (job repository, threads, chunk size,
     * pool options and queue options) and applies them to a {@link RedisItemReader}.
     *
     * @param <B> concrete builder type returned by the fluent setters
     */
    public static class BaseBuilder<B extends BaseBuilder<B>> {
        private JobRepository jobRepository;
        private int threads = 1;
        private int chunkSize = 50;
        private PoolOptions poolOptions = PoolOptions.builder().build();
        private QueueOptions queueOptions = QueueOptions.builder().build();

        /**
         * @param jobRepository repository for the reader's background job; may be {@code null}
         * @return this builder
         */
        public B jobRepository(JobRepository jobRepository) {
            this.jobRepository = jobRepository;
            return self();
        }

        /**
         * @param queueOptions options for the reader's item queue
         * @return this builder
         */
        public B queueOptions(QueueOptions queueOptions) {
            this.queueOptions = queueOptions;
            return self();
        }

        /**
         * @param poolOptions pool options passed on to the reader
         * @return this builder
         */
        public B poolOptions(PoolOptions poolOptions) {
            this.poolOptions = poolOptions;
            return self();
        }

        /**
         * @param i number of threads passed on to the reader
         * @return this builder
         */
        public B threads(int i) {
            this.threads = i;
            return self();
        }

        /**
         * @param i chunk size passed on to the reader
         * @return this builder
         */
        public B chunkSize(int i) {
            this.chunkSize = i;
            return self();
        }

        /**
         * Copies every setting held by this builder onto another builder.
         *
         * @param b1 builder receiving the settings
         * @return {@code b1}, for chaining
         */
        public <B1 extends BaseBuilder<B1>> B1 toBuilder(B1 b1) {
            b1.jobRepository(this.jobRepository);
            b1.chunkSize(this.chunkSize);
            b1.threads(this.threads);
            b1.poolOptions(this.poolOptions);
            b1.queueOptions(this.queueOptions);
            return b1;
        }

        /**
         * Applies every setting held by this builder to the given reader.
         *
         * @param r reader to configure
         * @return {@code r}, for chaining
         */
        protected <K, V, T, R extends RedisItemReader<K, V, T>> R configure(R r) {
            r.withJobRepository(this.jobRepository);
            r.withChunkSize(this.chunkSize);
            r.withThreads(this.threads);
            r.withPoolOptions(this.poolOptions);
            r.withQueueOptions(this.queueOptions);
            return r;
        }

        /**
         * Returns this builder typed as {@code B}. {@code this} is a {@code BaseBuilder<B>}, which
         * the compiler does not accept where {@code B} is required, so the self-typed cast is
         * done once here. It is safe as long as subclasses bind {@code B} to their own type.
         */
        @SuppressWarnings("unchecked")
        private B self() {
            return (B) this;
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ComparatorBuilder.class */
    /**
     * Builder for a reader that scans keys on the {@code left} client and produces
     * {@link KeyComparison} items through a {@link KeyComparisonReadOperation} created with
     * both clients.
     */
    public static class ComparatorBuilder extends BaseBuilder<ComparatorBuilder> {
        private final AbstractRedisClient left;
        private final AbstractRedisClient right;
        private long scanCount = 1000;
        protected String scanMatch = "*";
        private Optional<String> scanType = Optional.empty();
        private Duration ttlTolerance = KeyComparisonReadOperation.DEFAULT_TTL_TOLERANCE;
        private PoolOptions rightPoolOptions = PoolOptions.builder().build();

        /**
         * @param abstractRedisClient  client whose keys are scanned (left side)
         * @param abstractRedisClient2 client passed as the right side of the comparison
         */
        public ComparatorBuilder(AbstractRedisClient abstractRedisClient, AbstractRedisClient abstractRedisClient2) {
            this.left = abstractRedisClient;
            this.right = abstractRedisClient2;
        }

        /** Sets the count passed to the key scanner. */
        public ComparatorBuilder scanCount(long j) {
            this.scanCount = j;
            return this;
        }

        /** Sets the match pattern passed to the key scanner. */
        public ComparatorBuilder scanMatch(String str) {
            this.scanMatch = str;
            return this;
        }

        /** Sets the type passed to the key scanner; {@code str} must not be {@code null}. */
        public ComparatorBuilder scanType(String str) {
            return scanType(Optional.of(str));
        }

        /** Sets the optional type passed to the key scanner. */
        public ComparatorBuilder scanType(Optional<String> optional) {
            this.scanType = optional;
            return this;
        }

        /** Sets the TTL tolerance passed to the comparison operation. */
        public ComparatorBuilder ttlTolerance(Duration duration) {
            this.ttlTolerance = duration;
            return this;
        }

        /** Sets the pool options passed to the comparison operation. */
        public ComparatorBuilder rightPoolOptions(PoolOptions poolOptions) {
            this.rightPoolOptions = poolOptions;
            return this;
        }

        /**
         * Creates the comparison reader and applies the shared builder settings to it.
         *
         * @return configured reader producing {@link KeyComparison} items
         */
        public RedisItemReader<String, String, KeyComparison> build() {
            // Keys are scanned on the left client only.
            ScanKeyItemReader keyScanner = new ScanKeyItemReader(this.left, StringCodec.UTF8);
            keyScanner.withCount(this.scanCount);
            keyScanner.withMatch(this.scanMatch);
            keyScanner.withType(this.scanType);

            KeyComparisonReadOperation comparison = new KeyComparisonReadOperation(this.left, this.right);
            comparison.withPoolOptions(this.rightPoolOptions);
            comparison.withTtlTolerance(this.ttlTolerance);

            RedisItemReader comparator = new RedisItemReader(this.left, StringCodec.UTF8, keyScanner, comparison);
            return configure(comparator);
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$LiveReaderBuilder.class */
    /**
     * Builder for a {@link LiveRedisItemReader}, whose keys come from a
     * {@link KeyspaceNotificationItemReader} subscribed to keyspace patterns.
     */
    public static class LiveReaderBuilder extends AbstractReaderBuilder<LiveReaderBuilder> {
        /** Format of a keyspace pub/sub pattern: database index, then key pattern. */
        private static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
        public static final int DEFAULT_DATABASE = 0;
        protected static final String[] DEFAULT_KEY_PATTERNS = {"*"};

        private int database = DEFAULT_DATABASE;
        private String[] keyPatterns = DEFAULT_KEY_PATTERNS;
        private List<String> keyTypes = new ArrayList<>();
        private QueueOptions notificationQueueOptions = QueueOptions.builder().build();
        private KeyspaceNotificationOrderingStrategy notificationOrdering = KeyspaceNotificationItemReader.DEFAULT_ORDERING;
        private Duration flushingInterval = FlushingChunkProvider.DEFAULT_FLUSHING_INTERVAL;
        /** No default: stays {@code null} unless {@link #idleTimeout(Duration)} is called. */
        private Duration idleTimeout;

        /**
         * @param abstractRedisClient client used for notifications and value reads
         */
        public LiveReaderBuilder(AbstractRedisClient abstractRedisClient) {
            super(abstractRedisClient);
        }

        /** Sets the idle timeout passed to the live reader. */
        public LiveReaderBuilder idleTimeout(Duration duration) {
            this.idleTimeout = duration;
            return this;
        }

        /** Sets the flushing interval passed to the live reader. */
        public LiveReaderBuilder flushingInterval(Duration duration) {
            this.flushingInterval = duration;
            return this;
        }

        /** Sets the database index used when formatting the pub/sub patterns. */
        public LiveReaderBuilder database(int i) {
            this.database = i;
            return this;
        }

        /** Sets the key patterns used when formatting the pub/sub patterns. */
        public LiveReaderBuilder keyPatterns(String... strArr) {
            this.keyPatterns = strArr;
            return this;
        }

        /** Sets the key types passed to the notification reader. */
        public LiveReaderBuilder keyTypes(String... strArr) {
            this.keyTypes = Arrays.asList(strArr);
            return this;
        }

        /** Sets the queue options passed to the notification reader. */
        public LiveReaderBuilder notificationQueueOptions(QueueOptions queueOptions) {
            this.notificationQueueOptions = queueOptions;
            return this;
        }

        /** Sets the ordering strategy passed to the notification reader. */
        public LiveReaderBuilder notificationOrdering(KeyspaceNotificationOrderingStrategy keyspaceNotificationOrderingStrategy) {
            this.notificationOrdering = keyspaceNotificationOrderingStrategy;
            return this;
        }

        /**
         * Creates a {@link LiveRedisItemReader} fed by a keyspace-notification key reader.
         *
         * @param redisCodec     codec used for keys and values
         * @param batchOperation operation that reads values for batches of keys
         * @return new live reader (shared builder settings are applied by the caller)
         */
        @Override
        protected <K, V, T> RedisItemReader<K, V, T> reader(RedisCodec<K, V> redisCodec, BatchOperation<K, V, K, T> batchOperation) {
            KeyspaceNotificationItemReader notificationReader = new KeyspaceNotificationItemReader(this.client, redisCodec);
            notificationReader.withPatterns(patterns(this.database, this.keyPatterns));
            notificationReader.withTypes(this.keyTypes);
            notificationReader.withOrderingStrategy(this.notificationOrdering);
            notificationReader.withQueueOptions(this.notificationQueueOptions);

            LiveRedisItemReader liveReader = new LiveRedisItemReader(this.client, redisCodec, notificationReader, batchOperation);
            liveReader.withFlushingInterval(this.flushingInterval);
            liveReader.withIdleTimeout(this.idleTimeout);
            return liveReader;
        }

        /**
         * @return pub/sub patterns for database 0 and the default key pattern {@code "*"}
         */
        public static List<String> defaultPatterns() {
            return patterns(DEFAULT_DATABASE, DEFAULT_KEY_PATTERNS);
        }

        /**
         * Formats one keyspace pub/sub pattern per key pattern.
         *
         * @param i      database index
         * @param strArr key patterns
         * @return formatted patterns, in the order given
         */
        public static List<String> patterns(int i, String... strArr) {
            return Stream.of(strArr)
                    .map(keyPattern -> String.format(PUBSUB_PATTERN_FORMAT, i, keyPattern))
                    .collect(Collectors.toList());
        }

        /** Same as the inherited method, with the result cast to {@link LiveRedisItemReader}. */
        @Override
        public LiveRedisItemReader<String, String, DataStructure<String>> dataStructure() {
            return (LiveRedisItemReader) super.dataStructure();
        }

        /** Same as the inherited method, with the result cast to {@link LiveRedisItemReader}. */
        @Override
        public LiveRedisItemReader<byte[], byte[], KeyDump<byte[]>> keyDump() {
            return (LiveRedisItemReader) super.keyDump();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ScanReaderBuilder.class */
    /**
     * Builder for a {@link RedisItemReader} whose keys come from a {@link ScanKeyItemReader}.
     */
    public static class ScanReaderBuilder extends AbstractReaderBuilder<ScanReaderBuilder> {
        protected String scanMatch = "*";
        private long scanCount = 1000L;
        private Optional<String> scanType = Optional.empty();

        /**
         * @param abstractRedisClient client used for scanning and value reads
         */
        public ScanReaderBuilder(AbstractRedisClient abstractRedisClient) {
            super(abstractRedisClient);
        }

        /** Sets the match pattern passed to the key scanner. */
        public ScanReaderBuilder scanMatch(String str) {
            this.scanMatch = str;
            return this;
        }

        /** Sets the count passed to the key scanner. */
        public ScanReaderBuilder scanCount(long j) {
            this.scanCount = j;
            return this;
        }

        /** Sets the optional type passed to the key scanner. */
        public ScanReaderBuilder scanType(Optional<String> optional) {
            this.scanType = optional;
            return this;
        }

        /** Sets the type passed to the key scanner; {@code str} must not be {@code null}. */
        public ScanReaderBuilder type(String str) {
            return scanType(Optional.of(str));
        }

        /**
         * Creates a reader fed by a {@link ScanKeyItemReader} configured with this builder's
         * count, match and type.
         *
         * @param redisCodec     codec used for keys and values
         * @param batchOperation operation that reads values for batches of keys
         * @return new reader (shared builder settings are applied by the caller)
         */
        @Override
        protected <K, V, T> RedisItemReader<K, V, T> reader(RedisCodec<K, V> redisCodec, BatchOperation<K, V, K, T> batchOperation) {
            ScanKeyItemReader keyScanner = new ScanKeyItemReader(this.client, redisCodec);
            keyScanner.withCount(this.scanCount);
            keyScanner.withMatch(this.scanMatch);
            keyScanner.withType(this.scanType);
            return new RedisItemReader<>(this.client, redisCodec, keyScanner, batchOperation);
        }

        /**
         * Creates a {@link LiveReaderBuilder} for the same client. It carries over the shared
         * settings, uses the scan match as its single key pattern and, when present, the scan
         * type as its single key type.
         *
         * @return new live-reader builder
         */
        public LiveReaderBuilder live() {
            LiveReaderBuilder liveBuilder = (LiveReaderBuilder) toBuilder(new LiveReaderBuilder(this.client));
            liveBuilder.keyPatterns(this.scanMatch);
            this.scanType.ifPresent(liveBuilder::keyTypes);
            return liveBuilder;
        }
    }

    /**
     * Creates a reader that takes keys from {@code itemReader} and turns them into items with
     * {@code batchOperation}. Nothing is started until {@code open} is called.
     *
     * @param abstractRedisClient client handed to the operation support built for the background step
     * @param redisCodec          codec handed to the operation support built for the background step
     * @param itemReader          source of keys
     * @param batchOperation      operation producing items of type {@code T} from keys
     */
    public RedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, ItemReader<K> itemReader, BatchOperation<K, V, K, T> batchOperation) {
        // The name defaults to the short class name of the runtime class; setName can change it later.
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.reader = itemReader;
        this.operation = batchOperation;
    }

    /**
     * Sets the processor installed on the background step. It is read when {@code open}
     * builds the step.
     *
     * @param itemProcessor key processor; may be {@code null}
     * @return this reader
     */
    public RedisItemReader<K, V, T> withKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.processor = itemProcessor;
        return this;
    }

    /**
     * Sets the thread count for the background step. Values above 1 also make {@code reader()}
     * wrap the key reader in a synchronized delegate.
     *
     * @param i number of threads
     * @return this reader
     */
    public RedisItemReader<K, V, T> withThreads(int i) {
        this.threads = i;
        return this;
    }

    /**
     * Sets the chunk size of the background step.
     *
     * @param i chunk size
     * @return this reader
     */
    public RedisItemReader<K, V, T> withChunkSize(int i) {
        this.chunkSize = i;
        return this;
    }

    /**
     * Sets the pool options applied to the operation support built for the background step.
     *
     * @param poolOptions pool options
     * @return this reader
     */
    public RedisItemReader<K, V, T> withPoolOptions(PoolOptions poolOptions) {
        this.poolOptions = poolOptions;
        return this;
    }

    /**
     * Sets the queue options. They supply the capacity of the item queue and the poll timeout
     * used while reading and while waiting for the job to start.
     *
     * @param queueOptions queue options; must not be {@code null} when the reader is opened or read
     * @return this reader
     */
    public RedisItemReader<K, V, T> withQueueOptions(QueueOptions queueOptions) {
        this.queueOptions = queueOptions;
        return this;
    }

    /**
     * Sets the job repository used for the background job. When left {@code null}, an
     * in-memory repository is created the first time one is needed.
     *
     * @param jobRepository job repository; may be {@code null}
     * @return this reader
     */
    public RedisItemReader<K, V, T> withJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
        return this;
    }

    /**
     * Sets the name on the superclass and keeps a local copy, which {@code open} uses to name
     * the background job ({@code name}) and its step ({@code name + "-step"}).
     *
     * @param str reader name
     */
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * Opens the reader. On the first call this creates the item queue, then builds and
     * asynchronously launches a single-step job that reads keys, optionally processes them and
     * writes the resulting items to the queue. The method returns once the job is running, has
     * completed, or has failed. Calls made while the reader is already open only delegate to
     * the superclass.
     *
     * @param executionContext execution context passed to the superclass
     * @throws ItemStreamException if the job could not be set up or launched, the calling
     *         thread was interrupted while waiting, or the job ended unsuccessfully
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.jobExecution != null) {
            // Already open: the background job has been launched.
            return;
        }
        this.queue = queue();
        try {
            JobRepository jobRepository = jobRepository();
            StepBuilder stepBuilder = new StepBuilder(this.name + "-step");
            stepBuilder.repository(jobRepository);
            stepBuilder.transactionManager(transactionManager());
            SimpleStepBuilder<K, K> step = step(stepBuilder);
            step.reader(reader());
            step.processor(this.processor);
            step.writer(writer());
            Utils.multiThread(step, this.threads);
            Job build = new JobBuilderFactory(jobRepository).get(this.name).start(step.build()).build();
            SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
            simpleJobLauncher.setJobRepository(jobRepository);
            // Asynchronous executor: run() returns while the job is still starting.
            simpleJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
            try {
                this.jobExecution = simpleJobLauncher.run(build, new JobParameters());
            } catch (JobExecutionException e) {
                throw new ItemStreamException("Job execution failed", e);
            }
            // Wait until the job is running or has already reached a terminal state.
            while (!this.jobExecution.isRunning() && !this.jobExecution.getStatus().isUnsuccessful() && this.jobExecution.getStatus() != BatchStatus.COMPLETED) {
                sleep();
            }
            // One extra poll interval so that an early failure has time to show up in the status.
            sleep();
            if (this.jobExecution.getStatus().isUnsuccessful()) {
                List<Throwable> failures = this.jobExecution.getAllFailureExceptions();
                if (failures.isEmpty()) {
                    // An unsuccessful status does not guarantee a recorded failure exception.
                    throw new ItemStreamException("Could not run job");
                }
                throw new ItemStreamException("Could not run job", failures.get(0));
            }
        } catch (ItemStreamException e) {
            // Already carries a specific message and cause; do not re-wrap it below.
            throw e;
        } catch (Exception e) {
            throw new ItemStreamException("Could not initialize job repository", e);
        }
    }

    /**
     * Pauses the calling thread for one queue poll timeout.
     *
     * @throws ItemStreamException if the thread is interrupted; the interrupt flag is restored first
     */
    private void sleep() {
        long pauseMillis = this.queueOptions.getPollTimeout().toMillis();
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted during initialization", interrupted);
        }
    }

    /**
     * @return a new {@link ResourcelessTransactionManager} for the background step
     */
    private PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }

    /**
     * @return the execution of the background job, or {@code null} when the reader is not open
     */
    public JobExecution getJobExecution() {
        return this.jobExecution;
    }

    /**
     * Returns the configured job repository. When none has been set, an in-memory one is
     * created and remembered first.
     *
     * @return job repository for the background job
     * @throws Exception if the in-memory repository cannot be created
     */
    private JobRepository jobRepository() throws Exception {
        if (this.jobRepository != null) {
            return this.jobRepository;
        }
        this.jobRepository = Utils.inMemoryJobRepository();
        return this.jobRepository;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Turns the given step builder into a chunk-oriented builder using the configured chunk size.
     *
     * @param stepBuilder builder already set up with repository and transaction manager
     * @return chunk-oriented step builder
     */
    public SimpleStepBuilder<K, K> step(StepBuilder stepBuilder) {
        return stepBuilder.chunk(this.chunkSize);
    }

    /**
     * Builds the writer of the background step: a {@link ProcessingItemWriter} combining an
     * {@link OperationItemStreamSupport} (client, codec, operation, pool options) with a
     * {@link QueueItemWriter} on this reader's queue.
     *
     * @return writer for the background step
     */
    private ItemWriter<? super K> writer() {
        OperationItemStreamSupport operationSupport = new OperationItemStreamSupport(this.client, this.codec, this.operation);
        operationSupport.withPoolOptions(this.poolOptions);
        QueueItemWriter queueWriter = new QueueItemWriter(this.queue);
        return new ProcessingItemWriter(operationSupport, queueWriter);
    }

    /**
     * Returns the key reader to install on the background step. With more than one thread,
     * pollable readers and item-stream readers are wrapped in a synchronized delegate. Any
     * other reader, and every reader in the single-threaded case, is returned as is.
     *
     * @return key reader for the background step
     */
    protected ItemReader<K> reader() {
        if (this.threads <= 1) {
            return this.reader;
        }
        if (this.reader instanceof PollableItemReader) {
            return new SynchronizedPollableItemReader((PollableItemReader) this.reader);
        }
        if (this.reader instanceof ItemStreamReader) {
            SynchronizedItemStreamReader<K> synchronizedReader = new SynchronizedItemStreamReader<>();
            // setDelegate requires an ItemStreamReader; the instanceof check above makes this cast safe.
            synchronizedReader.setDelegate((ItemStreamReader<K>) this.reader);
            return synchronizedReader;
        }
        return this.reader;
    }

    /**
     * Creates the bounded item queue, sized from the queue options, and registers a gauge
     * named {@code reader.queue.size} for it.
     *
     * @return new empty queue
     */
    private BlockingQueue<T> queue() {
        int capacity = this.queueOptions.getCapacity();
        LinkedBlockingQueue<T> items = new LinkedBlockingQueue<>(capacity);
        Utils.createGaugeCollectionSize("reader.queue.size", items, new Tag[0]);
        return items;
    }

    /**
     * Closes the reader. If the background job is still running, every step execution is
     * flagged terminate-only and the job status is set to {@code STOPPING}. The job execution
     * and queue references are then dropped and the superclass is closed.
     */
    public synchronized void close() {
        JobExecution execution = this.jobExecution;
        if (execution != null) {
            if (execution.isRunning()) {
                for (StepExecution stepExecution : execution.getStepExecutions()) {
                    stepExecution.setTerminateOnly();
                }
                execution.setStatus(BatchStatus.STOPPING);
            }
            this.jobExecution = null;
        }
        this.queue = null;
        super.close();
    }

    /**
     * @return {@code true} while a background job execution is held, i.e. between a
     *         successful launch in {@code open} and the next {@code close}
     */
    public boolean isOpen() {
        return this.jobExecution != null;
    }

    /**
     * Polls the item queue, waiting up to the given timeout. The queue only exists between
     * {@code open} and {@code close}; calling this outside that window dereferences a
     * {@code null} queue.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return next item, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public synchronized T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /**
     * Returns the next item, polling the queue in poll-timeout slices for as long as the
     * background job is running.
     *
     * @return next item, or {@code null} once the job is no longer running (or the reader has
     *         no job) and the queue is empty
     * @throws ItemStreamException if the background job ended unsuccessfully
     * @throws Exception if interrupted while waiting on the queue
     */
    public synchronized T read() throws Exception {
        long timeoutMillis = this.queueOptions.getPollTimeout().toMillis();
        T item;
        do {
            item = this.queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item != null || this.jobExecution == null) {
                break;
            }
        } while (this.jobExecution.isRunning());
        if (item == null) {
            // The job may have enqueued its last items and finished between the timed-out poll
            // and the isRunning() check. Look once more without blocking so those items are not
            // lost to a premature end-of-input.
            item = this.queue.poll();
        }
        if (this.jobExecution == null || !this.jobExecution.getStatus().isUnsuccessful()) {
            return item;
        }
        throw new ItemStreamException("Reader job failed");
    }

    /**
     * Removes up to {@code i} currently available items from the queue without waiting.
     *
     * @param i maximum number of items to return
     * @return drained items, possibly empty
     */
    public synchronized List<T> read(int i) {
        List<T> drained = new ArrayList<>(i);
        this.queue.drainTo(drained, i);
        return drained;
    }

    /**
     * @param redisModulesClient standalone client
     * @return new {@link ScanReaderBuilder} for the given client
     */
    public static ScanReaderBuilder client(RedisModulesClient redisModulesClient) {
        return new ScanReaderBuilder(redisModulesClient);
    }

    /**
     * @param redisModulesClusterClient cluster client
     * @return new {@link ScanReaderBuilder} for the given client
     */
    public static ScanReaderBuilder client(RedisModulesClusterClient redisModulesClusterClient) {
        return new ScanReaderBuilder(redisModulesClusterClient);
    }

    /**
     * @param redisModulesClient  left client (standalone)
     * @param redisModulesClient2 right client (standalone)
     * @return new {@link ComparatorBuilder} for the two clients
     */
    public static ComparatorBuilder compare(RedisModulesClient redisModulesClient, RedisModulesClient redisModulesClient2) {
        return new ComparatorBuilder(redisModulesClient, redisModulesClient2);
    }

    /**
     * @param redisModulesClient        left client (standalone)
     * @param redisModulesClusterClient right client (cluster)
     * @return new {@link ComparatorBuilder} for the two clients
     */
    public static ComparatorBuilder compare(RedisModulesClient redisModulesClient, RedisModulesClusterClient redisModulesClusterClient) {
        return new ComparatorBuilder(redisModulesClient, redisModulesClusterClient);
    }

    /**
     * @param redisModulesClusterClient left client (cluster)
     * @param redisModulesClient        right client (standalone)
     * @return new {@link ComparatorBuilder} for the two clients
     */
    public static ComparatorBuilder compare(RedisModulesClusterClient redisModulesClusterClient, RedisModulesClient redisModulesClient) {
        return new ComparatorBuilder(redisModulesClusterClient, redisModulesClient);
    }

    /**
     * @param redisModulesClusterClient  left client (cluster)
     * @param redisModulesClusterClient2 right client (cluster)
     * @return new {@link ComparatorBuilder} for the two clients
     */
    public static ComparatorBuilder compare(RedisModulesClusterClient redisModulesClusterClient, RedisModulesClusterClient redisModulesClusterClient2) {
        return new ComparatorBuilder(redisModulesClusterClient, redisModulesClusterClient2);
    }
}
