package com.redis.spring.batch.reader;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.common.DataStructure;
import com.redis.spring.batch.common.FilteringItemProcessor;
import com.redis.spring.batch.common.FlushingOptions;
import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.KeyDump;
import com.redis.spring.batch.reader.AbstractRedisItemReader;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.awaitility.Awaitility;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemProcessor;

/* loaded from: input_file:com/redis/spring/batch/reader/LiveRedisItemReader.class */
public class LiveRedisItemReader<K, T> extends AbstractRedisItemReader<K, T> implements PollableItemReader<T> {
    public static final int DEFAULT_DATABASE = 0;
    protected static final String[] DEFAULT_KEY_PATTERNS = {ScanOptions.DEFAULT_MATCH};
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    private final FlushingOptions flushingOptions;

    /* loaded from: input_file:com/redis/spring/batch/reader/LiveRedisItemReader$Builder.class */
    public static class Builder<K, V> extends AbstractRedisItemReader.AbstractReaderBuilder<K, V, Builder<K, V>> {
        private FlushingOptions flushingOptions;
        private int database;
        private String[] keyPatterns;
        private QueueOptions queueOptions;
        private Predicate<K> keyFilter;

        public Builder<K, V> keyFilter(Predicate<K> predicate) {
            this.keyFilter = predicate;
            return this;
        }

        private ItemProcessor<K, K> keyProcessor() {
            if (this.keyFilter == null) {
                return null;
            }
            return new FilteringItemProcessor(this.keyFilter);
        }

        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            super(abstractRedisClient, redisCodec);
            this.flushingOptions = FlushingOptions.builder().build();
            this.database = 0;
            this.keyPatterns = LiveRedisItemReader.DEFAULT_KEY_PATTERNS;
            this.queueOptions = QueueOptions.builder().build();
        }

        public Builder<K, V> flushingOptions(FlushingOptions flushingOptions) {
            this.flushingOptions = flushingOptions;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        @Override // com.redis.spring.batch.reader.AbstractRedisItemReader.AbstractReaderBuilder
        public KeyspaceNotificationItemReader<K, V> keyReader() {
            return new KeyspaceNotificationItemReader<>(this.client, this.codec, this.queueOptions, LiveRedisItemReader.patterns(this.database, this.keyPatterns));
        }

        public LiveRedisItemReader<K, DataStructure<K>> dataStructure() {
            return reader(dataStructureValueReader());
        }

        public LiveRedisItemReader<K, KeyDump<K>> keyDump() {
            return reader(keyDumpValueReader());
        }

        protected <T> LiveRedisItemReader<K, T> reader(ValueReader<K, T> valueReader) {
            return new LiveRedisItemReader<>(jobRunner(), keyReader(), keyProcessor(), valueReader, this.readerOptions, this.flushingOptions);
        }

        public Builder<K, V> database(int i) {
            this.database = i;
            return this;
        }

        public Builder<K, V> keyPatterns(String... strArr) {
            this.keyPatterns = strArr;
            return this;
        }

        public Builder<K, V> queueOptions(QueueOptions queueOptions) {
            this.queueOptions = queueOptions;
            return this;
        }
    }

    public LiveRedisItemReader(JobRunner jobRunner, PollableItemReader<K> pollableItemReader, ItemProcessor<K, K> itemProcessor, ValueReader<K, T> valueReader, ReaderOptions readerOptions, FlushingOptions flushingOptions) {
        super(jobRunner, pollableItemReader, itemProcessor, valueReader, readerOptions);
        this.flushingOptions = flushingOptions;
    }

    public static String[] defaultKeyPatterns() {
        return DEFAULT_KEY_PATTERNS;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public synchronized void doOpen() throws Exception {
        super.doOpen();
        Awaitility.await().timeout(JobRunner.DEFAULT_RUNNING_TIMEOUT).until(this::isOpen);
    }

    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader, com.redis.spring.batch.common.Openable
    public boolean isOpen() {
        return super.isOpen() && ((PollableItemReader) this.keyReader).isOpen();
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public SimpleStepBuilder<K, K> step() {
        return JobRunner.flushing(super.step(), this.flushingOptions);
    }

    public static Builder<String, String> client(RedisModulesClusterClient redisModulesClusterClient) {
        return new Builder<>(redisModulesClusterClient, StringCodec.UTF8);
    }

    public static Builder<String, String> client(RedisModulesClient redisModulesClient) {
        return new Builder<>(redisModulesClient, StringCodec.UTF8);
    }

    public static <K, V> Builder<K, V> client(RedisModulesClient redisModulesClient, RedisCodec<K, V> redisCodec) {
        return new Builder<>(redisModulesClient, redisCodec);
    }

    public static <K, V> Builder<K, V> client(RedisModulesClusterClient redisModulesClusterClient, RedisCodec<K, V> redisCodec) {
        return new Builder<>(redisModulesClusterClient, redisCodec);
    }

    public static String[] patterns(int i, String... strArr) {
        return (String[]) ((List) Stream.of((Object[]) strArr).map(str -> {
            return String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(i), str);
        }).collect(Collectors.toList())).toArray(new String[0]);
    }
}
