package com.redis.spring.batch;

import com.redis.spring.batch.reader.KeyValueItemProcessor;
import com.redis.spring.batch.reader.KeyspaceNotificationItemReader;
import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.step.FlushingChunkProvider;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.util.BatchUtils;
import com.redis.spring.batch.util.CodecUtils;
import com.redis.spring.batch.util.ConnectionUtils;
import com.redis.spring.batch.util.PredicateItemProcessor;
import com.redis.spring.batch.writer.ProcessingItemWriter;
import com.redis.spring.batch.writer.QueueItemWriter;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisKeyCommands;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.batch.item.support.CompositeItemProcessor;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;
import org.springframework.util.unit.DataSize;

/* loaded from: input_file:com/redis/spring/batch/RedisItemReader.class */
public class RedisItemReader<K, V> extends AbstractItemStreamItemReader<KeyValue<K>> implements PollableItemReader<KeyValue<K>> {
    // Name passed to the Micrometer gauge that tracks the size of the output queue.
    public static final String QUEUE_METER = "redis.batch.reader.queue.size";
    public static final int DEFAULT_MEMORY_USAGE_SAMPLES = 5;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    public static final int DEFAULT_SCAN_COUNT = 50;
    // Format arguments, in order: database index, key match pattern (see pattern(int, String)).
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    public static final int DEFAULT_DATABASE = 0;
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_POOL_SIZE = 8;
    // Redis client and codec supplied at construction time.
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    // Writer that records over-limit keys, and the key filter that consults the recorded set.
    private final RedisItemReader<K, V>.BlockedKeyItemWriter blockedKeyWriter;
    private final PredicateItemProcessor<K> blockedKeyFilter;
    protected ReadFrom readFrom;
    // Optional processor applied to keys before their values are fetched; may be null.
    private ItemProcessor<K, K> keyProcessor;
    // May be null; when non-null in LIVE mode, blocked-key tracking is enabled.
    private DataSize memoryUsageLimit;
    private String scanMatch;
    private String scanType;
    private Duration idleTimeout;
    // Created lazily by jobRepository() / jobBuilderFactory() when not provided.
    private JobRepository jobRepository;
    private JobBuilderFactory jobBuilderFactory;
    private String name;
    // Non-null while the reader is open (see isOpen()).
    private JobExecution jobExecution;
    // Queue filled by the background job and drained by the read/poll methods.
    private BlockingQueue<KeyValue<K>> queue;
    public static final ValueType DEFAULT_VALUE_TYPE = ValueType.DUMP;
    public static final String MATCH_ALL = "*";
    // Keyspace notification pattern for database 0 matching every key.
    public static final String DEFAULT_PUBSUB_PATTERN = pattern(0, MATCH_ALL);
    public static final KeyspaceNotificationItemReader.OrderingStrategy DEFAULT_ORDERING = KeyspaceNotificationItemReader.OrderingStrategy.PRIORITY;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    public static final Duration DEFAULT_FLUSHING_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSHING_INTERVAL;
    // Instance settings; the literal initial values equal the corresponding DEFAULT_* constants above.
    private ValueType valueType = DEFAULT_VALUE_TYPE;
    private Mode mode = Mode.SCAN;
    private int threads = 1;
    private int chunkSize = 50;
    private int poolSize = 8;
    private int queueCapacity = 10000;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;
    private int memoryUsageSamples = 5;
    private long scanCount = 50;
    private int database = 0;
    private KeyspaceNotificationItemReader.OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private int notificationQueueCapacity = 10000;
    private Duration flushingInterval = DEFAULT_FLUSHING_INTERVAL;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$BlockedKeyItemWriter.class */
    /**
     * Item writer that records, in string form, every key whose reported memory usage
     * exceeds the enclosing reader's {@code memoryUsageLimit}.
     *
     * <p>The recorded set is exposed through {@link #getBlockedKeys()} and is consulted by
     * the enclosing reader's key filter.
     */
    public class BlockedKeyItemWriter extends AbstractItemStreamItemWriter<KeyValue<K>> {
        private final Function<K, String> toStringKeyFunction;
        private final Set<String> blockedKeys = new HashSet<>();
        private final Predicate<KeyValue<K>> predicate = this::isMemKey;

        /**
         * @param redisCodec codec used to derive the key-to-string conversion function
         */
        public BlockedKeyItemWriter(RedisCodec<K, ?> redisCodec) {
            this.toStringKeyFunction = CodecUtils.toStringKeyFunction(redisCodec);
        }

        /**
         * Adds the string form of each over-limit key in {@code list} to the blocked set.
         * Items that are null, carry no memory usage, or are within the limit are ignored.
         *
         * @param list items produced by the current chunk
         */
        public void write(List<? extends KeyValue<K>> list) throws Exception {
            list.stream()
                    .filter(this.predicate)
                    .map(KeyValue::getKey)
                    .map(this.toStringKeyFunction)
                    .forEach(this.blockedKeys::add);
        }

        /**
         * @return the live (mutable) set of blocked keys, not a copy
         */
        public Set<String> getBlockedKeys() {
            return this.blockedKeys;
        }

        /**
         * Tells whether the given item reports a memory usage strictly greater than the
         * enclosing reader's memory usage limit, in bytes.
         */
        private boolean isMemKey(KeyValue<K> keyValue) {
            return keyValue != null && KeyValue.hasMemoryUsage(keyValue) && keyValue.getMemoryUsage().longValue() > RedisItemReader.this.memoryUsageLimit.toBytes();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/RedisItemReader$Mode.class */
    /** Selects how the reader obtains the keys whose values it reads. */
    public enum Mode {
        // Keys come from an iterator over a Redis SCAN (see scanKeyReader()).
        SCAN,
        // Keys come from a keyspace notification reader and the step is built as a flushing step.
        LIVE
    }

    /**
     * Creates a reader bound to the given client and codec.
     *
     * <p>The reader name defaults to the short class name. The blocked-key writer and the
     * key filter are wired to the same set, so a key recorded by the writer is rejected by
     * the filter afterwards.
     *
     * @param abstractRedisClient Redis client used to obtain connections
     * @param redisCodec codec used for keys and values
     */
    public RedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.blockedKeyWriter = new BlockedKeyItemWriter(redisCodec);
        Function<K, String> toStringKey = CodecUtils.toStringKeyFunction(redisCodec);
        Set<String> blockedKeys = this.blockedKeyWriter.getBlockedKeys();
        // Bound method reference: fails fast if the set is null, like the original requireNonNull.
        Predicate<String> isBlocked = blockedKeys::contains;
        // Keep only keys whose string form is NOT in the blocked set.
        this.blockedKeyFilter = new PredicateItemProcessor<>(compose(toStringKey, isBlocked.negate()));
    }

    /**
     * Builds a predicate on {@code S} that first maps its input with {@code function} and
     * then evaluates {@code predicate} on the mapped value.
     */
    private static <S, T> Predicate<S> compose(Function<S, T> function, Predicate<T> predicate) {
        return input -> predicate.test(function.apply(input));
    }

    /**
     * Formats a keyspace notification pattern for the given database index.
     *
     * @param i database index
     * @param str key match pattern; {@code null} falls back to {@link #MATCH_ALL}
     * @return {@link #PUBSUB_PATTERN_FORMAT} filled in with the index and the match pattern
     */
    private static String pattern(int i, String str) {
        String match = (str != null) ? str : MATCH_ALL;
        return String.format(PUBSUB_PATTERN_FORMAT, i, match);
    }

    /**
     * Sets the value type handed to the key/value processor built by
     * {@link #keyValueProcessor()}.
     */
    public void setValueType(ValueType valueType) {
        this.valueType = valueType;
    }

    /** @return the Redis client this reader was constructed with */
    public AbstractRedisClient getClient() {
        return client;
    }

    /** @return the value type handed to the key/value processor */
    public ValueType getValueType() {
        return valueType;
    }

    /**
     * @return the job repository field as currently set; {@code null} if none was provided
     *         and the reader has not yet created one itself
     */
    public JobRepository getJobRepository() {
        return jobRepository;
    }

    /**
     * Sets the job repository used for the internal step, job and launcher. When left
     * unset, the reader creates an in-memory repository on first use.
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /** Sets the optional processor applied to keys by the internal step; may be {@code null}. */
    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        keyProcessor = itemProcessor;
    }

    /** @return the optional key processor, or {@code null} if none was set */
    public ItemProcessor<K, K> getKeyProcessor() {
        return keyProcessor;
    }

    /** @return the number of threads configured for the internal step */
    public int getThreads() {
        return threads;
    }

    /**
     * Sets the thread count. A value greater than 1 makes the internal step run on a
     * thread pool of that size with a matching throttle limit.
     */
    public void setThreads(int i) {
        threads = i;
    }

    /** @return the chunk size used by the internal step and by {@link #readChunk()} */
    public int getChunkSize() {
        return chunkSize;
    }

    /** Sets the chunk size used by the internal step and as the upper bound in {@link #readChunk()}. */
    public void setChunkSize(int i) {
        chunkSize = i;
    }

    /** @return the {@code ReadFrom} setting, or {@code null} if none was set */
    public ReadFrom getReadFrom() {
        return readFrom;
    }

    /**
     * Sets the {@code ReadFrom} setting passed to the scan connection supplier and to the
     * key/value processor.
     */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /** @return the capacity used when creating the output queue */
    public int getQueueCapacity() {
        return queueCapacity;
    }

    /** Sets the capacity used when the output queue is created. */
    public void setQueueCapacity(int i) {
        queueCapacity = i;
    }

    /** @return the queue capacity handed to the keyspace notification reader */
    public int getNotificationQueueCapacity() {
        return notificationQueueCapacity;
    }

    /** Sets the queue capacity handed to the keyspace notification reader. */
    public void setNotificationQueueCapacity(int i) {
        notificationQueueCapacity = i;
    }

    /** @return the timeout used for queue polls and for the wait between start-up checks */
    public Duration getPollTimeout() {
        return pollTimeout;
    }

    /**
     * Sets the timeout used for each queue poll, for the wait between start-up checks, and
     * for the keyspace notification reader.
     */
    public void setPollTimeout(Duration duration) {
        pollTimeout = duration;
    }

    /** @return the memory usage limit, or {@code null} if none was set */
    public DataSize getMemoryUsageLimit() {
        return memoryUsageLimit;
    }

    /**
     * Sets the memory usage limit handed to the key/value processor. In {@code LIVE} mode,
     * a non-null limit also enables blocked-key tracking.
     */
    public void setMemoryUsageLimit(DataSize dataSize) {
        memoryUsageLimit = dataSize;
    }

    /** @return the memory usage sample count handed to the key/value processor */
    public int getMemoryUsageSamples() {
        return memoryUsageSamples;
    }

    /** Sets the memory usage sample count handed to the key/value processor. */
    public void setMemoryUsageSamples(int i) {
        memoryUsageSamples = i;
    }

    /** @return the key match pattern, or {@code null} if none was set */
    public String getScanMatch() {
        return scanMatch;
    }

    /**
     * Sets the key match pattern. It is applied to the scan arguments when non-null and is
     * also used to build the keyspace notification pattern.
     */
    public void setScanMatch(String str) {
        scanMatch = str;
    }

    /** @return the key type filter, or {@code null} if none was set */
    public String getScanType() {
        return scanType;
    }

    /**
     * Sets the key type filter. It is applied to the scan arguments when non-null and is
     * passed as the key type of the keyspace notification reader.
     */
    public void setScanType(String str) {
        scanType = str;
    }

    /** Sets the value passed as the limit of the scan arguments. */
    public void setScanCount(long j) {
        scanCount = j;
    }

    /** @return the value passed as the limit of the scan arguments */
    public long getScanCount() {
        return scanCount;
    }

    /** @return the idle timeout applied to the flushing step in {@code LIVE} mode; may be {@code null} */
    public Duration getIdleTimeout() {
        return idleTimeout;
    }

    /** Sets the idle timeout applied to the flushing step built in {@code LIVE} mode. */
    public void setIdleTimeout(Duration duration) {
        idleTimeout = duration;
    }

    /** @return the interval applied to the flushing step in {@code LIVE} mode */
    public Duration getFlushingInterval() {
        return flushingInterval;
    }

    /** Sets the interval applied to the flushing step built in {@code LIVE} mode. */
    public void setFlushingInterval(Duration duration) {
        flushingInterval = duration;
    }

    /** @return the current reader mode */
    public Mode getMode() {
        return mode;
    }

    /**
     * Sets the reader mode, which selects the key source (scan iterator or keyspace
     * notifications) and whether the step is built as a flushing step.
     */
    public void setMode(Mode mode) {
        this.mode = mode;
    }

    /** @return the pool size handed to the key/value processor */
    public int getPoolSize() {
        return poolSize;
    }

    /** Sets the pool size handed to the key/value processor. */
    public void setPoolSize(int i) {
        poolSize = i;
    }

    /** @return the database index used in the keyspace notification pattern */
    public int getDatabase() {
        return database;
    }

    /** Sets the database index used when building the keyspace notification pattern. */
    public void setDatabase(int i) {
        database = i;
    }

    /** @return the ordering strategy handed to the keyspace notification reader */
    public KeyspaceNotificationItemReader.OrderingStrategy getOrderingStrategy() {
        return orderingStrategy;
    }

    /** Sets the ordering strategy handed to the keyspace notification reader. */
    public void setOrderingStrategy(KeyspaceNotificationItemReader.OrderingStrategy orderingStrategy) {
        this.orderingStrategy = orderingStrategy;
    }

    /**
     * Sets the reader name on the superclass and keeps a local copy, which is used as the
     * name of the internal step and job.
     */
    public void setName(String str) {
        super.setName(str);
        name = str;
    }

    /**
     * Opens the reader. After delegating to the superclass, the background job is started
     * unless one is already attached (see {@link #isOpen()}).
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (!isOpen()) {
            doOpen();
        }
    }

    /**
     * Builds the internal step (key reader, key processor, value-fetching writer), launches
     * it as a job through an asynchronous launcher, and waits until the key reader reports
     * open or the job has already completed or failed.
     *
     * @throws ItemStreamException if the job cannot be launched or ends up in an
     *         unsuccessful status during start-up
     */
    private void doOpen() {
        SimpleStepBuilder chunk = new StepBuilder(this.name).repository(jobRepository()).transactionManager(transactionManager()).chunk(this.chunkSize);
        ItemReader<K> reader = reader();
        chunk.reader(reader);
        chunk.processor(processor());
        chunk.writer(writer());
        if (this.threads > 1) {
            chunk.taskExecutor(BatchUtils.threadPoolTaskExecutor(this.threads));
            chunk.throttleLimit(this.threads);
        }
        if (this.mode == Mode.LIVE) {
            chunk = new FlushingStepBuilder(chunk).interval(this.flushingInterval).idleTimeout(this.idleTimeout);
        }
        try {
            this.jobExecution = jobLauncher().run(jobBuilderFactory().get(this.name).start(chunk.build()).build(), new JobParameters());
            // The job runs on another thread: poll until the reader is open or the job is already over.
            while (!BatchUtils.isOpen(reader) && !this.jobExecution.getStatus().isUnsuccessful() && !this.jobExecution.getStatus().isLessThanOrEqualTo(BatchStatus.COMPLETED)) {
                sleep();
            }
            if (this.jobExecution.getStatus().isUnsuccessful()) {
                List<Throwable> failures = this.jobExecution.getAllFailureExceptions();
                // An unsuccessful job may have recorded no exception; don't mask that with NoSuchElementException.
                if (failures.isEmpty()) {
                    throw new ItemStreamException("Could not run job");
                }
                throw new ItemStreamException("Could not run job", failures.get(0));
            }
        } catch (JobExecutionException e) {
            throw new ItemStreamException("Job execution failed", e);
        }
    }

    /** Picks the key reader matching the current mode. */
    private ItemReader<K> reader() {
        if (mode == Mode.LIVE) {
            return keyspaceNotificationReader();
        }
        return scanKeyReader();
    }

    /**
     * Creates a keyspace notification reader configured with this reader's key type,
     * ordering strategy, notification queue capacity, poll timeout and pub/sub pattern.
     */
    private ItemReader<K> keyspaceNotificationReader() {
        KeyspaceNotificationItemReader notificationReader = new KeyspaceNotificationItemReader(client, codec);
        notificationReader.setKeyType(scanType);
        notificationReader.setOrderingStrategy(orderingStrategy);
        notificationReader.setQueueCapacity(notificationQueueCapacity);
        notificationReader.setPollTimeout(pollTimeout);
        // Pattern combines the configured database index with the key match (or match-all when null).
        notificationReader.setPattern(pattern(database, scanMatch));
        return notificationReader;
    }

    /**
     * Creates a key reader backed by a scan iterator over a connection obtained from the
     * client, using the arguments from {@link #scanArgs()}.
     */
    private ItemReader<K> scanKeyReader() {
        // NOTE(review): nothing in this class closes the connection obtained here - confirm ownership.
        StatefulConnection connection = (StatefulConnection) ConnectionUtils.supplier(client, codec, readFrom).get();
        RedisKeyCommands commands = (RedisKeyCommands) ConnectionUtils.sync(connection);
        return new IteratorItemReader(ScanIterator.scan(commands, scanArgs()));
    }

    /**
     * Builds the scan arguments: the limit is always set from {@code scanCount}; match and
     * type are only applied when configured.
     */
    private KeyScanArgs scanArgs() {
        KeyScanArgs args = new KeyScanArgs();
        args.limit(scanCount);
        if (scanMatch != null) {
            args.match(scanMatch);
        }
        if (scanType != null) {
            args.type(scanType);
        }
        return args;
    }

    /** @return the set of blocked keys held by the blocked-key writer (the live set, not a copy) */
    public Set<String> getBlockedKeys() {
        return blockedKeyWriter.getBlockedKeys();
    }

    /** Returns the job builder factory, creating it from the job repository on first use. */
    private JobBuilderFactory jobBuilderFactory() {
        if (jobBuilderFactory != null) {
            return jobBuilderFactory;
        }
        jobBuilderFactory = new JobBuilderFactory(jobRepository());
        return jobBuilderFactory;
    }

    /**
     * Returns the job repository, falling back to an in-memory one created on first use
     * when none was provided.
     *
     * @throws ItemStreamException if the in-memory repository cannot be created
     */
    private JobRepository jobRepository() {
        if (jobRepository != null) {
            return jobRepository;
        }
        try {
            jobRepository = BatchUtils.inMemoryJobRepository();
        } catch (Exception e) {
            throw new ItemStreamException("Could not initialize job repository", e);
        }
        return jobRepository;
    }

    /**
     * Creates a new job launcher that uses this reader's job repository and a
     * {@code SimpleAsyncTaskExecutor}.
     */
    private SimpleJobLauncher jobLauncher() {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository());
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return launcher;
    }

    /**
     * Sleeps for one poll timeout.
     *
     * @throws ItemStreamException if the thread is interrupted; the interrupt flag is
     *         restored before throwing
     */
    private void sleep() {
        try {
            long millis = pollTimeout.toMillis();
            Thread.sleep(millis);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted during initialization", interrupted);
        }
    }

    /** Creates the resourceless transaction manager used by the internal step. */
    private PlatformTransactionManager transactionManager() {
        PlatformTransactionManager manager = new ResourcelessTransactionManager();
        return manager;
    }

    /** @return the execution of the background job, or {@code null} when the reader is not open */
    public JobExecution getJobExecution() {
        return jobExecution;
    }

    /**
     * Chooses the key processor for the internal step: the user-supplied key processor
     * alone when keys are not being blocked; otherwise the blocked-key filter, followed by
     * the user-supplied processor when one is set.
     */
    private ItemProcessor<? super K, ? extends K> processor() {
        if (shouldBlockKeys()) {
            if (keyProcessor == null) {
                return blockedKeyFilter;
            }
            // Filter first, so blocked keys never reach the user-supplied processor.
            CompositeItemProcessor composite = new CompositeItemProcessor();
            composite.setDelegates(Arrays.asList(blockedKeyFilter, keyProcessor));
            return composite;
        }
        return keyProcessor;
    }

    /**
     * Creates the step writer: a processing writer that combines a fresh key/value
     * processor with the writer returned by {@code keyValueWriter()}.
     */
    public ItemWriter<K> writer() {
        KeyValueItemProcessor<K, V> valueProcessor = keyValueProcessor();
        ItemWriter<KeyValue<K>> valueWriter = keyValueWriter();
        return new ProcessingItemWriter(valueProcessor, valueWriter);
    }

    /**
     * Creates a key/value processor for this reader's client and codec, configured with
     * the current memory usage limit and samples, value type, pool size and read-from
     * setting.
     */
    public KeyValueItemProcessor<K, V> keyValueProcessor() {
        KeyValueItemProcessor<K, V> valueProcessor = new KeyValueItemProcessor<>(client, codec);
        valueProcessor.setMemoryUsageLimit(memoryUsageLimit);
        valueProcessor.setMemoryUsageSamples(memoryUsageSamples);
        valueProcessor.setValueType(valueType);
        valueProcessor.setPoolSize(poolSize);
        valueProcessor.setReadFrom(readFrom);
        return valueProcessor;
    }

    /**
     * Creates the output queue (replacing any previous one), registers a gauge on its size
     * under {@link #QUEUE_METER}, and returns a writer that feeds it. When keys are being
     * blocked, the blocked-key writer is appended as a second delegate.
     */
    private ItemWriter<KeyValue<K>> keyValueWriter() {
        queue = new LinkedBlockingQueue<>(queueCapacity);
        Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), queue);
        QueueItemWriter queueWriter = new QueueItemWriter(queue);
        if (shouldBlockKeys()) {
            CompositeItemWriter composite = new CompositeItemWriter();
            composite.setDelegates(Arrays.asList(queueWriter, blockedKeyWriter));
            return composite;
        }
        return queueWriter;
    }

    /** Blocked-key tracking applies only in {@code LIVE} mode with a memory usage limit set. */
    private boolean shouldBlockKeys() {
        if (mode != Mode.LIVE) {
            return false;
        }
        return memoryUsageLimit != null;
    }

    /**
     * Closes the reader. If a job is attached, the queue reference is dropped, a
     * still-running job has all its step executions flagged terminate-only and its status
     * set to {@code STOPPING}, and the job reference is cleared. The superclass is closed
     * in every case.
     */
    public synchronized void close() {
        if (isOpen()) {
            queue = null;
            if (jobExecution.isRunning()) {
                for (StepExecution stepExecution : jobExecution.getStepExecutions()) {
                    stepExecution.setTerminateOnly();
                }
                jobExecution.setStatus(BatchStatus.STOPPING);
            }
            jobExecution = null;
        }
        super.close();
    }

    /** @return {@code true} while a job execution is attached to this reader */
    public boolean isOpen() {
        return jobExecution != null;
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    /**
     * Reads the next item from the queue, polling repeatedly with the poll timeout for as
     * long as nothing is available and the background job is still running.
     *
     * @return the next item, or {@code null} if a poll came back empty and the job is
     *         absent or no longer running
     * @throws ItemStreamException if the job is present and its status is unsuccessful
     */
    public synchronized KeyValue<K> m1read() throws Exception {
        KeyValue<K> item = queue.poll(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
        while (item == null && jobExecution != null && jobExecution.isRunning()) {
            item = queue.poll(pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        if (jobExecution != null && jobExecution.getStatus().isUnsuccessful()) {
            throw new ItemStreamException("Reader job failed");
        }
        return item;
    }

    /**
     * Polls the output queue once, waiting up to the given timeout.
     *
     * @return the next item, or {@code null} if none became available in time
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public KeyValue<K> poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return queue.poll(j, timeUnit);
    }

    /**
     * Reads up to {@code chunkSize} items, stopping early at the first {@code null} read.
     *
     * @return the items read, possibly empty
     */
    public synchronized List<KeyValue<K>> readChunk() throws Exception {
        List<KeyValue<K>> items = new ArrayList<>();
        while (items.size() < chunkSize) {
            KeyValue<K> item = m1read();
            if (item == null) {
                break;
            }
            items.add(item);
        }
        return items;
    }
}
