package com.redis.spring.batch.reader;

import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.common.KeyBuilder;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.ValueType;
import com.redis.spring.batch.step.FlushingStepBuilder;
import com.redis.spring.batch.step.FlushingStepOptions;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisException;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.CompositeItemWriter;

/* loaded from: input_file:com/redis/spring/batch/reader/LiveRedisItemReader.class */
public class LiveRedisItemReader<K, V> extends AbstractRedisItemReader<K, V> implements PollableItemReader<KeyValue<K>> {
    public static final String CONFIG_NOTIFY_KEYSPACE_EVENTS = "notify-keyspace-events";
    private KeyspaceNotificationOptions keyspaceNotificationOptions;
    private FlushingStepOptions flushingOptions;

    /* loaded from: input_file:com/redis/spring/batch/reader/LiveRedisItemReader$BigKeyItemWriter.class */
    private class BigKeyItemWriter implements ItemWriter<KeyValue<K>> {
        private final long memLimit;

        private BigKeyItemWriter() {
            this.memLimit = LiveRedisItemReader.this.options.getMemoryUsageOptions().getLimit().toBytes();
        }

        public void write(List<? extends KeyValue<K>> list) throws Exception {
            LiveRedisItemReader.this.getKeyReader().blockKeys((List) list.stream().filter(keyValue -> {
                return keyValue.getMemoryUsage() > this.memLimit;
            }).map((v0) -> {
                return v0.getKey();
            }).map(this::toString).collect(Collectors.toList()));
        }

        private String toString(K k) {
            return StringCodec.UTF8.decodeKey(LiveRedisItemReader.this.codec.encodeKey(k));
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/LiveRedisItemReader$Builder.class */
    public static class Builder<K, V> extends RedisItemReader.BaseBuilder<K, V, Builder<K, V>> {
        private KeyspaceNotificationOptions keyspaceNotificationOptions;
        private FlushingStepOptions flushingOptions;

        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            super(abstractRedisClient, redisCodec);
            this.keyspaceNotificationOptions = KeyspaceNotificationOptions.builder().build();
            this.flushingOptions = FlushingStepOptions.builder().build();
        }

        public Builder<K, V> keyspaceNotificationOptions(KeyspaceNotificationOptions keyspaceNotificationOptions) {
            this.keyspaceNotificationOptions = keyspaceNotificationOptions;
            return this;
        }

        public Builder<K, V> flushingOptions(FlushingStepOptions flushingStepOptions) {
            this.flushingOptions = flushingStepOptions;
            return this;
        }

        public LiveRedisItemReader<K, V> dump() {
            return build(ValueType.DUMP);
        }

        public LiveRedisItemReader<K, V> struct() {
            return build(ValueType.STRUCT);
        }

        public LiveRedisItemReader<K, V> build(ValueType valueType) {
            LiveRedisItemReader<K, V> liveRedisItemReader = new LiveRedisItemReader<>(this.client, this.codec, valueType);
            configure(liveRedisItemReader);
            liveRedisItemReader.setFlushingOptions(this.flushingOptions);
            liveRedisItemReader.setKeyspaceNotificationOptions(this.keyspaceNotificationOptions);
            return liveRedisItemReader;
        }
    }

    public LiveRedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, ValueType valueType) {
        super(abstractRedisClient, redisCodec, new KeyspaceNotificationItemReader(abstractRedisClient, redisCodec), valueType);
        this.keyspaceNotificationOptions = KeyspaceNotificationOptions.builder().build();
        this.flushingOptions = FlushingStepOptions.builder().build();
    }

    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public KeyspaceNotificationItemReader<K, V> getKeyReader() {
        return (KeyspaceNotificationItemReader) super.getKeyReader();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public void doOpen() {
        String str;
        try {
            str = (String) RedisModulesUtils.connection(this.client).sync().configGet(CONFIG_NOTIFY_KEYSPACE_EVENTS).getOrDefault(CONFIG_NOTIFY_KEYSPACE_EVENTS, KeyBuilder.EMPTY_STRING);
        } catch (RedisException e) {
        }
        if (!str.contains("K")) {
            throw new ItemStreamException(MessageFormat.format("Keyspace notifications not property configured: {0}={1}. Make sure it contains at least \"K\".", CONFIG_NOTIFY_KEYSPACE_EVENTS, str));
        }
        getKeyReader().setKeyspaceNotificationOptions(this.keyspaceNotificationOptions);
        getKeyReader().setScanOptions(this.options.getScanOptions());
        super.doOpen();
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public KeyValue<K> poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public KeyspaceNotificationOptions getKeyspaceNotificationOptions() {
        return this.keyspaceNotificationOptions;
    }

    public void setKeyspaceNotificationOptions(KeyspaceNotificationOptions keyspaceNotificationOptions) {
        this.keyspaceNotificationOptions = keyspaceNotificationOptions;
    }

    public FlushingStepOptions getFlushingOptions() {
        return this.flushingOptions;
    }

    public void setFlushingOptions(FlushingStepOptions flushingStepOptions) {
        this.flushingOptions = flushingStepOptions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public SimpleStepBuilder<K, K> step() {
        return new FlushingStepBuilder(super.step()).options(this.flushingOptions);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.redis.spring.batch.reader.AbstractRedisItemReader
    public ItemWriter<KeyValue<K>> queueWriter() {
        ItemWriter<KeyValue<K>> queueWriter = super.queueWriter();
        CompositeItemWriter compositeItemWriter = new CompositeItemWriter();
        compositeItemWriter.setDelegates(Arrays.asList(queueWriter, new BigKeyItemWriter()));
        return compositeItemWriter;
    }

    public static Builder<String, String> client(AbstractRedisClient abstractRedisClient) {
        return new Builder<>(abstractRedisClient, StringCodec.UTF8);
    }

    public static <K, V> Builder<K, V> client(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new Builder<>(abstractRedisClient, redisCodec);
    }
}
