package com.redis.spring.batch;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.spring.batch.common.Await;
import com.redis.spring.batch.common.BatchUtils;
import com.redis.spring.batch.common.FlushingChunkProvider;
import com.redis.spring.batch.common.FlushingStepBuilder;
import com.redis.spring.batch.common.JobFactory;
import com.redis.spring.batch.common.Operation;
import com.redis.spring.batch.common.OperationExecutor;
import com.redis.spring.batch.reader.AbstractPollableItemReader;
import com.redis.spring.batch.reader.KeyComparisonItemReader;
import com.redis.spring.batch.reader.KeyNotificationItemReader;
import com.redis.spring.batch.reader.MemKeyValue;
import com.redis.spring.batch.reader.MemKeyValueRead;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
/**
 * Item reader that fetches values for Redis keys in a background Spring Batch job and hands them
 * to the caller through an in-memory queue.
 *
 * <p>When opened, the reader launches an asynchronous job whose step reads keys (from a
 * {@code SCAN} iterator in {@link ReaderMode#SNAPSHOT} mode, or from a
 * {@link KeyNotificationItemReader} in {@link ReaderMode#LIVE} mode), optionally passes them
 * through the configured key processor, and writes the result of the configured
 * {@link Operation} for each chunk of keys to the queue. {@link #doPoll(long, TimeUnit)} and
 * {@link #read(int)} take items from that queue.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <T> type of the items produced by the operation
 */
public class RedisItemReader<K, V, T> extends AbstractPollableItemReader<T> {
    public static final int DEFAULT_POOL_SIZE = 8;
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final int DEFAULT_SKIP_LIMIT = 0;
    public static final int DEFAULT_RETRY_LIMIT = 3;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL;
    public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT;
    public static final ReaderMode DEFAULT_MODE = ReaderMode.SNAPSHOT;

    private final RedisCodec<K, V> codec;
    protected final Operation<K, V, K, T> operation;
    private AbstractRedisClient client;
    private JobFactory jobFactory;
    private ItemProcessor<K, K> keyProcessor;
    private int poolSize = DEFAULT_POOL_SIZE;
    private ReaderMode mode = DEFAULT_MODE;
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private int threads = DEFAULT_THREADS;
    private int skipLimit = DEFAULT_SKIP_LIMIT;
    private int retryLimit = DEFAULT_RETRY_LIMIT;
    private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;
    private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;
    private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
    private ReadFrom readFrom;
    private String keyPattern;
    private String keyType;
    private long scanCount;
    private int database;
    private JobExecution jobExecution;
    private BlockingQueue<T> queue;
    private OperationExecutor<K, V, K, T> operationExecutor;
    private ItemReader<K> reader;

    /** Source of the keys fed to the background step. */
    public enum ReaderMode {
        /** Keys are obtained by iterating a {@code SCAN} over the keyspace. */
        SNAPSHOT,
        /** Keys are obtained from a {@link KeyNotificationItemReader}. */
        LIVE
    }

    /**
     * Step writer that runs the operation on each chunk of keys and enqueues the resulting items
     * for consumption by the enclosing reader.
     */
    public class Writer implements ItemWriter<K> {
        private Writer() {
        }

        /**
         * Executes the operation for the given keys and puts every result on the queue, blocking
         * while the queue is full.
         *
         * @param chunk keys to read
         * @throws Exception if the thread is interrupted while waiting for queue space
         */
        @Override
        public void write(Chunk<? extends K> chunk) throws Exception {
            for (T item : RedisItemReader.this.read(chunk)) {
                RedisItemReader.this.queue.put(item);
            }
        }
    }

    /**
     * Creates a reader that applies {@code operation} to keys encoded with {@code redisCodec}.
     *
     * @param redisCodec codec used for Redis connections
     * @param operation operation turning keys into items
     */
    public RedisItemReader(RedisCodec<K, V> redisCodec, Operation<K, V, K, T> operation) {
        this.codec = redisCodec;
        this.operation = operation;
    }

    /**
     * Initializes the job factory, operation executor and queue if they are not already set, then
     * launches the background job and waits until it is running or has ended unsuccessfully.
     *
     * @throws Exception if the client is not set, the wait is interrupted, or the wait times out
     *     and the job execution reports at least one failure exception
     */
    public synchronized void doOpen() throws Exception {
        Assert.notNull(this.client, getName() + ": Redis client not set");
        if (this.jobFactory == null) {
            this.jobFactory = new JobFactory();
        }
        this.jobFactory.afterPropertiesSet();
        if (this.operationExecutor == null) {
            this.operationExecutor = new OperationExecutor<>(this.codec, this.operation);
            this.operationExecutor.setClient(this.client);
            this.operationExecutor.setPoolSize(this.poolSize);
            this.operationExecutor.setReadFrom(this.readFrom);
            this.operationExecutor.afterPropertiesSet();
        }
        if (this.queue == null) {
            // Bounded by queueCapacity so that Writer.write blocks when the consumer falls behind
            // instead of letting the queue grow without limit.
            this.queue = new LinkedBlockingQueue<>(this.queueCapacity);
        }
        if (this.jobExecution == null) {
            this.jobExecution = this.jobFactory.runAsync(this.jobFactory.jobBuilder(getName()).start(step().build()).build());
            try {
                Await.await().until(() -> this.jobExecution.isRunning() || this.jobExecution.getStatus().isUnsuccessful());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (TimeoutException e) {
                // A timeout is only reported to the caller when the job recorded a failure;
                // otherwise it is deliberately ignored and opening completes normally.
                List<Throwable> failures = this.jobExecution.getAllFailureExceptions();
                if (!CollectionUtils.isEmpty(failures)) {
                    throw new JobExecutionException("Job execution unsuccessful", failures.get(0));
                }
            }
        }
    }

    /**
     * Builds the fault-tolerant step that reads keys, processes them and writes values to the
     * queue. Also stores the created key reader in {@link #reader}.
     */
    private FaultTolerantStepBuilder<K, K> step() {
        SimpleStepBuilder<K, K> stepBuilder = stepBuilder();
        this.reader = reader();
        stepBuilder.reader(this.reader);
        stepBuilder.processor(this.keyProcessor);
        stepBuilder.writer(new Writer());
        if (this.threads > 1) {
            ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
            taskExecutor.setMaxPoolSize(this.threads);
            taskExecutor.setCorePoolSize(this.threads);
            taskExecutor.setQueueCapacity(this.threads);
            taskExecutor.afterPropertiesSet();
            stepBuilder.taskExecutor(taskExecutor);
            if (!isLive()) {
                // In snapshot mode the scan-backed reader is shared by several step threads,
                // so access to it is serialized through a synchronized wrapper.
                stepBuilder.reader(new SynchronizedItemReader<>(this.reader));
            }
        }
        FaultTolerantStepBuilder<K, K> faultTolerant = stepBuilder.faultTolerant();
        faultTolerant.retryLimit(this.retryLimit);
        faultTolerant.skipLimit(this.skipLimit);
        // Command execution errors are skippable but not retried;
        // command timeouts are retried but not skippable.
        faultTolerant.skip(RedisCommandExecutionException.class);
        faultTolerant.noRetry(RedisCommandExecutionException.class);
        faultTolerant.noSkip(RedisCommandTimeoutException.class);
        faultTolerant.retry(RedisCommandTimeoutException.class);
        return faultTolerant;
    }

    /**
     * Creates the base step builder; in live mode it is wrapped in a {@link FlushingStepBuilder}
     * configured with the flush interval and idle timeout.
     */
    private SimpleStepBuilder<K, K> stepBuilder() {
        SimpleStepBuilder<K, K> step = this.jobFactory.step(getName(), this.chunkSize);
        if (!isLive()) {
            return step;
        }
        FlushingStepBuilder<K, K> flushingStep = new FlushingStepBuilder<>(step);
        flushingStep.flushInterval(this.flushInterval);
        flushingStep.idleTimeout(this.idleTimeout);
        return flushingStep;
    }

    private boolean isLive() {
        return this.mode == ReaderMode.LIVE;
    }

    /**
     * Creates the key reader for the current mode: a scan iterator reader for snapshot mode, or a
     * configured {@link KeyNotificationItemReader} for live mode.
     */
    private ItemReader<K> reader() {
        if (!isLive()) {
            return new IteratorItemReader<>(ScanIterator.scan(connection().sync(), scanArgs()));
        }
        KeyNotificationItemReader<K, V> notificationReader = new KeyNotificationItemReader<>(this.client, this.codec);
        notificationReader.setName(getName() + "-key-notification-reader");
        notificationReader.setQueueCapacity(this.notificationQueueCapacity);
        notificationReader.setDatabase(this.database);
        notificationReader.setKeyPattern(this.keyPattern);
        notificationReader.setKeyType(this.keyType);
        notificationReader.setPollTimeout(this.pollTimeout);
        return notificationReader;
    }

    /**
     * Runs the operation on the given keys through the operation executor.
     *
     * <p>The executor is created by {@link #doOpen()}, so the reader must be open.
     *
     * @param iterable keys to read
     * @return items produced by the operation executor for these keys
     */
    public List<T> read(Iterable<? extends K> iterable) {
        return this.operationExecutor.apply(iterable);
    }

    private StatefulRedisModulesConnection<K, V> connection() {
        return BatchUtils.connection(this.client, this.codec, this.readFrom);
    }

    /**
     * Waits for the background job to stop running, then closes the operation executor.
     *
     * @throws TimeoutException if the job is still running when the wait times out
     * @throws InterruptedException if the waiting thread is interrupted
     */
    protected synchronized void doClose() throws TimeoutException, InterruptedException {
        if (this.jobExecution != null) {
            Await.await().untilFalse(this.jobExecution::isRunning);
            this.jobExecution = null;
        }
        if (this.operationExecutor != null) {
            this.operationExecutor.close();
            this.operationExecutor = null;
        }
    }

    /**
     * @return {@code true} if a background job execution exists and is running
     */
    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    public boolean isRunning() {
        return this.jobExecution != null && this.jobExecution.isRunning();
    }

    /**
     * Builds the scan arguments from the configured scan count, key pattern and key type; unset
     * values (non-positive count, {@code null} pattern or type) are left out.
     */
    private KeyScanArgs scanArgs() {
        KeyScanArgs args = new KeyScanArgs();
        if (this.scanCount > 0) {
            args.limit(this.scanCount);
        }
        if (this.keyPattern != null) {
            args.match(this.keyPattern);
        }
        if (this.keyType != null) {
            args.type(this.keyType);
        }
        return args;
    }

    /**
     * Drains up to {@code i} items that are immediately available from the queue without
     * blocking.
     *
     * @param i maximum number of items to return
     * @return the drained items; empty if the queue has not been created or holds nothing
     */
    public List<T> read(int i) {
        List<T> items = new ArrayList<>(i);
        if (this.queue != null) {
            this.queue.drainTo(items, i);
        }
        return items;
    }

    /**
     * Polls the queue for the next item, waiting up to the given timeout.
     *
     * @return the next item, or {@code null} if none became available in time
     */
    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    protected T doPoll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /** Creates a key comparison reader using {@link MemKeyValueRead#struct} with the UTF-8 string codec. */
    public static KeyComparisonItemReader<String, String> compare() {
        return compare(StringCodec.UTF8);
    }

    /** Creates a key comparison reader using {@link MemKeyValueRead#struct} with the given codec. */
    public static <K, V> KeyComparisonItemReader<K, V> compare(RedisCodec<K, V> redisCodec) {
        return new KeyComparisonItemReader<>(redisCodec, MemKeyValueRead.struct(redisCodec), MemKeyValueRead.struct(redisCodec));
    }

    /** Creates a key comparison reader using {@link MemKeyValueRead#type} with the UTF-8 string codec. */
    public static KeyComparisonItemReader<String, String> compareQuick() {
        return compareQuick(StringCodec.UTF8);
    }

    /** Creates a key comparison reader using {@link MemKeyValueRead#type} with the given codec. */
    public static <K, V> KeyComparisonItemReader<K, V> compareQuick(RedisCodec<K, V> redisCodec) {
        return new KeyComparisonItemReader<>(redisCodec, MemKeyValueRead.type(redisCodec), MemKeyValueRead.type(redisCodec));
    }

    /** Creates a byte-array reader using the {@link MemKeyValueRead#dump()} operation. */
    public static RedisItemReader<byte[], byte[], MemKeyValue<byte[], byte[]>> dump() {
        return new RedisItemReader<>(ByteArrayCodec.INSTANCE, MemKeyValueRead.dump());
    }

    /** Creates a reader using the {@link MemKeyValueRead#type} operation with the UTF-8 string codec. */
    public static RedisItemReader<String, String, MemKeyValue<String, Object>> type() {
        return type(StringCodec.UTF8);
    }

    /** Creates a reader using the {@link MemKeyValueRead#type} operation with the given codec. */
    public static <K, V> RedisItemReader<K, V, MemKeyValue<K, Object>> type(RedisCodec<K, V> redisCodec) {
        return new RedisItemReader<>(redisCodec, MemKeyValueRead.type(redisCodec));
    }

    /** Creates a reader using the {@link MemKeyValueRead#struct} operation with the UTF-8 string codec. */
    public static RedisItemReader<String, String, MemKeyValue<String, Object>> struct() {
        return struct(StringCodec.UTF8);
    }

    /** Creates a reader using the {@link MemKeyValueRead#struct} operation with the given codec. */
    public static <K, V> RedisItemReader<K, V, MemKeyValue<K, Object>> struct(RedisCodec<K, V> redisCodec) {
        return new RedisItemReader<>(redisCodec, MemKeyValueRead.struct(redisCodec));
    }

    /** @return the queue holding items produced by the background job; {@code null} before the first open */
    public BlockingQueue<T> getQueue() {
        return this.queue;
    }

    /** @return the key reader created for the background step; {@code null} before the first open */
    public ItemReader<K> getReader() {
        return this.reader;
    }

    public Operation<K, V, K, T> getOperation() {
        return this.operation;
    }

    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    public AbstractRedisClient getClient() {
        return this.client;
    }

    /** Sets the Redis client; required before the reader is opened. */
    public void setClient(AbstractRedisClient abstractRedisClient) {
        this.client = abstractRedisClient;
    }

    public JobFactory getJobFactory() {
        return this.jobFactory;
    }

    /** Sets the job factory; when left unset a new {@link JobFactory} is created on open. */
    public void setJobFactory(JobFactory jobFactory) {
        this.jobFactory = jobFactory;
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    /** Sets the pool size passed to the operation executor created on open. */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public ReaderMode getMode() {
        return this.mode;
    }

    public void setMode(ReaderMode readerMode) {
        this.mode = readerMode;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    /**
     * Sets the capacity of the item queue created on open. Must be positive, as required by
     * {@link LinkedBlockingQueue#LinkedBlockingQueue(int)}.
     */
    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public String getKeyPattern() {
        return this.keyPattern;
    }

    /** Sets the key pattern applied to the scan (snapshot mode) or notification reader (live mode). */
    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    public String getKeyType() {
        return this.keyType;
    }

    /** Sets the key type applied to the scan (snapshot mode) or notification reader (live mode). */
    public void setKeyType(String str) {
        this.keyType = str;
    }

    public long getScanCount() {
        return this.scanCount;
    }

    /** Sets the limit passed to the scan arguments; values of zero or less leave it unset. */
    public void setScanCount(long j) {
        this.scanCount = j;
    }

    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    public int getNotificationQueueCapacity() {
        return this.notificationQueueCapacity;
    }

    /** Sets the queue capacity passed to the key notification reader in live mode. */
    public void setNotificationQueueCapacity(int i) {
        this.notificationQueueCapacity = i;
    }

    public int getDatabase() {
        return this.database;
    }

    /** Sets the database passed to the key notification reader in live mode. */
    public void setDatabase(int i) {
        this.database = i;
    }

    /** @return the current background job execution, or {@code null} if none is active */
    public JobExecution getJobExecution() {
        return this.jobExecution;
    }

    public ItemProcessor<K, K> getKeyProcessor() {
        return this.keyProcessor;
    }

    /** Sets the processor applied to keys between the key reader and the value writer. */
    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.keyProcessor = itemProcessor;
    }

    public int getChunkSize() {
        return this.chunkSize;
    }

    /** Sets the chunk size of the background step. */
    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    /** Sets the flush interval of the flushing step used in live mode. */
    public void setFlushInterval(Duration duration) {
        this.flushInterval = duration;
    }

    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    /** Sets the idle timeout of the flushing step used in live mode. */
    public void setIdleTimeout(Duration duration) {
        this.idleTimeout = duration;
    }

    public int getThreads() {
        return this.threads;
    }

    /** Sets the number of step threads; a task executor is only configured for values above 1. */
    public void setThreads(int i) {
        this.threads = i;
    }

    public int getSkipLimit() {
        return this.skipLimit;
    }

    /** Sets the skip limit of the fault-tolerant background step. */
    public void setSkipLimit(int i) {
        this.skipLimit = i;
    }

    public int getRetryLimit() {
        return this.retryLimit;
    }

    /** Sets the retry limit of the fault-tolerant background step. */
    public void setRetryLimit(int i) {
        this.retryLimit = i;
    }
}
