package org.springframework.batch.item.redis.support;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.redis.support.KeyValue;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/KeyValueItemReader.class */
public class KeyValueItemReader<T extends KeyValue<?>> extends AbstractItemStreamItemReader<T> {

    /** Class-wide SLF4J logger. */
    @Generated
    private static final Logger log = LoggerFactory.getLogger(KeyValueItemReader.class);
    /** Source of keys; used as the reader of the background step built in {@code open}. */
    private final ItemReader<String> keyReader;
    /** Turns a chunk of keys into key/value items; handed to the internal {@code ValueWriter}. */
    private final ItemProcessor<List<? extends String>, List<T>> valueReader;
    /** Worker thread count; a task executor is only configured when this is greater than one. */
    private final int threads;
    /** Chunk size of the background step. */
    private final int chunkSize;
    /** Capacity of the hand-off queue created in {@code open}. */
    private final int queueCapacity;
    /** Configured poll timeout; converted to milliseconds in {@code open}. */
    private final Duration queuePollTimeout;
    /** Hand-off queue between the background step and the read methods; created in {@code open}, nulled in {@code close}. */
    protected BlockingQueue<T> queue;
    /** {@link #queuePollTimeout} in milliseconds, computed in {@code open}. */
    private long pollTimeout;
    /** Execution of the background job; non-null doubles as the "already opened" marker. */
    private JobExecution jobExecution;
    /** Reader name, used for log messages and as the prefix of the step and job names. */
    private String name;

    /* loaded from: input_file:org/springframework/batch/item/redis/support/KeyValueItemReader$AbstractKeyValueItemReaderBuilder.class */
    /**
     * Base fluent builder holding the settings shared by all key/value reader builders:
     * worker thread count, chunk size, queue capacity and queue poll timeout.
     *
     * @param <T> type of item produced by the reader being built
     * @param <R> type of the value reader (maps a list of keys to a list of items)
     * @param <B> concrete builder type, returned by the fluent setters
     */
    public static class AbstractKeyValueItemReaderBuilder<T extends KeyValue<?>, R extends ItemProcessor<List<? extends String>, List<T>>, B extends AbstractKeyValueItemReaderBuilder<T, R, B>> extends CommandBuilder<String, String, B> {
        public static final int DEFAULT_THREADS = 1;
        public static final int DEFAULT_CHUNK_SIZE = 50;
        public static final int DEFAULT_QUEUE_CAPACITY = 1000;
        public static final Duration DEFAULT_QUEUE_POLL_TIMEOUT = Duration.ofMillis(100);
        protected final R valueReader;
        protected final AbstractRedisClient client;
        // Defaults come from the constants above so both constructors stay in sync with them.
        protected int threads = DEFAULT_THREADS;
        protected int chunkSize = DEFAULT_CHUNK_SIZE;
        protected int queueCapacity = DEFAULT_QUEUE_CAPACITY;
        protected Duration queuePollTimeout = DEFAULT_QUEUE_POLL_TIMEOUT;

        /**
         * Creates a builder bound to a standalone Redis client.
         *
         * @param redisClient client handed to the parent builder and kept in {@link #client}
         * @param r           value reader kept in {@link #valueReader}
         */
        public AbstractKeyValueItemReaderBuilder(RedisClient redisClient, R r) {
            super(redisClient, StringCodec.UTF8);
            this.client = redisClient;
            this.valueReader = r;
        }

        /**
         * Creates a builder bound to a Redis Cluster client.
         *
         * @param redisClusterClient client handed to the parent builder and kept in {@link #client}
         * @param r                  value reader kept in {@link #valueReader}
         */
        public AbstractKeyValueItemReaderBuilder(RedisClusterClient redisClusterClient, R r) {
            super(redisClusterClient, StringCodec.UTF8);
            this.client = redisClusterClient;
            this.valueReader = r;
        }

        /**
         * Sets the number of worker threads.
         *
         * @param i thread count, must be greater than zero
         * @return this builder
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        public B threads(int i) {
            Assert.isTrue(i > 0, "Thread count must be greater than zero");
            this.threads = i;
            return thisAsB();
        }

        /**
         * Sets the chunk size.
         *
         * @param i chunk size, must be greater than zero
         * @return this builder
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        public B chunkSize(int i) {
            Assert.isTrue(i > 0, "Chunk size must be greater than zero");
            this.chunkSize = i;
            return thisAsB();
        }

        /**
         * Sets the queue capacity.
         *
         * @param i capacity, must be greater than zero
         * @return this builder
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        public B queueCapacity(int i) {
            Assert.isTrue(i > 0, "Queue capacity must be greater than zero");
            this.queueCapacity = i;
            return thisAsB();
        }

        /**
         * Sets the queue poll timeout.
         *
         * @param duration timeout, must be non-null and strictly positive
         * @return this builder
         * @throws IllegalArgumentException if {@code duration} is null, zero or negative
         */
        public B queuePollTimeout(Duration duration) {
            Assert.notNull(duration, "Queue poll timeout must not be null");
            Assert.isTrue(!duration.isZero(), "Queue poll timeout must not be zero");
            Assert.isTrue(!duration.isNegative(), "Queue poll timeout must not be negative");
            this.queuePollTimeout = duration;
            return thisAsB();
        }

        /**
         * Returns this builder typed as {@code B}. The self-type bound on {@code B} is what
         * makes the unchecked cast hold for correctly declared subclasses.
         */
        @SuppressWarnings("unchecked")
        private B thisAsB() {
            return (B) this;
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/support/KeyValueItemReader$KeyValueItemReaderBuilder.class */
    /**
     * Builder for readers whose keys are obtained through a {@code ScanKeyItemReader},
     * configured with a match pattern, a count and an optional type.
     *
     * @param <T> type of item produced by the reader being built
     * @param <R> type of the value reader
     * @param <B> concrete builder type, returned by the fluent setters
     */
    public static class KeyValueItemReaderBuilder<T extends KeyValue<?>, R extends ItemProcessor<List<? extends String>, List<T>>, B extends KeyValueItemReaderBuilder<T, R, B>> extends AbstractKeyValueItemReaderBuilder<T, R, B> {
        public static final String DEFAULT_SCAN_MATCH = "*";
        public static final long DEFAULT_SCAN_COUNT = 1000;
        // Defaults come from the constants above rather than repeated literals.
        private String scanMatch = DEFAULT_SCAN_MATCH;
        private long scanCount = DEFAULT_SCAN_COUNT;
        /** Optional scan type; stays {@code null} unless {@link #scanType(String)} is called. */
        private String scanType;

        /* JADX INFO: Access modifiers changed from: protected */
        public KeyValueItemReaderBuilder(RedisClient redisClient, R r) {
            super(redisClient, r);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public KeyValueItemReaderBuilder(RedisClusterClient redisClusterClient, R r) {
            super(redisClusterClient, r);
        }

        /**
         * Sets the match pattern passed to the scan key reader.
         *
         * @param str match pattern
         * @return this builder
         */
        public B scanMatch(String str) {
            this.scanMatch = str;
            return thisAsB();
        }

        /**
         * Sets the count passed to the scan key reader.
         *
         * @param j scan count
         * @return this builder
         */
        public B scanCount(long j) {
            this.scanCount = j;
            return thisAsB();
        }

        /**
         * Sets the type passed to the scan key reader.
         *
         * @param str scan type
         * @return this builder
         */
        public B scanType(String str) {
            this.scanType = str;
            return thisAsB();
        }

        /**
         * Creates the key reader from the current scan settings.
         *
         * @return a new {@code ScanKeyItemReader}
         */
        public ItemReader<String> keyReader() {
            return new ScanKeyItemReader(connectionSupplier(), sync(), this.scanMatch, this.scanCount, this.scanType);
        }

        /** Returns this builder typed as {@code B}; valid because of the self-type bound on {@code B}. */
        @SuppressWarnings("unchecked")
        private B thisAsB() {
            return (B) this;
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/support/KeyValueItemReader$LiveKeyValueItemReaderBuilder.class */
    /**
     * Builder for "live" readers whose keys come from keyspace notifications
     * (pub/sub patterns of the form {@code __keyspace@<database>__:<keyPattern>}).
     *
     * @param <T> type of item produced by the reader being built
     * @param <R> type of the value reader
     * @param <B> concrete builder type, returned by the fluent setters
     */
    public static class LiveKeyValueItemReaderBuilder<T extends KeyValue<?>, R extends ItemProcessor<List<? extends String>, List<T>>, B extends LiveKeyValueItemReaderBuilder<T, R, B>> extends AbstractKeyValueItemReaderBuilder<T, R, B> {
        public static final int DEFAULT_QUEUE_CAPACITY = 1000;
        public static final int DEFAULT_DATABASE = 0;
        public static final String DEFAULT_KEY_PATTERN = "*";
        public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
        public static final List<String> DEFAULT_PUBSUB_PATTERNS = pubSubPatterns(DEFAULT_DATABASE, DEFAULT_KEY_PATTERN);
        /**
         * Shadows the inherited field of the same name. Kept for compatibility with existing
         * subclasses; {@link #queueCapacity(int)} keeps both fields in sync.
         */
        protected int queueCapacity = DEFAULT_QUEUE_CAPACITY;
        private String[] keyPatterns = {DEFAULT_KEY_PATTERN};
        private int database = DEFAULT_DATABASE;
        protected Duration flushingInterval = FlushingStepBuilder.DEFAULT_FLUSHING_INTERVAL;
        /** Optional idle timeout; stays {@code null} unless {@link #idleTimeout(Duration)} is called. */
        protected Duration idleTimeout;

        /* JADX INFO: Access modifiers changed from: protected */
        public LiveKeyValueItemReaderBuilder(RedisClient redisClient, R r) {
            super(redisClient, r);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public LiveKeyValueItemReaderBuilder(RedisClusterClient redisClusterClient, R r) {
            super(redisClusterClient, r);
        }

        /**
         * Sets the queue capacity.
         *
         * @param i capacity, must be greater than zero
         * @return this builder
         * @throws IllegalArgumentException if {@code i} is not positive
         */
        @Override // org.springframework.batch.item.redis.support.KeyValueItemReader.AbstractKeyValueItemReaderBuilder
        public B queueCapacity(int i) {
            // Delegate first: the parent validates the value and updates the inherited field,
            // which this class's shadowing field would otherwise leave stale.
            super.queueCapacity(i);
            this.queueCapacity = i;
            return thisAsB();
        }

        /**
         * Sets the flushing interval.
         *
         * @param duration flushing interval
         * @return this builder
         */
        public B flushingInterval(Duration duration) {
            this.flushingInterval = duration;
            return thisAsB();
        }

        /**
         * Sets the idle timeout.
         *
         * @param duration idle timeout
         * @return this builder
         */
        public B idleTimeout(Duration duration) {
            this.idleTimeout = duration;
            return thisAsB();
        }

        /**
         * Sets the database index used when building the pub/sub patterns.
         *
         * @param i database index
         * @return this builder
         */
        public B database(int i) {
            this.database = i;
            return thisAsB();
        }

        /**
         * Sets the key patterns used when building the pub/sub patterns.
         *
         * @param strArr key patterns; the array is stored as given, not copied
         * @return this builder
         */
        public B keyPatterns(String... strArr) {
            this.keyPatterns = strArr;
            return thisAsB();
        }

        /**
         * Builds one pub/sub pattern per key pattern for the given database.
         *
         * @param i      database index
         * @param strArr key patterns
         * @return a new mutable list of patterns, in the order of {@code strArr}
         */
        public static List<String> pubSubPatterns(int i, String... strArr) {
            List<String> patterns = new ArrayList<>(strArr.length);
            for (String keyPattern : strArr) {
                patterns.add(pubSubPattern(i, keyPattern));
            }
            return patterns;
        }

        /**
         * Formats a single pub/sub pattern using {@link #PUBSUB_PATTERN_FORMAT}.
         *
         * @param i   database index
         * @param str key pattern
         * @return the formatted pattern, e.g. {@code __keyspace@0__:*}
         */
        public static String pubSubPattern(int i, String str) {
            return String.format(PUBSUB_PATTERN_FORMAT, i, str);
        }

        /**
         * Creates the keyspace-notification key reader matching the client type:
         * the cluster variant when {@code client} is a {@link RedisClusterClient},
         * the standalone variant otherwise.
         *
         * @return a new key reader subscribed to the configured patterns
         */
        public PollableItemReader<String> keyReader() {
            List<String> patterns = pubSubPatterns(this.database, this.keyPatterns);
            if (this.client instanceof RedisClusterClient) {
                return new RedisClusterKeyspaceNotificationItemReader((Supplier<StatefulRedisClusterPubSubConnection<String, String>>) pubSubConnectionSupplier(), patterns, this.queueCapacity);
            }
            return new RedisKeyspaceNotificationItemReader(pubSubConnectionSupplier(), patterns, this.queueCapacity);
        }

        /** Returns this builder typed as {@code B}; valid because of the self-type bound on {@code B}. */
        @SuppressWarnings("unchecked")
        private B thisAsB() {
            return (B) this;
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/support/KeyValueItemReader$ValueWriter.class */
    /**
     * Item writer that resolves each chunk of keys through the value reader and places
     * the resulting items on the shared queue. When the value reader is also an
     * {@link ItemStream}, its lifecycle is driven by this writer's lifecycle.
     *
     * @param <T> type of item placed on the queue
     */
    private static class ValueWriter<T extends KeyValue<?>> extends AbstractItemStreamItemWriter<String> {
        private final ItemProcessor<List<? extends String>, List<T>> valueReader;
        private final BlockingQueue<T> queue;

        private ValueWriter(ItemProcessor<List<? extends String>, List<T>> itemProcessor, BlockingQueue<T> blockingQueue) {
            this.valueReader = itemProcessor;
            this.queue = blockingQueue;
        }

        /** Opens this writer, then the value reader if it is an {@link ItemStream}. */
        public void open(ExecutionContext executionContext) {
            super.open(executionContext);
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).open(executionContext);
            }
        }

        /** Updates this writer, then the value reader if it is an {@link ItemStream}. */
        public void update(ExecutionContext executionContext) {
            super.update(executionContext);
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).update(executionContext);
            }
        }

        /** Closes the value reader if it is an {@link ItemStream}, then this writer. */
        public void close() {
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).close();
            }
            super.close();
        }

        /**
         * Resolves the given keys and enqueues the resulting items, blocking while the
         * queue is full. An item already queued under the same key is removed first so
         * that only the most recent value for a key is pending.
         *
         * @param list keys to resolve
         * @throws Exception if the value reader fails or the thread is interrupted while enqueuing
         */
        public void write(List<? extends String> list) throws Exception {
            List<T> values = this.valueReader.process(list);
            if (values == null) {
                // An ItemProcessor may return null to signal "nothing to emit".
                return;
            }
            for (T value : values) {
                this.queue.removeIf(queued -> queued.getKey().equals(value.getKey()));
                this.queue.put(value);
            }
        }
    }

    /**
     * Creates a reader; every argument is validated before any field is assigned.
     *
     * @param itemReader    source of keys, required
     * @param itemProcessor value reader mapping keys to items, required
     * @param i             thread count, must be greater than zero
     * @param i2            chunk size, must be greater than zero
     * @param i3            queue capacity, must be greater than zero
     * @param duration      queue poll timeout, must be non-null and strictly positive
     * @throws IllegalArgumentException if any argument is invalid
     */
    public KeyValueItemReader(ItemReader<String> itemReader, ItemProcessor<List<? extends String>, List<T>> itemProcessor, int i, int i2, int i3, Duration duration) {
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(itemReader, "A key reader is required");
        Assert.notNull(itemProcessor, "A value reader is required");
        requirePositive(i, "Thread count must be greater than zero");
        requirePositive(i2, "Chunk size must be greater than zero");
        requirePositive(i3, "Queue capacity must be greater than zero");
        requireStrictlyPositive(duration);
        this.queuePollTimeout = duration;
        this.queueCapacity = i3;
        this.chunkSize = i2;
        this.threads = i;
        this.valueReader = itemProcessor;
        this.keyReader = itemReader;
    }

    /** Rejects values that are zero or negative, using the given message. */
    private static void requirePositive(int value, String message) {
        Assert.isTrue(value > 0, message);
    }

    /** Rejects a null, zero or negative queue poll timeout. */
    private static void requireStrictlyPositive(Duration timeout) {
        Assert.notNull(timeout, "Queue poll timeout must not be null");
        Assert.isTrue(!timeout.isZero(), "Queue poll timeout must not be zero");
        Assert.isTrue(!timeout.isNegative(), "Queue poll timeout must not be negative");
    }

    /**
     * Sets the reader name on the parent and keeps a local copy, which is used for
     * log messages and for naming the background step and job.
     *
     * @param str the new name
     */
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * Opens the reader: creates the hand-off queue, then builds and asynchronously
     * launches a background job whose single step reads keys from the key reader and
     * writes the resolved items to the queue. Returns once the job has started (or has
     * already ended). Calling it again while opened is a no-op.
     *
     * @param executionContext context passed on to the parent's {@code open}
     * @throws ItemStreamException if the job infrastructure cannot be initialized, the
     *                             job cannot be launched, or the thread is interrupted
     *                             while waiting for the job to start
     */
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.jobExecution != null) {
            log.debug("Already opened, skipping");
            return;
        }
        log.debug("Opening {}", this.name);
        this.queue = new LinkedBlockingDeque<>(this.queueCapacity);
        this.pollTimeout = this.queuePollTimeout.toMillis();
        MetricsUtils.createGaugeCollectionSize("reader.queue.size", this.queue, new Tag[0]);
        ValueWriter<T> valueWriter = new ValueWriter<>(this.valueReader, this.queue);
        JobFactory jobFactory = new JobFactory();
        try {
            jobFactory.afterPropertiesSet();
            FaultTolerantStepBuilder<String, String> stepBuilder = faultTolerantStepBuilder(jobFactory.getStepBuilderFactory().get(this.name + "-step").chunk(this.chunkSize));
            stepBuilder.skipPolicy(new AlwaysSkipItemSkipPolicy()).skip(RedisException.class).skip(TimeoutException.class);
            stepBuilder.reader(this.keyReader).writer(valueWriter);
            if (this.threads > 1) {
                ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
                taskExecutor.setMaxPoolSize(this.threads);
                taskExecutor.setCorePoolSize(this.threads);
                // Nothing shuts this pool down, so let idle core threads expire instead of
                // lingering (and keeping the JVM alive) after the step has finished.
                taskExecutor.setAllowCoreThreadTimeOut(true);
                taskExecutor.afterPropertiesSet();
                stepBuilder.taskExecutor(taskExecutor).throttleLimit(this.threads);
            }
            Job job = jobFactory.getJobBuilderFactory().get(this.name + "-job").start(stepBuilder.build()).build();
            try {
                this.jobExecution = jobFactory.getAsyncLauncher().run(job, new JobParameters());
                awaitJobStarted(this.jobExecution);
                super.open(executionContext);
                log.debug("Opened {}", this.name);
            } catch (Exception e) {
                throw new ItemStreamException("Could not run job " + job.getName(), e);
            }
        } catch (Exception e) {
            throw new ItemStreamException("Failed to initialize the reader", e);
        }
    }

    /**
     * Blocks until the given execution reports running or has already ended. The end-time
     * check matters: a job that finishes before it is first observed running would
     * otherwise never satisfy the loop condition.
     */
    private static void awaitJobStarted(JobExecution execution) {
        while (!execution.isRunning() && execution.getEndTime() == null) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still see the interruption.
                Thread.currentThread().interrupt();
                throw new ItemStreamException("Interrupted while waiting for job to run", e);
            }
        }
    }

    /**
     * Extension hook that turns the simple step builder into the fault-tolerant builder
     * used by {@code open}. This implementation simply asks the given builder for its
     * fault-tolerant variant.
     *
     * @param simpleStepBuilder chunk-oriented step builder
     * @return the fault-tolerant builder derived from {@code simpleStepBuilder}
     */
    protected FaultTolerantStepBuilder<String, String> faultTolerantStepBuilder(SimpleStepBuilder<String, String> simpleStepBuilder) {
        FaultTolerantStepBuilder<String, String> faultTolerant = simpleStepBuilder.faultTolerant();
        return faultTolerant;
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    /**
     * Returns the next item from the queue, polling for as long as the background job is
     * running.
     *
     * @return the next item, or {@code null} once the job is no longer running and the
     *         queue is empty
     * @throws Exception if the thread is interrupted while polling
     */
    public T m14read() throws Exception {
        T item;
        do {
            item = this.queue.poll(this.pollTimeout, TimeUnit.MILLISECONDS);
        } while (item == null && this.jobExecution.isRunning());
        if (item == null) {
            // The job may have enqueued its last items between the timed-out poll and the
            // moment it stopped running; check once more so they are not left behind.
            item = this.queue.poll();
        }
        return item;
    }

    /**
     * Removes up to {@code i} items currently available in the queue, without waiting
     * for more to arrive.
     *
     * @param i maximum number of items to return
     * @return a new list holding the drained items, possibly empty
     * @throws Exception declared for API compatibility
     */
    public List<T> read(int i) throws Exception {
        List<T> drained = new ArrayList<>(i);
        this.queue.drainTo(drained, i);
        return drained;
    }

    /**
     * Closes the reader: closes the parent, warns if items are still queued, waits for
     * the background job to stop running, then releases the queue and the job execution.
     * Calling it while not opened is a no-op.
     *
     * @throws ItemStreamException if the thread is interrupted while waiting for the job
     *                             to finish
     */
    public synchronized void close() {
        if (this.jobExecution == null) {
            log.debug("Already closed, skipping");
            return;
        }
        log.debug("Closing {}", this.name);
        super.close();
        if (!this.queue.isEmpty()) {
            log.warn("Closing {} with {} items still in queue", ClassUtils.getShortName(getClass()), this.queue.size());
        }
        while (this.jobExecution.isRunning()) {
            try {
                Thread.sleep(10L);
            } catch (InterruptedException e) {
                // Restore the flag so callers further up can still see the interruption.
                Thread.currentThread().interrupt();
                throw new ItemStreamException("Interrupted while waiting for job to finish running", e);
            }
        }
        this.queue = null;
        this.jobExecution = null;
        log.debug("Closed {}", this.name);
    }
}
