package com.redis.spring.batch;

import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.reader.AbstractValueReader;
import com.redis.spring.batch.reader.LiveRedisItemReader;
import com.redis.spring.batch.reader.RedisValueEnqueuer;
import com.redis.spring.batch.reader.ScanKeyItemReader;
import com.redis.spring.batch.reader.StreamItemReader;
import com.redis.spring.batch.reader.ValueReader;
import com.redis.spring.batch.support.JobRunner;
import com.redis.spring.batch.support.RedisConnectionBuilder;
import com.redis.spring.batch.support.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisCommandTimeoutException;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, T extends KeyValue<K, ?>> extends AbstractItemStreamItemReader<T> {
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_SKIP_LIMIT = 3;
    private final AtomicInteger runningThreads = new AtomicInteger();
    protected final ItemReader<K> keyReader;
    private final ValueReader<K, T> valueReader;
    protected final BlockingQueue<T> valueQueue;
    protected final RedisValueEnqueuer<K, T> enqueuer;
    protected final JobRunner jobRunner;
    private final int threads;
    private final int chunkSize;
    private final Duration queuePollTimeout;
    private final SkipPolicy skipPolicy;
    private JobExecution jobExecution;
    private String name;
    private boolean open;
    private static final Logger log = LoggerFactory.getLogger(RedisItemReader.class);
    public static final Duration DEFAULT_QUEUE_POLL_TIMEOUT = Duration.ofMillis(100);
    public static final SkipPolicy DEFAULT_SKIP_POLICY = limitCheckingSkipPolicy(3);

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$AbstractBuilder.class */
    public static abstract class AbstractBuilder<K, V, T extends KeyValue<K, ?>, B extends AbstractBuilder<K, V, T, B>> extends RedisConnectionBuilder<K, V, B> {
        protected final AbstractValueReader.ValueReaderFactory<K, V, T> valueReaderFactory;
        protected int chunkSize;
        protected int threads;
        protected int valueQueueCapacity;
        protected Duration queuePollTimeout;
        protected SkipPolicy skipPolicy;
        protected Optional<JobRunner> jobRunner;

        /* JADX INFO: Access modifiers changed from: protected */
        public AbstractBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, AbstractValueReader.ValueReaderFactory<K, V, T> valueReaderFactory) {
            super(abstractRedisClient, redisCodec);
            this.chunkSize = 50;
            this.threads = 1;
            this.valueQueueCapacity = 10000;
            this.queuePollTimeout = RedisItemReader.DEFAULT_QUEUE_POLL_TIMEOUT;
            this.skipPolicy = RedisItemReader.DEFAULT_SKIP_POLICY;
            this.jobRunner = Optional.empty();
            this.valueReaderFactory = valueReaderFactory;
        }

        /* renamed from: keyReader */
        protected abstract ItemReader<K> mo21keyReader();

        public B jobRunner(JobRunner jobRunner) {
            this.jobRunner = Optional.of(jobRunner);
            return this;
        }

        public B jobRunner(Optional<JobRunner> optional) {
            this.jobRunner = optional;
            return this;
        }

        public B chunkSize(int i) {
            this.chunkSize = i;
            return this;
        }

        public B threads(int i) {
            this.threads = i;
            return this;
        }

        public B valueQueueCapacity(int i) {
            this.valueQueueCapacity = i;
            return this;
        }

        public B queuePollTimeout(Duration duration) {
            this.queuePollTimeout = duration;
            return this;
        }

        public B skipPolicy(SkipPolicy skipPolicy) {
            this.skipPolicy = skipPolicy;
            return this;
        }

        protected ValueReader<K, T> valueReader() {
            return this.valueReaderFactory.create(connectionSupplier(), this.poolConfig, async());
        }

        protected JobRunner jobRunner() throws Exception {
            return this.jobRunner.isEmpty() ? JobRunner.inMemory() : this.jobRunner.get();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$Builder.class */
    public static class Builder<K, V, T extends KeyValue<K, ?>> extends AbstractBuilder<K, V, T, Builder<K, V, T>> {
        private String match;
        private long count;
        private Optional<String> type;

        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, AbstractValueReader.ValueReaderFactory<K, V, T> valueReaderFactory) {
            super(abstractRedisClient, redisCodec, valueReaderFactory);
            this.match = ScanKeyItemReader.DEFAULT_SCAN_MATCH;
            this.count = 1000L;
            this.type = Optional.empty();
        }

        public Builder<K, V, T> match(String str) {
            this.match = str;
            return this;
        }

        public Builder<K, V, T> count(long j) {
            this.count = j;
            return this;
        }

        public Builder<K, V, T> type(String str) {
            this.type = Optional.of(str);
            return this;
        }

        @Override // com.redis.spring.batch.RedisItemReader.AbstractBuilder
        /* renamed from: keyReader */
        protected ItemReader<K> mo21keyReader() {
            return new ScanKeyItemReader(connectionSupplier(), sync(), this.match, this.count, this.type);
        }

        public RedisItemReader<K, T> build() throws Exception {
            return new RedisItemReader<>(this);
        }

        public LiveRedisItemReader.Builder<K, V, T> live() {
            LiveRedisItemReader.Builder<K, V, T> builder = new LiveRedisItemReader.Builder<>(this.client, this.codec, this.valueReaderFactory);
            builder.keyPatterns(this.match);
            builder.jobRunner(this.jobRunner);
            builder.threads(this.threads);
            builder.valueQueueCapacity(this.valueQueueCapacity);
            builder.queuePollTimeout(this.queuePollTimeout);
            builder.skipPolicy(this.skipPolicy);
            return builder;
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$CodecBuilder.class */
    public static class CodecBuilder {
        private final AbstractRedisClient client;

        public CodecBuilder(AbstractRedisClient abstractRedisClient) {
            this.client = abstractRedisClient;
        }

        public <K, V> TypeBuilder<K, V> codec(RedisCodec<K, V> redisCodec) {
            return new TypeBuilder<>(this.client, redisCodec);
        }

        public TypeBuilder<String, String> string() {
            return new TypeBuilder<>(this.client, StringCodec.UTF8);
        }

        public TypeBuilder<byte[], byte[]> bytes() {
            return new TypeBuilder<>(this.client, ByteArrayCodec.INSTANCE);
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$StreamBuilder.class */
    public static class StreamBuilder<K, V> extends RedisConnectionBuilder<K, V, StreamBuilder<K, V>> {
        public static final String DEFAULT_CONSUMER_GROUP = ClassUtils.getShortName(StreamItemReader.class);
        public static final String DEFAULT_CONSUMER = "consumer1";
        private final K stream;
        private String offset;
        private Duration block;
        private long count;
        private K consumerGroup;
        private K consumer;
        private StreamItemReader.AckPolicy ackPolicy;

        public StreamBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, K k) {
            super(abstractRedisClient, redisCodec);
            this.offset = "0-0";
            this.block = StreamItemReader.DEFAULT_BLOCK;
            this.count = 50L;
            this.ackPolicy = StreamItemReader.DEFAULT_ACK_POLICY;
            this.stream = k;
        }

        public StreamBuilder<K, V> offset(String str) {
            this.offset = str;
            return this;
        }

        public StreamBuilder<K, V> block(Duration duration) {
            this.block = duration;
            return this;
        }

        public StreamBuilder<K, V> count(long j) {
            this.count = j;
            return this;
        }

        public StreamBuilder<K, V> consumerGroup(K k) {
            this.consumerGroup = k;
            return this;
        }

        public StreamBuilder<K, V> consumer(K k) {
            this.consumer = k;
            return this;
        }

        public StreamBuilder<K, V> ackPolicy(StreamItemReader.AckPolicy ackPolicy) {
            this.ackPolicy = ackPolicy;
            return this;
        }

        public StreamItemReader<K, V> build() {
            StreamItemReader<K, V> streamItemReader = new StreamItemReader<>(connectionSupplier(), this.poolConfig, sync(), XReadArgs.StreamOffset.from(this.stream, this.offset));
            streamItemReader.setAckPolicy(this.ackPolicy);
            streamItemReader.setBlock(this.block);
            streamItemReader.setConsumer(this.consumer == null ? encodeKey(DEFAULT_CONSUMER) : this.consumer);
            streamItemReader.setConsumerGroup(this.consumerGroup == null ? encodeKey(DEFAULT_CONSUMER_GROUP) : this.consumerGroup);
            streamItemReader.setCount(this.count);
            return streamItemReader;
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$TypeBuilder.class */
    public static class TypeBuilder<K, V> {
        private final AbstractRedisClient client;
        private final RedisCodec<K, V> codec;

        public TypeBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            this.client = abstractRedisClient;
            this.codec = redisCodec;
        }

        public Builder<K, V, DataStructure<K>> dataStructure() {
            return new Builder<>(this.client, this.codec, AbstractValueReader.ValueReaderFactory.dataStructure());
        }

        public Builder<K, V, KeyValue<K, byte[]>> keyDump() {
            return new Builder<>(this.client, this.codec, AbstractValueReader.ValueReaderFactory.keyDump());
        }

        public StreamBuilder<K, V> stream(K k) {
            return new StreamBuilder<>(this.client, this.codec, k);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public RedisItemReader(AbstractBuilder<K, ?, T, ?> abstractBuilder) throws Exception {
        setName(ClassUtils.getShortName(getClass()));
        this.chunkSize = abstractBuilder.chunkSize;
        this.queuePollTimeout = abstractBuilder.queuePollTimeout;
        this.skipPolicy = abstractBuilder.skipPolicy;
        this.threads = abstractBuilder.threads;
        this.keyReader = abstractBuilder.mo21keyReader();
        this.valueReader = abstractBuilder.valueReader();
        this.valueQueue = new LinkedBlockingQueue(abstractBuilder.valueQueueCapacity);
        this.enqueuer = new RedisValueEnqueuer<>(this.valueReader, this.valueQueue);
        this.jobRunner = abstractBuilder.jobRunner();
    }

    public static SkipPolicy limitCheckingSkipPolicy(int i) {
        return new LimitCheckingItemSkipPolicy(i, (Map) Stream.of((Object[]) new Class[]{RedisCommandExecutionException.class, RedisCommandTimeoutException.class, TimeoutException.class}).collect(Collectors.toMap(cls -> {
            return cls;
        }, cls2 -> {
            return true;
        })));
    }

    public ValueReader<K, T> getValueReader() {
        return this.valueReader;
    }

    public void setName(String str) {
        this.name = str;
        super.setName(str);
    }

    public void open(ExecutionContext executionContext) throws ItemStreamException {
        synchronized (this.runningThreads) {
            if (this.jobExecution == null) {
                Utils.createGaugeCollectionSize("reader.queue.size", this.valueQueue, new Tag[0]);
                FaultTolerantStepBuilder faultTolerant = step().faultTolerant();
                faultTolerant.skipPolicy(this.skipPolicy);
                if (this.threads > 1) {
                    ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
                    threadPoolTaskExecutor.setMaxPoolSize(this.threads);
                    threadPoolTaskExecutor.setCorePoolSize(this.threads);
                    threadPoolTaskExecutor.afterPropertiesSet();
                    faultTolerant.taskExecutor(threadPoolTaskExecutor).throttleLimit(this.threads);
                }
                try {
                    this.jobExecution = execute(this.jobRunner.job(this.name).start(faultTolerant.build()).build());
                    this.open = true;
                } catch (JobExecutionException e) {
                    throw new ItemStreamException(String.format("Could not run job %s", this.name), e);
                }
            }
            this.runningThreads.incrementAndGet();
            super.open(executionContext);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SimpleStepBuilder<K, K> step() {
        return this.jobRunner.step(this.name, this.chunkSize, this.keyReader, this.enqueuer);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public JobExecution execute(Job job) throws JobExecutionException {
        log.debug("Executing job {}", this.name);
        return this.jobRunner.runAsync(job);
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public T m2read() throws Exception {
        T poll;
        do {
            poll = this.valueQueue.poll(this.queuePollTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null || !this.jobExecution.isRunning()) {
                break;
            }
        } while (!this.jobExecution.getStatus().isUnsuccessful());
        return poll;
    }

    public List<T> read(int i) {
        ArrayList arrayList = new ArrayList(i);
        this.valueQueue.drainTo(arrayList, i);
        return arrayList;
    }

    public void close() {
        super.close();
        if (this.runningThreads.decrementAndGet() > 0) {
            return;
        }
        synchronized (this.runningThreads) {
            log.debug("Closing {}", this.name);
            try {
                this.jobRunner.awaitTermination(this.jobExecution);
                if (!this.valueQueue.isEmpty()) {
                    log.warn("Closing {} with {} items still in queue", ClassUtils.getShortName(getClass()), Integer.valueOf(this.valueQueue.size()));
                }
                this.open = false;
            } catch (JobExecutionException e) {
                throw new ItemStreamException(String.format("Job '%s' did not terminate in a timely fashion", this.name), e);
            }
        }
    }

    public boolean isOpen() {
        return this.open;
    }

    public static CodecBuilder client(AbstractRedisClient abstractRedisClient) {
        return new CodecBuilder(abstractRedisClient);
    }
}
