package com.redis.spring.batch.item.redis;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.spring.batch.item.AbstractAsyncItemReader;
import com.redis.spring.batch.item.BlockingQueueItemWriter;
import com.redis.spring.batch.item.ProcessingItemWriter;
import com.redis.spring.batch.item.redis.common.BatchUtils;
import com.redis.spring.batch.item.redis.common.KeyValue;
import com.redis.spring.batch.item.redis.common.Operation;
import com.redis.spring.batch.item.redis.common.OperationExecutor;
import com.redis.spring.batch.item.redis.reader.KeyNotification;
import com.redis.spring.batch.item.redis.reader.KeyNotificationItemReader;
import com.redis.spring.batch.item.redis.reader.KeyNotificationStatus;
import com.redis.spring.batch.item.redis.reader.KeyScanNotificationItemReader;
import com.redis.spring.batch.item.redis.reader.KeyValueRead;
import com.redis.spring.batch.item.redis.reader.KeyValueStructRead;
import com.redis.spring.batch.step.FlushingChunkProvider;
import com.redis.spring.batch.step.FlushingStepBuilder;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.KeyScanArgs;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.ScanIterator;
import io.lettuce.core.codec.ByteArrayCodec;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiPredicate;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.IteratorItemReader;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/item/redis/RedisItemReader.class */
public class RedisItemReader<K, V, T> extends AbstractAsyncItemReader<K, KeyValue<K, T>> {
    public static final int DEFAULT_POOL_SIZE = 8;
    public static final int DEFAULT_NOTIFICATION_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    private final RedisCodec<K, V> codec;
    private final BiPredicate<K, K> keyEquals;
    private final Operation<K, V, K, KeyValue<K, T>> operation;
    private Duration flushInterval = DEFAULT_FLUSH_INTERVAL;
    private Duration idleTimeout = DEFAULT_IDLE_TIMEOUT;
    private ReaderMode mode = DEFAULT_MODE;
    private int poolSize = 8;
    private int queueCapacity = 10000;
    private int notificationQueueCapacity = 10000;
    private ReadFrom readFrom;
    private String keyPattern;
    private String keyType;
    private long scanCount;
    private int database;
    private AbstractRedisClient client;
    private BlockingQueue<KeyValue<K, T>> queue;
    public static final ReaderMode DEFAULT_MODE = ReaderMode.SCAN;
    public static final Duration DEFAULT_FLUSH_INTERVAL = FlushingChunkProvider.DEFAULT_FLUSH_INTERVAL;
    public static final Duration DEFAULT_IDLE_TIMEOUT = FlushingChunkProvider.DEFAULT_IDLE_TIMEOUT;

    /* loaded from: input_file:com/redis/spring/batch/item/redis/RedisItemReader$ReaderMode.class */
    public enum ReaderMode {
        SCAN,
        LIVE,
        LIVEONLY
    }

    public RedisItemReader(RedisCodec<K, V> redisCodec, Operation<K, V, K, KeyValue<K, T>> operation) {
        setName(ClassUtils.getShortName(getClass()));
        this.codec = redisCodec;
        this.keyEquals = BatchUtils.keyEqualityPredicate(redisCodec);
        this.operation = operation;
    }

    public Operation<K, V, K, KeyValue<K, T>> getOperation() {
        return this.operation;
    }

    protected SimpleStepBuilder<K, K> stepBuilder() {
        SimpleStepBuilder<K, K> stepBuilder = super.stepBuilder();
        if (this.mode == ReaderMode.SCAN) {
            return stepBuilder;
        }
        FlushingStepBuilder flushingStepBuilder = new FlushingStepBuilder(stepBuilder);
        flushingStepBuilder.flushInterval(this.flushInterval);
        flushingStepBuilder.idleTimeout(this.idleTimeout);
        return flushingStepBuilder;
    }

    protected ItemReader<K> reader() {
        switch (this.mode.ordinal()) {
            case 1:
                return scanNotificationReader();
            case 2:
                return notificationReader();
            default:
                return scanReader();
        }
    }

    protected boolean jobRunning() {
        return super.jobRunning() && readerOpen();
    }

    private boolean readerOpen() {
        switch (this.mode.ordinal()) {
            case 1:
            case 2:
                return getReader() != null && getReader().isOpen();
            default:
                return true;
        }
    }

    private ItemReader<K> scanNotificationReader() {
        KeyScanNotificationItemReader keyScanNotificationItemReader = new KeyScanNotificationItemReader(this.client, this.codec, scanReader());
        configure(keyScanNotificationItemReader);
        return keyScanNotificationItemReader;
    }

    private IteratorItemReader<K> scanReader() {
        return new IteratorItemReader<>(ScanIterator.scan(connection().sync(), scanArgs()));
    }

    private KeyNotificationItemReader<K, V> notificationReader() {
        KeyNotificationItemReader<K, V> keyNotificationItemReader = new KeyNotificationItemReader<>(this.client, this.codec);
        configure(keyNotificationItemReader);
        return keyNotificationItemReader;
    }

    private void configure(KeyNotificationItemReader<K, V> keyNotificationItemReader) {
        keyNotificationItemReader.setName(getName() + "-key-reader");
        keyNotificationItemReader.setQueueCapacity(this.notificationQueueCapacity);
        keyNotificationItemReader.setDatabase(this.database);
        keyNotificationItemReader.setKeyPattern(this.keyPattern);
        keyNotificationItemReader.setKeyType(this.keyType);
        keyNotificationItemReader.setPollTimeout(this.pollTimeout);
        keyNotificationItemReader.addListener(this::keyNotification);
    }

    private void keyNotification(KeyNotification<K> keyNotification, KeyNotificationStatus keyNotificationStatus) {
        if (keyNotificationStatus == KeyNotificationStatus.ACCEPTED) {
            this.queue.removeIf(keyValue -> {
                return this.keyEquals.test(keyValue.getKey(), keyNotification.getKey());
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: doPoll, reason: merged with bridge method [inline-methods] */
    public KeyValue<K, T> m1doPoll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    protected ItemWriter<K> writer() {
        this.queue = new LinkedBlockingQueue(this.queueCapacity);
        return new ProcessingItemWriter(operationExecutor(), new BlockingQueueItemWriter(this.queue));
    }

    public OperationExecutor<K, V, K, KeyValue<K, T>> operationExecutor() {
        Assert.notNull(this.client, getName() + ": Redis client not set");
        OperationExecutor<K, V, K, KeyValue<K, T>> operationExecutor = new OperationExecutor<>(this.codec, this.operation);
        operationExecutor.setClient(this.client);
        operationExecutor.setPoolSize(this.poolSize);
        operationExecutor.setReadFrom(this.readFrom);
        return operationExecutor;
    }

    private StatefulRedisModulesConnection<K, V> connection() {
        return BatchUtils.connection(this.client, this.codec, this.readFrom);
    }

    private KeyScanArgs scanArgs() {
        KeyScanArgs keyScanArgs = new KeyScanArgs();
        if (this.scanCount > 0) {
            keyScanArgs.limit(this.scanCount);
        }
        if (this.keyPattern != null) {
            keyScanArgs.match(this.keyPattern);
        }
        if (this.keyType != null) {
            keyScanArgs.type(this.keyType);
        }
        return keyScanArgs;
    }

    public static RedisItemReader<byte[], byte[], byte[]> dump() {
        return new RedisItemReader<>(ByteArrayCodec.INSTANCE, KeyValueRead.dump(ByteArrayCodec.INSTANCE));
    }

    public static RedisItemReader<String, String, Object> type() {
        return type(StringCodec.UTF8);
    }

    public static <K, V> RedisItemReader<K, V, Object> type(RedisCodec<K, V> redisCodec) {
        return new RedisItemReader<>(redisCodec, KeyValueRead.type(redisCodec));
    }

    public static RedisItemReader<String, String, Object> struct() {
        return struct(StringCodec.UTF8);
    }

    public static <K, V> RedisItemReader<K, V, Object> struct(RedisCodec<K, V> redisCodec) {
        return new RedisItemReader<>(redisCodec, new KeyValueStructRead(redisCodec));
    }

    public RedisCodec<K, V> getCodec() {
        return this.codec;
    }

    public AbstractRedisClient getClient() {
        return this.client;
    }

    public void setClient(AbstractRedisClient abstractRedisClient) {
        this.client = abstractRedisClient;
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public String getKeyPattern() {
        return this.keyPattern;
    }

    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    public String getKeyType() {
        return this.keyType;
    }

    public void setKeyType(String str) {
        this.keyType = str;
    }

    public long getScanCount() {
        return this.scanCount;
    }

    public void setScanCount(long j) {
        this.scanCount = j;
    }

    public ReadFrom getReadFrom() {
        return this.readFrom;
    }

    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    public int getNotificationQueueCapacity() {
        return this.notificationQueueCapacity;
    }

    public void setNotificationQueueCapacity(int i) {
        this.notificationQueueCapacity = i;
    }

    public int getDatabase() {
        return this.database;
    }

    public void setDatabase(int i) {
        this.database = i;
    }

    public ReaderMode getMode() {
        return this.mode;
    }

    public void setMode(ReaderMode readerMode) {
        this.mode = readerMode;
    }

    public Duration getFlushInterval() {
        return this.flushInterval;
    }

    public void setFlushInterval(Duration duration) {
        this.flushInterval = duration;
    }

    public Duration getIdleTimeout() {
        return this.idleTimeout;
    }

    public void setIdleTimeout(Duration duration) {
        this.idleTimeout = duration;
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }
}
