package com.redis.spring.batch;

import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.DataStructureValueReader;
import com.redis.spring.batch.reader.KeyComparison;
import com.redis.spring.batch.reader.KeyComparisonValueReader;
import com.redis.spring.batch.reader.KeyDumpValueReader;
import com.redis.spring.batch.reader.LiveReaderBuilder;
import com.redis.spring.batch.reader.ReaderOptions;
import com.redis.spring.batch.reader.ScanReaderBuilder;
import com.redis.spring.batch.reader.StreamReaderBuilder;
import io.lettuce.core.Consumer;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.core.convert.converter.Converter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, T> extends AbstractItemStreamItemReader<T> {
    protected final ItemReader<K> keyReader;
    private final ItemProcessor<List<? extends K>, List<T>> valueReader;
    protected final BlockingQueue<T> queue;
    private final JobRunner jobRunner;
    protected final ReaderOptions options;
    private JobExecution jobExecution;
    private String name;
    private final Log log = LogFactory.getLog(getClass());
    private final RedisItemReader<K, T>.Enqueuer enqueuer = new Enqueuer();
    private final AtomicInteger runningThreads = new AtomicInteger();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$Enqueuer.class */
    public class Enqueuer extends AbstractItemStreamItemWriter<K> {
        private Enqueuer() {
        }

        public void open(ExecutionContext executionContext) {
            Utils.createGaugeCollectionSize("reader.queue.size", RedisItemReader.this.queue, new Tag[0]);
            super.open(executionContext);
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                RedisItemReader.this.valueReader.open(executionContext);
            }
        }

        public void update(ExecutionContext executionContext) {
            super.update(executionContext);
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                RedisItemReader.this.valueReader.update(executionContext);
            }
        }

        public void close() {
            if (!RedisItemReader.this.queue.isEmpty()) {
                RedisItemReader.this.log.warn("Closing with items still in queue");
            }
            if (RedisItemReader.this.valueReader instanceof ItemStream) {
                RedisItemReader.this.valueReader.close();
            }
            super.close();
        }

        public void write(List<? extends K> list) throws Exception {
            Iterator it = ((List) RedisItemReader.this.valueReader.process(list)).iterator();
            while (it.hasNext()) {
                RedisItemReader.this.queue.put(it.next());
            }
        }
    }

    public RedisItemReader(ItemReader<K> itemReader, ItemProcessor<List<? extends K>, List<T>> itemProcessor, JobRunner jobRunner, ReaderOptions readerOptions) {
        setName(ClassUtils.getShortName(getClass()));
        this.keyReader = itemReader;
        this.valueReader = itemProcessor;
        this.queue = new LinkedBlockingQueue(readerOptions.getQueueOptions().getCapacity());
        this.jobRunner = jobRunner;
        this.options = readerOptions;
    }

    /* renamed from: getKeyReader */
    public ItemReader<K> mo16getKeyReader() {
        return this.keyReader;
    }

    public void setName(String str) {
        this.name = str;
        super.setName(str);
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        synchronized (this.runningThreads) {
            if (this.jobExecution == null) {
                doOpen();
            }
            this.runningThreads.incrementAndGet();
            super.open(executionContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doOpen() {
        FaultTolerantStepBuilder faultTolerant = createStep().faultTolerant();
        List<Class<? extends Throwable>> skip = this.options.getSkip();
        Objects.requireNonNull(faultTolerant);
        skip.forEach(faultTolerant::skip);
        List<Class<? extends Throwable>> noSkip = this.options.getNoSkip();
        Objects.requireNonNull(faultTolerant);
        noSkip.forEach(faultTolerant::noSkip);
        faultTolerant.skipLimit(this.options.getSkipLimit());
        Optional<SkipPolicy> skipPolicy = this.options.getSkipPolicy();
        Objects.requireNonNull(faultTolerant);
        skipPolicy.ifPresent(faultTolerant::skipPolicy);
        if (this.options.getThreads() > 1) {
            ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
            threadPoolTaskExecutor.setMaxPoolSize(this.options.getThreads());
            threadPoolTaskExecutor.setCorePoolSize(this.options.getThreads());
            threadPoolTaskExecutor.afterPropertiesSet();
            faultTolerant.taskExecutor(threadPoolTaskExecutor).throttleLimit(this.options.getThreads());
        }
        try {
            this.jobExecution = this.jobRunner.runAsync(this.jobRunner.job(this.name).start(faultTolerant.build()).build());
        } catch (JobExecutionException e) {
            throw new ItemStreamException(String.format("Could not run job %s", this.name), e);
        }
    }

    private T poll() throws InterruptedException {
        return this.queue.poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SimpleStepBuilder<K, K> createStep() {
        if (this.keyReader instanceof ItemStreamSupport) {
            this.keyReader.setName(this.name + "-reader");
        }
        this.enqueuer.setName(this.name + "-writer");
        return this.jobRunner.step(this.name).chunk(this.options.getChunkSize()).reader(this.keyReader).writer(this.enqueuer);
    }

    public T read() throws Exception {
        T poll;
        do {
            poll = poll();
            if (poll != null || !this.jobExecution.isRunning()) {
                break;
            }
        } while (!this.jobExecution.getStatus().isUnsuccessful());
        return poll;
    }

    public List<T> read(int i) {
        ArrayList arrayList = new ArrayList(i);
        this.queue.drainTo(arrayList, i);
        return arrayList;
    }

    public void close() {
        super.close();
        if (this.runningThreads.decrementAndGet() > 0) {
            return;
        }
        synchronized (this.runningThreads) {
            this.jobRunner.awaitTermination(this.jobExecution);
            this.enqueuer.close();
            this.jobExecution = null;
        }
    }

    public boolean isOpen() {
        return this.jobExecution != null;
    }

    public static ScanReaderBuilder<String, String, KeyComparison<String>> comparator(JobRunner jobRunner, GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, GenericObjectPool<StatefulConnection<String, String>> genericObjectPool2, Duration duration) {
        return new ScanReaderBuilder<>(genericObjectPool, jobRunner, new KeyComparisonValueReader(genericObjectPool, genericObjectPool2, duration));
    }

    public static <K, V> ScanReaderBuilder<K, V, DataStructure<K>> dataStructure(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner) {
        return new ScanReaderBuilder<>(genericObjectPool, jobRunner, new DataStructureValueReader(genericObjectPool));
    }

    public static <K, V> ScanReaderBuilder<K, V, KeyDump<K>> keyDump(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner) {
        return new ScanReaderBuilder<>(genericObjectPool, jobRunner, new KeyDumpValueReader(genericObjectPool));
    }

    public static <K, V> StreamReaderBuilder<K, V> stream(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, K k, Consumer<K> consumer) {
        return new StreamReaderBuilder<>(genericObjectPool, k, consumer);
    }

    public static LiveReaderBuilder<String, String, DataStructure<String>> liveDataStructure(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, String... strArr) {
        return liveDataStructure(genericObjectPool, jobRunner, statefulRedisPubSubConnection, 0, strArr);
    }

    public static LiveReaderBuilder<String, String, DataStructure<String>> liveDataStructure(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, int i, String... strArr) {
        return liveDataStructure(genericObjectPool, jobRunner, statefulRedisPubSubConnection, LiveReaderBuilder.pubSubPatterns(i, strArr), LiveReaderBuilder.STRING_KEY_EXTRACTOR);
    }

    public static <K, V> LiveReaderBuilder<K, V, DataStructure<K>> liveDataStructure(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, K[] kArr, Converter<K, K> converter) {
        return new LiveReaderBuilder<>(jobRunner, new DataStructureValueReader(genericObjectPool), statefulRedisPubSubConnection, kArr, converter);
    }

    public static <K, V> LiveReaderBuilder<K, V, DataStructure<K>> liveDataStructure(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, RedisCodec<K, V> redisCodec, int i, String... strArr) {
        return new LiveReaderBuilder<>(jobRunner, new DataStructureValueReader(genericObjectPool), statefulRedisPubSubConnection, LiveReaderBuilder.pubSubPatterns(redisCodec, i, strArr), LiveReaderBuilder.keyExtractor(redisCodec));
    }

    public static <K, V> LiveReaderBuilder<K, V, KeyDump<K>> liveKeyDump(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, RedisCodec<K, V> redisCodec, int i, String... strArr) {
        return new LiveReaderBuilder<>(jobRunner, new KeyDumpValueReader(genericObjectPool), statefulRedisPubSubConnection, LiveReaderBuilder.pubSubPatterns(redisCodec, i, strArr), LiveReaderBuilder.keyExtractor(redisCodec));
    }

    public static LiveReaderBuilder<String, String, KeyDump<String>> liveKeyDump(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, String... strArr) {
        return liveKeyDump(genericObjectPool, jobRunner, statefulRedisPubSubConnection, 0, strArr);
    }

    public static LiveReaderBuilder<String, String, KeyDump<String>> liveKeyDump(GenericObjectPool<StatefulConnection<String, String>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, int i, String... strArr) {
        return liveKeyDump(genericObjectPool, jobRunner, statefulRedisPubSubConnection, LiveReaderBuilder.pubSubPatterns(i, strArr), LiveReaderBuilder.STRING_KEY_EXTRACTOR);
    }

    public static <K, V> LiveReaderBuilder<K, V, KeyDump<K>> liveKeyDump(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, JobRunner jobRunner, StatefulRedisPubSubConnection<K, V> statefulRedisPubSubConnection, K[] kArr, Converter<K, K> converter) {
        return new LiveReaderBuilder<>(jobRunner, new KeyDumpValueReader(genericObjectPool), statefulRedisPubSubConnection, kArr, converter);
    }
}
