package com.redis.spring.batch;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.reader.AbstractPollableItemReader;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.KeyTypeItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.util.Await;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.util.ConnectionUtils;
import com.redis.spring.batch.writer.ProcessingItemWriter;
import com.redis.spring.batch.writer.QueueItemWriter;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.api.sync.RedisKeyCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Exceptions;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.sql.DataSource;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.hsqldb.jdbc.JDBCDataSource;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.function.FunctionItemProcessor;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.boot.autoconfigure.batch.BatchDataSourceScriptDatabaseInitializer;
import org.springframework.boot.autoconfigure.batch.BatchProperties;
import org.springframework.boot.sql.init.DatabaseInitializationMode;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public abstract class RedisItemReader<K, V, T> extends AbstractPollableItemReader<T> {
    public static final int DEFAULT_SKIP_LIMIT = 0;
    public static final int DEFAULT_RETRY_LIMIT = 3;
    public static final String QUEUE_METER = "redis.batch.reader.queue.size";
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final String MATCH_ALL = "*";
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    public static final Duration DEFAULT_FLUSH_INTERVAL = KeyspaceNotificationItemReader.DEFAULT_FLUSH_INTERVAL;
    public static final Duration DEFAULT_POLL_TIMEOUT = KeyspaceNotificationItemReader.DEFAULT_POLL_TIMEOUT;
    public static final Mode DEFAULT_MODE = Mode.SCAN;
    public static final String DEFAULT_KEY_PATTERN = "*";
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private JobRepository jobRepository;
    private PlatformTransactionManager transactionManager;
    private JobLauncher jobLauncher;
    private JobExecution jobExecution;
    private ItemReader<K> keyReader;
    private BlockingQueue<T> queue;
    private int database;
    private long scanCount;
    protected ItemProcessor<K, K> keyProcessor;
    private ReadFrom readFrom;
    private Duration idleTimeout;
    private String keyType;
    private final Log log = LogFactory.getLog(RedisItemReader.class);
    private Mode mode = DEFAULT_MODE;
    private int skipLimit = 0;
    private int retryLimit = 3;
    private List<Class<? extends Throwable>> skippableExceptions = defaultNonRetriableExceptions();
    private List<Class<? extends Throwable>> nonSkippableExceptions = defaultRetriableExceptions();
    private List<Class<? extends Throwable>> retriableExceptions = defaultRetriableExceptions();
    private List<Class<? extends Throwable>> nonRetriableExceptions = defaultNonRetriableExceptions();
    private int keyspaceNotificationQueueCapacity = 10000;
    private int threads = 1;
    private int chunkSize = 50;
    private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;
    private String keyPattern = "*";
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
    private int queueCapacity = 10000;

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$Mode.class */
    public enum Mode {
        SCAN,
        LIVE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(String.format("%s-%s", ClassUtils.getShortName(getClass()), UUID.randomUUID().toString()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    private String pubSubPattern() {
        return String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(this.database), this.keyPattern);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    public synchronized void doOpen() throws Exception {
        if (this.jobExecution == null) {
            if (this.transactionManager == null) {
                this.transactionManager = transactionManager();
            }
            if (this.jobRepository == null) {
                this.jobRepository = jobRepository();
            }
            if (this.jobLauncher == null) {
                this.jobLauncher = jobLauncher();
            }
            FaultTolerantStepBuilder faultTolerant = baseStep(this.jobRepository, this.transactionManager).faultTolerant();
            this.keyReader = keyReader();
            faultTolerant.reader(this.keyReader);
            faultTolerant.processor(this.keyProcessor);
            faultTolerant.writer(writer());
            faultTolerant.taskExecutor(taskExecutor());
            faultTolerant.skipLimit(this.skipLimit);
            faultTolerant.retryLimit(this.retryLimit);
            List<Class<? extends Throwable>> list = this.skippableExceptions;
            Objects.requireNonNull(faultTolerant);
            list.forEach(faultTolerant::skip);
            List<Class<? extends Throwable>> list2 = this.nonSkippableExceptions;
            Objects.requireNonNull(faultTolerant);
            list2.forEach(faultTolerant::noSkip);
            List<Class<? extends Throwable>> list3 = this.retriableExceptions;
            Objects.requireNonNull(faultTolerant);
            list3.forEach(faultTolerant::retry);
            List<Class<? extends Throwable>> list4 = this.nonRetriableExceptions;
            Objects.requireNonNull(faultTolerant);
            list4.forEach(faultTolerant::noRetry);
            this.jobExecution = this.jobLauncher.run(new JobBuilder(getName(), this.jobRepository).start(faultTolerant.build()).build(), new JobParameters());
            if (Await.await().until(() -> {
                return this.jobExecution.isRunning() || this.jobExecution.getStatus().isUnsuccessful();
            })) {
                return;
            }
            List allFailureExceptions = this.jobExecution.getAllFailureExceptions();
            if (!CollectionUtils.isEmpty(allFailureExceptions)) {
                throw new JobExecutionException("Job failed", Exceptions.unwrap((Throwable) allFailureExceptions.get(0)));
            }
        }
    }

    private TaskExecutor taskExecutor() {
        if (!isMultiThreaded()) {
            return new SyncTaskExecutor();
        }
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(this.threads);
        threadPoolTaskExecutor.setCorePoolSize(this.threads);
        threadPoolTaskExecutor.setQueueCapacity(this.threads);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    private boolean isMultiThreaded() {
        return this.threads > 1;
    }

    private PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }

    private JobLauncher jobLauncher() throws Exception {
        TaskExecutorJobLauncher taskExecutorJobLauncher = new TaskExecutorJobLauncher();
        taskExecutorJobLauncher.setJobRepository(this.jobRepository);
        taskExecutorJobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        taskExecutorJobLauncher.afterPropertiesSet();
        return taskExecutorJobLauncher;
    }

    private JobRepository jobRepository() throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource());
        jobRepositoryFactoryBean.setTransactionManager(this.transactionManager);
        jobRepositoryFactoryBean.afterPropertiesSet();
        return jobRepositoryFactoryBean.getObject();
    }

    private DataSource dataSource() throws Exception {
        JDBCDataSource jDBCDataSource = new JDBCDataSource();
        jDBCDataSource.setURL("jdbc:hsqldb:mem:" + getName());
        BatchProperties.Jdbc jdbc = new BatchProperties.Jdbc();
        jdbc.setInitializeSchema(DatabaseInitializationMode.ALWAYS);
        BatchDataSourceScriptDatabaseInitializer batchDataSourceScriptDatabaseInitializer = new BatchDataSourceScriptDatabaseInitializer(jDBCDataSource, jdbc);
        batchDataSourceScriptDatabaseInitializer.afterPropertiesSet();
        batchDataSourceScriptDatabaseInitializer.initializeDatabase();
        return jDBCDataSource;
    }

    private KeyScanArgs scanArgs() {
        KeyScanArgs keyScanArgs = new KeyScanArgs();
        if (this.scanCount > 0) {
            keyScanArgs.limit(this.scanCount);
        }
        if (this.keyPattern != null) {
            keyScanArgs.match(this.keyPattern);
        }
        if (this.keyType != null) {
            keyScanArgs.type(this.keyType);
        }
        return keyScanArgs;
    }

    public ItemReader<K> keyReader() {
        if (!isLive()) {
            IteratorItemReader iteratorItemReader = new IteratorItemReader(ScanIterator.scan((RedisKeyCommands) ConnectionUtils.sync(ConnectionUtils.connection(this.client, this.codec, this.readFrom)), scanArgs()));
            return isMultiThreaded() ? new SynchronizedItemReader(iteratorItemReader) : iteratorItemReader;
        }
        KeyspaceNotificationItemReader keyspaceNotificationItemReader = new KeyspaceNotificationItemReader(this.client, this.codec, pubSubPattern());
        keyspaceNotificationItemReader.setName(getName() + "-keyspace-notification-reader");
        keyspaceNotificationItemReader.setKeyType(this.keyType);
        keyspaceNotificationItemReader.setPollTimeout(this.pollTimeout);
        keyspaceNotificationItemReader.setQueueCapacity(this.keyspaceNotificationQueueCapacity);
        return keyspaceNotificationItemReader;
    }

    private ProcessingItemWriter<K, T> writer() {
        this.queue = new LinkedBlockingQueue(this.queueCapacity);
        Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), this.queue);
        return new ProcessingItemWriter<>(new FunctionItemProcessor(this::values), new QueueItemWriter(this.queue));
    }

    private SimpleStepBuilder<K, K> baseStep(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
        SimpleStepBuilder<K, K> chunk = new StepBuilder(getName(), jobRepository).chunk(this.chunkSize, platformTransactionManager);
        if (!isLive()) {
            return chunk;
        }
        FlushingStepBuilder flushingStepBuilder = new FlushingStepBuilder(chunk);
        flushingStepBuilder.interval(this.flushInterval);
        flushingStepBuilder.idleTimeout(this.idleTimeout);
        return flushingStepBuilder;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    public synchronized void doClose() throws Exception {
        if (this.jobExecution != null) {
            Await await = Await.await();
            JobExecution jobExecution = this.jobExecution;
            Objects.requireNonNull(jobExecution);
            await.untilFalse(jobExecution::isRunning);
            if (!this.queue.isEmpty()) {
                this.log.warn(String.format("%s queue still contains %,d elements", getName(), Integer.valueOf(this.queue.size())));
            }
            this.jobExecution = null;
        }
    }

    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    protected T doRead() throws Exception {
        T poll;
        do {
            poll = poll(this.pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                break;
            }
        } while (this.jobExecution.isRunning());
        return poll;
    }

    protected abstract Chunk<T> values(Chunk<? extends K> chunk);

    public List<T> read(int i) {
        ArrayList arrayList = new ArrayList(i);
        this.queue.drainTo(arrayList, i);
        return arrayList;
    }

    @Override // com.redis.spring.batch.reader.AbstractPollableItemReader
    protected T doPoll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public BlockingQueue<T> getQueue() {
        return this.queue;
    }

    public boolean isLive() {
        return this.mode == Mode.LIVE;
    }

    public AbstractRedisClient getClient() {
        return this.client;
    }

    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    public Mode getMode() {
        return this.mode;
    }

    public void setScanCount(long j) {
        this.scanCount = j;
    }

    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    public void setFlushInterval(Duration duration) {
        this.flushInterval = duration;
    }

    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    public void setIdleTimeout(Duration duration) {
        this.idleTimeout = duration;
    }

    public ItemProcessor<K, K> getKeyProcessor() {
        return this.keyProcessor;
    }

    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.keyProcessor = itemProcessor;
    }

    public void setThreads(int i) {
        this.threads = i;
    }

    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public void setMode(Mode mode) {
        this.mode = mode;
    }

    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    public void setKeyType(DataType dataType) {
        setKeyType(dataType == null ? null : dataType.getString());
    }

    public void setKeyType(String str) {
        this.keyType = str;
    }

    public int getDatabase() {
        return this.database;
    }

    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    public int getKeyspaceNotificationQueueCapacity() {
        return this.keyspaceNotificationQueueCapacity;
    }

    public long getScanCount() {
        return this.scanCount;
    }

    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    public int getThreads() {
        return this.threads;
    }

    public int getChunkSize() {
        return this.chunkSize;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public String getKeyPattern() {
        return this.keyPattern;
    }

    public String getKeyType() {
        return this.keyType;
    }

    public void setKeyspaceNotificationQueueCapacity(int i) {
        this.keyspaceNotificationQueueCapacity = i;
    }

    public void setDatabase(int i) {
        this.database = i;
    }

    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        this.transactionManager = platformTransactionManager;
    }

    public void setJobLauncher(JobLauncher jobLauncher) {
        this.jobLauncher = jobLauncher;
    }

    public static DumpItemReader dump(AbstractRedisClient abstractRedisClient) {
        return new DumpItemReader(abstractRedisClient);
    }

    public static StructItemReader<String, String> struct(AbstractRedisClient abstractRedisClient) {
        return struct(abstractRedisClient, CodecUtils.STRING_CODEC);
    }

    public static <K, V> StructItemReader<K, V> struct(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new StructItemReader<>(abstractRedisClient, redisCodec);
    }

    public static List<Class<? extends Throwable>> defaultRetriableExceptions() {
        return modifiableList(RedisCommandTimeoutException.class);
    }

    public static List<Class<? extends Throwable>> defaultNonRetriableExceptions() {
        return modifiableList(RedisCommandExecutionException.class);
    }

    private static <T> List<T> modifiableList(T... tArr) {
        return new ArrayList(Arrays.asList(tArr));
    }

    public static KeyTypeItemReader<String, String> type(AbstractRedisClient abstractRedisClient) {
        return type(abstractRedisClient, CodecUtils.STRING_CODEC);
    }

    public static <K, V> KeyTypeItemReader<K, V> type(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new KeyTypeItemReader<>(abstractRedisClient, redisCodec);
    }
}
