package com.redis.spring.batch;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.reader.DumpItemReader;
import com.redis.spring.batch.reader.KeyItemReader;
import com.redis.spring.batch.reader.KeyTypeItemReader;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.reader.ScanKeyItemReader;
import com.redis.spring.batch.reader.StructItemReader;
import com.redis.spring.batch.step.FlushingChunkProvider;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.util.Await;
import com.redis.spring.batch.util.BatchUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public abstract class RedisItemReader<K, V, T> extends AbstractItemStreamItemReader<T> implements PollableItemReader<T> {
    // ---- Defaults --------------------------------------------------------

    /** Name of the Micrometer gauge registered for the size of the item queue. */
    public static final String QUEUE_METER = "redis.batch.reader.queue.size";
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_SKIP_LIMIT = 0;
    public static final int DEFAULT_RETRY_LIMIT = 3;
    public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(10);
    public static final KeyspaceNotificationItemReader.OrderingStrategy DEFAULT_ORDERING = KeyspaceNotificationItemReader.OrderingStrategy.PRIORITY;
    public static final ReaderMode DEFAULT_MODE = ReaderMode.SCAN;
    private static final Duration DEFAULT_OPEN_TIMEOUT = Duration.ofSeconds(3);

    // ---- Fixed at construction -------------------------------------------

    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;

    // ---- Key selection ---------------------------------------------------

    private ReaderMode mode = DEFAULT_MODE;
    private int database;
    private long scanCount;
    private ReadFrom readFrom;
    private String keyPattern;
    private DataType keyType;
    private ItemProcessor<K, K> keyProcessor;

    // ---- Fault tolerance of the internal step ----------------------------

    private int skipLimit = DEFAULT_SKIP_LIMIT;
    private int retryLimit = DEFAULT_RETRY_LIMIT;
    // The skippable list starts from the non-retriable defaults and the
    // non-skippable list from the retriable defaults.
    private List<Class<? extends Throwable>> skippableExceptions = defaultNonRetriableExceptions();
    private List<Class<? extends Throwable>> nonSkippableExceptions = defaultRetriableExceptions();
    private List<Class<? extends Throwable>> retriableExceptions = defaultRetriableExceptions();
    private List<Class<? extends Throwable>> nonRetriableExceptions = defaultNonRetriableExceptions();

    // ---- Live (keyspace notification) mode -------------------------------

    private KeyspaceNotificationItemReader.OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private int notificationQueueCapacity = DEFAULT_NOTIFICATION_QUEUE_CAPACITY;
    private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;
    private Duration idleTimeout;

    // ---- Step sizing and timing ------------------------------------------

    private int threads = DEFAULT_THREADS;
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
    private Duration openTimeout = DEFAULT_OPEN_TIMEOUT;

    // ---- Job infrastructure and runtime state (populated while opening) ---

    private JobRepository jobRepository;
    private PlatformTransactionManager transactionManager;
    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    private SimpleJobLauncher jobLauncher;
    private String name;
    private JobExecution jobExecution;
    private KeyItemReader<K> keyReader;
    private BlockingQueue<T> queue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ProcessingItemWriter.class */
    /**
     * Writer of the internal key step. Each chunk of keys is handed to the
     * wrapped processor and every resulting item is put on the enclosing
     * reader's queue, from which {@code poll} takes it.
     *
     * <p>When the processor also implements {@link ItemStream}, its lifecycle
     * callbacks are invoked before the corresponding callback of this writer.
     */
    public class ProcessingItemWriter extends AbstractItemStreamItemWriter<K> {
        private final ItemProcessor<List<? extends K>, List<T>> processor;

        /**
         * @param itemProcessor converts a chunk of keys into the items to enqueue
         */
        public ProcessingItemWriter(ItemProcessor<List<? extends K>, List<T>> itemProcessor) {
            this.processor = itemProcessor;
        }

        /** Opens the processor first if it is an {@link ItemStream}, then this writer. */
        public void open(ExecutionContext executionContext) {
            if (this.processor instanceof ItemStream) {
                // The cast is required: ItemProcessor itself declares no stream methods.
                ((ItemStream) this.processor).open(executionContext);
            }
            super.open(executionContext);
        }

        /** Updates the processor first if it is an {@link ItemStream}, then this writer. */
        public void update(ExecutionContext executionContext) {
            if (this.processor instanceof ItemStream) {
                ((ItemStream) this.processor).update(executionContext);
            }
            super.update(executionContext);
        }

        /** Closes the processor first if it is an {@link ItemStream}, then this writer. */
        public void close() {
            if (this.processor instanceof ItemStream) {
                ((ItemStream) this.processor).close();
            }
            super.close();
        }

        /**
         * Processes the chunk of keys and enqueues every produced item.
         *
         * @param list chunk of keys delivered by the step
         * @throws Exception if the processor fails or the thread is interrupted
         *                   while waiting for queue space
         */
        public void write(List<? extends K> list) throws Exception {
            List<T> items = this.processor.process(list);
            if (items == null) {
                // A null result means there is nothing to enqueue for this chunk.
                return;
            }
            for (T item : items) {
                // put() blocks while the bounded queue is full.
                RedisItemReader.this.queue.put(item);
            }
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$ReaderMode.class */
    /** Selects which key reader feeds the internal step. */
    public enum ReaderMode {
        /** Keys come from a {@code ScanKeyItemReader}. */
        SCAN,
        /** Keys come from a {@code KeyspaceNotificationItemReader}. */
        LIVE
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader bound to the given client and codec. The reader name
     * is initialised to the short name of the concrete class.
     *
     * @param abstractRedisClient client stored for later use by the key readers
     * @param redisCodec          codec stored for later use by the key readers
     */
    public RedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        // Runs before the fields below are assigned, so setName must not rely on them.
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    /** @return the Redis client passed to the constructor */
    public AbstractRedisClient getClient() {
        return this.client;
    }

    /** @return the codec passed to the constructor */
    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    /** @return the current reader mode ({@link #DEFAULT_MODE} unless changed) */
    public ReaderMode getMode() {
        return this.mode;
    }

    /**
     * Appends an exception type to the list registered with the step builder's
     * {@code skip} when the internal step is built.
     */
    public void addSkippableException(Class<? extends Throwable> cls) {
        this.skippableExceptions.add(cls);
    }

    /**
     * Appends an exception type to the list registered with the step builder's
     * {@code noSkip} when the internal step is built.
     */
    public void addNonSkippableException(Class<? extends Throwable> cls) {
        this.nonSkippableExceptions.add(cls);
    }

    /**
     * Appends an exception type to the list registered with the step builder's
     * {@code retry} when the internal step is built.
     */
    public void addRetriableException(Class<? extends Throwable> cls) {
        this.retriableExceptions.add(cls);
    }

    /**
     * Appends an exception type to the list registered with the step builder's
     * {@code noRetry} when the internal step is built.
     */
    public void addNonRetriableException(Class<? extends Throwable> cls) {
        this.nonRetriableExceptions.add(cls);
    }

    /** Sets the retry limit handed to the fault-tolerant step builder. */
    public void setRetryLimit(int i) {
        this.retryLimit = i;
    }

    /** Sets the skip limit handed to the fault-tolerant step builder. */
    public void setSkipLimit(int i) {
        this.skipLimit = i;
    }

    /** Sets the value passed to {@code ScanKeyItemReader.setLimit} in scan mode. */
    public void setScanCount(long j) {
        this.scanCount = j;
    }

    /** Sets how long opening waits for the internal job to be running. */
    public void setOpenTimeout(Duration duration) {
        this.openTimeout = duration;
    }

    /**
     * Sets the job repository used for the internal job. If this or the
     * transaction manager is left {@code null}, both are replaced on open by
     * instances obtained from a {@code MapJobRepositoryFactoryBean}.
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Sets the transaction manager used for the internal step. If this or the
     * job repository is left {@code null}, both are replaced on open by
     * instances obtained from a {@code MapJobRepositoryFactoryBean}.
     */
    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        this.transactionManager = platformTransactionManager;
    }

    /** Sets the processor applied to keys in the internal step (may be {@code null}). */
    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.keyProcessor = itemProcessor;
    }

    /**
     * Sets the thread count. Values above 1 make the internal step use a
     * thread-pool task executor and a throttle limit of that size.
     */
    public void setThreads(int i) {
        this.threads = i;
    }

    /** Sets the chunk size of the internal step. */
    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    /** Sets the capacity of the queue that buffers items between the step and {@code read}. */
    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    /** Sets the reader mode, which selects the key reader built on open. */
    public void setMode(ReaderMode readerMode) {
        this.mode = readerMode;
    }

    /** Sets the {@code ReadFrom} setting passed to the scan key reader. */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /** Sets the key pattern passed to whichever key reader is built. */
    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    /** Sets the key type passed to whichever key reader is built. */
    public void setKeyType(DataType dataType) {
        this.keyType = dataType;
    }

    /**
     * Sets the timeout of each queue poll performed by {@code read}; the same
     * value is passed to the keyspace notification reader in live mode.
     */
    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    /** Sets the idle timeout handed to the flushing step builder in live mode. */
    public void setIdleTimeout(Duration duration) {
        this.idleTimeout = duration;
    }

    /** @return the live list of exception types registered as retriable */
    public List<Class<? extends Throwable>> getRetriableExceptions() {
        return this.retriableExceptions;
    }

    /**
     * Replaces the retriable exception types.
     *
     * <p>The argument is copied into a new modifiable list so that
     * {@code addRetriableException} keeps working when the caller passes an
     * immutable or fixed-size list. A {@code null} argument is stored as is.
     *
     * @param list exception types to register as retriable
     */
    public void setRetriableExceptions(List<Class<? extends Throwable>> list) {
        this.retriableExceptions = list == null ? null : new ArrayList<>(list);
    }

    /** @return the live list of exception types registered as non-retriable */
    public List<Class<? extends Throwable>> getNonRetriableExceptions() {
        return this.nonRetriableExceptions;
    }

    /**
     * Replaces the non-retriable exception types.
     *
     * <p>The argument is copied into a new modifiable list so that
     * {@code addNonRetriableException} keeps working when the caller passes an
     * immutable or fixed-size list. A {@code null} argument is stored as is.
     *
     * @param list exception types to register as non-retriable
     */
    public void setNonRetriableExceptions(List<Class<? extends Throwable>> list) {
        this.nonRetriableExceptions = list == null ? null : new ArrayList<>(list);
    }

    /** @return the retry limit handed to the fault-tolerant step builder */
    public int getRetryLimit() {
        return this.retryLimit;
    }

    /** @return the skip limit handed to the fault-tolerant step builder */
    public int getSkipLimit() {
        return this.skipLimit;
    }

    /** @return the database index passed to the keyspace notification reader */
    public int getDatabase() {
        return this.database;
    }

    /** @return the ordering strategy passed to the keyspace notification reader */
    public KeyspaceNotificationItemReader.OrderingStrategy getOrderingStrategy() {
        return this.orderingStrategy;
    }

    /** @return the queue capacity passed to the keyspace notification reader */
    public int getNotificationQueueCapacity() {
        return this.notificationQueueCapacity;
    }

    /** @return the flush interval handed to the flushing step builder in live mode */
    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    /** @return the idle timeout handed to the flushing step builder in live mode */
    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    /** @return the value passed to {@code ScanKeyItemReader.setLimit} */
    public long getScanCount() {
        return this.scanCount;
    }

    /** @return the {@code ReadFrom} setting passed to the scan key reader */
    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    /** @return the configured thread count of the internal step */
    public int getThreads() {
        return this.threads;
    }

    /** @return the chunk size of the internal step */
    public int getChunkSize() {
        return this.chunkSize;
    }

    /** @return the capacity of the item queue */
    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    /** @return the key pattern passed to the key reader */
    public String getKeyPattern() {
        return this.keyPattern;
    }

    /** @return the key type passed to the key reader */
    public DataType getKeyType() {
        return this.keyType;
    }

    /** @return the timeout of each queue poll performed by {@code read} */
    public Duration getPollTimeout() {
        return this.pollTimeout;
    }

    /** @return how long opening waits for the internal job to be running */
    public Duration getOpenTimeout() {
        return this.openTimeout;
    }

    /** Sets the flush interval handed to the flushing step builder in live mode. */
    public void setFlushInterval(Duration duration) {
        this.flushInterval = duration;
    }

    /** Sets the queue capacity passed to the keyspace notification reader. */
    public void setNotificationQueueCapacity(int i) {
        this.notificationQueueCapacity = i;
    }

    /** Sets the database index passed to the keyspace notification reader. */
    public void setDatabase(int i) {
        this.database = i;
    }

    /** Sets the ordering strategy passed to the keyspace notification reader. */
    public void setOrderingStrategy(KeyspaceNotificationItemReader.OrderingStrategy orderingStrategy) {
        this.orderingStrategy = orderingStrategy;
    }

    /**
     * Sets the name on the superclass and keeps a local copy, which is used as
     * the name of the internal job and step.
     *
     * @param str new name of this reader
     */
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * @return the key reader created when the internal step was last built,
     *         or {@code null} if no step has been built yet
     */
    public ItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    /**
     * Opens the reader. After delegating to the superclass, and unless a job
     * execution is already held, this sets up the job infrastructure, builds a
     * single-step job named after this reader and launches it.
     *
     * @param executionContext context forwarded to the superclass
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (!isOpen()) {
            initializeJobInfrastructure();
            Job job = jobBuilderFactory.get(name).start(step().build()).build();
            jobExecution = launch(job);
        }
    }

    /**
     * Launches the job with empty parameters and waits, for at most the open
     * timeout, until {@code isRunning} reports that it has started or
     * already finished.
     *
     * @param job job to launch
     * @return the execution of the launched job
     * @throws ItemStreamException if the launcher rejects the job, the wait
     *                             times out or is interrupted, or the job ends
     *                             unsuccessfully (the first failure exception,
     *                             if any, is attached as the cause)
     */
    private JobExecution launch(Job job) {
        try {
            JobExecution execution = this.jobLauncher.run(job, new JobParameters());
            boolean started;
            try {
                started = new Await().await(() -> isRunning(execution), this.openTimeout);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers further up can observe it.
                Thread.currentThread().interrupt();
                throw new ItemStreamException("Interrupted while waiting for job to start", e);
            }
            if (!started) {
                throw new ItemStreamException("Timeout waiting for job to run");
            }
            if (!execution.getStatus().isUnsuccessful()) {
                return execution;
            }
            List<Throwable> failures = execution.getAllFailureExceptions();
            if (CollectionUtils.isEmpty(failures)) {
                throw new ItemStreamException("Could not run job");
            }
            throw new ItemStreamException("Could not run job", failures.get(0));
        } catch (JobExecutionException e) {
            throw new ItemStreamException("Job execution failed", e);
        }
    }

    /**
     * Tells whether waiting for the job to start can stop: either the job is
     * running with its key reader open, or it has already ended
     * (unsuccessfully or with status {@code COMPLETED}).
     *
     * @param jobExecution execution to inspect
     * @return {@code true} once the job has started or reached a final status
     */
    private boolean isRunning(JobExecution jobExecution) {
        if (jobExecution.isRunning() && keyReader.isOpen()) {
            return true;
        }
        if (jobExecution.getStatus().isUnsuccessful()) {
            return true;
        }
        return jobExecution.getStatus().equals(BatchStatus.COMPLETED);
    }

    /**
     * Closes the reader: delegates to the superclass and then forgets the
     * current job execution, so that {@code isOpen} reports {@code false}.
     */
    public synchronized void close() {
        super.close();
        if (!isOpen()) {
            return;
        }
        jobExecution = null;
    }

    /**
     * Returns the next item, polling the queue with the configured poll
     * timeout for as long as the key reader stays open.
     *
     * @return the next item, or {@code null} when a poll came back empty and
     *         the key reader is no longer open
     * @throws Exception if the thread is interrupted while polling
     */
    public synchronized T read() throws Exception {
        T item = poll(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
        while (item == null && keyReader.isOpen()) {
            item = poll(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        return item;
    }

    /**
     * Takes the next item from the queue, waiting up to the given timeout.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next item, or {@code null} if none arrived within the timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /**
     * Prepares the job and step builder factories and the job launcher.
     *
     * <p>If either the job repository or the transaction manager has not been
     * supplied, both are replaced by instances obtained from a
     * {@link MapJobRepositoryFactoryBean}.
     *
     * @throws ItemStreamException if the fallback job repository cannot be
     *                             created; the original failure is the cause
     */
    private void initializeJobInfrastructure() {
        if (this.jobRepository == null || this.transactionManager == null) {
            MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
            try {
                this.jobRepository = factory.getObject();
                this.transactionManager = factory.getTransactionManager();
            } catch (Exception e) {
                // Keep the cause so the underlying failure stays diagnosable.
                throw new ItemStreamException("Could not initialize job repository", e);
            }
        }
        this.jobBuilderFactory = new JobBuilderFactory(this.jobRepository);
        this.stepBuilderFactory = new StepBuilderFactory(this.jobRepository, this.transactionManager);
        this.jobLauncher = new SimpleJobLauncher();
        this.jobLauncher.setJobRepository(this.jobRepository);
        // The job is launched through a SimpleAsyncTaskExecutor.
        this.jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
    }

    /**
     * Builds the fault-tolerant step of the internal job. The step reads keys
     * from a freshly created key reader, applies the optional key processor
     * and writes through a new {@code ProcessingItemWriter}. Skip and retry
     * limits and the four exception lists are applied to the builder.
     *
     * @return the configured step builder
     */
    private SimpleStepBuilder<K, K> step() {
        FaultTolerantStepBuilder<K, K> builder = simpleStep().faultTolerant();
        keyReader = keyReader();
        builder.reader(keyReader);
        builder.processor(keyProcessor);
        builder.writer(writer());
        if (threads > 1) {
            // Multi-threaded step: one pool thread per configured thread.
            builder.taskExecutor(BatchUtils.threadPoolTaskExecutor(threads));
            builder.throttleLimit(threads);
        }
        builder.skipLimit(skipLimit);
        builder.retryLimit(retryLimit);
        skippableExceptions.forEach(builder::skip);
        nonSkippableExceptions.forEach(builder::noSkip);
        retriableExceptions.forEach(builder::retry);
        nonRetriableExceptions.forEach(builder::noRetry);
        return builder;
    }

    /**
     * Creates the chunk-oriented step builder named after this reader. In live
     * mode it is wrapped in a {@code FlushingStepBuilder} configured with the
     * flush interval and idle timeout.
     *
     * @return the base step builder for the current mode
     */
    private SimpleStepBuilder<K, K> simpleStep() {
        SimpleStepBuilder<K, K> chunkStep = stepBuilderFactory.get(name).chunk(chunkSize);
        if (isLive()) {
            FlushingStepBuilder<K, K> flushingStep = new FlushingStepBuilder<>(chunkStep);
            flushingStep.interval(flushInterval);
            flushingStep.idleTimeout(idleTimeout);
            return flushingStep;
        }
        return chunkStep;
    }

    /**
     * Creates a new bounded item queue, registers a gauge for its size under
     * {@link #QUEUE_METER} in the global meter registry, and returns the
     * writer that fills the queue.
     *
     * @return writer wrapping the processor supplied by {@code processor()}
     */
    private RedisItemReader<K, V, T>.ProcessingItemWriter writer() {
        queue = new LinkedBlockingQueue<>(queueCapacity);
        Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), queue);
        return new ProcessingItemWriter(processor());
    }

    /**
     * Supplies the processor that turns a chunk of keys into the items this
     * reader returns. It is called each time the step writer is created.
     *
     * @return processor mapping a list of keys to a list of items
     */
    protected abstract ItemProcessor<List<? extends K>, List<T>> processor();

    /**
     * Creates the key reader for the current mode: a keyspace notification
     * reader in live mode, otherwise a scan reader.
     *
     * @return a new key reader
     */
    private KeyItemReader<K> keyReader() {
        if (isLive()) {
            return keyspaceNotificationReader();
        }
        return scanKeyReader();
    }

    /**
     * Creates a keyspace notification reader configured with this reader's
     * database, key pattern, key type, ordering strategy, notification queue
     * capacity and poll timeout.
     *
     * @return a new, unopened keyspace notification reader
     */
    private KeyspaceNotificationItemReader<K> keyspaceNotificationReader() {
        KeyspaceNotificationItemReader<K> reader = new KeyspaceNotificationItemReader<>(client, codec);
        reader.setDatabase(database);
        reader.setKeyPattern(keyPattern);
        reader.setKeyType(keyType);
        reader.setOrderingStrategy(orderingStrategy);
        reader.setQueueCapacity(notificationQueueCapacity);
        reader.setPollTimeout(pollTimeout);
        return reader;
    }

    /**
     * Creates a scan key reader configured with this reader's read-from
     * setting, scan count, key pattern and key type. When no key type is set,
     * {@code null} is passed as the type.
     *
     * @return a new, unopened scan key reader
     */
    public ScanKeyItemReader<K> scanKeyReader() {
        ScanKeyItemReader<K> reader = new ScanKeyItemReader<>(client, codec);
        reader.setReadFrom(readFrom);
        reader.setLimit(scanCount);
        reader.setMatch(keyPattern);
        reader.setType(keyType != null ? keyType.getString() : null);
        return reader;
    }

    /** @return {@code true} when the mode is {@link ReaderMode#LIVE} */
    public boolean isLive() {
        return this.mode == ReaderMode.LIVE;
    }

    /** @return {@code true} while a job execution is held, i.e. between open and close */
    public synchronized boolean isOpen() {
        return this.jobExecution != null;
    }

    /** Creates a {@code DumpItemReader} for the given client. */
    public static DumpItemReader dump(AbstractRedisClient abstractRedisClient) {
        return new DumpItemReader(abstractRedisClient);
    }

    /** Creates a {@code StructItemReader} using the UTF-8 string codec. */
    public static StructItemReader<String, String> struct(AbstractRedisClient abstractRedisClient) {
        return struct(abstractRedisClient, StringCodec.UTF8);
    }

    /** Creates a {@code StructItemReader} using the given codec. */
    public static <K, V> StructItemReader<K, V> struct(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new StructItemReader<>(abstractRedisClient, redisCodec);
    }

    /**
     * @return a new modifiable list containing only
     *         {@code RedisCommandTimeoutException}
     */
    public static List<Class<? extends Throwable>> defaultRetriableExceptions() {
        return modifiableList(RedisCommandTimeoutException.class);
    }

    /**
     * @return a new modifiable list containing only
     *         {@code RedisCommandExecutionException}
     */
    public static List<Class<? extends Throwable>> defaultNonRetriableExceptions() {
        return modifiableList(RedisCommandExecutionException.class);
    }

    /**
     * Copies the given elements into a new, independent, modifiable list.
     *
     * @param tArr elements of the list, in order
     * @param <T>  element type
     * @return a new {@link ArrayList} holding the elements
     */
    @SafeVarargs // the array is only read and never stored or exposed
    private static <T> List<T> modifiableList(T... tArr) {
        return new ArrayList<>(Arrays.asList(tArr));
    }

    /** Creates a {@code KeyTypeItemReader} using the UTF-8 string codec. */
    public static KeyTypeItemReader<String, String> type(AbstractRedisClient abstractRedisClient) {
        return type(abstractRedisClient, StringCodec.UTF8);
    }

    /** Creates a {@code KeyTypeItemReader} using the given codec. */
    public static <K, V> KeyTypeItemReader<K, V> type(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new KeyTypeItemReader<>(abstractRedisClient, redisCodec);
    }
}
