package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.SetBlockingQueue;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;

/**
 * Item reader that hands out the keys named in Redis keyspace notifications.
 *
 * <p>On {@link #open(ExecutionContext)} the reader creates a notification queue and a
 * publisher for the pub/sub pattern built from the configured database and match
 * expression. Each notification the publisher passes to this reader is filtered
 * (blocked keys, optional type filter taken from {@link ScanOptions}) and then offered
 * to the queue. {@link #read()} and {@link #poll(long, TimeUnit)} take notifications off
 * that queue and return the key converted through the configured codec.
 *
 * <p>NOTE(review): notifications are presumably delivered on a pub/sub thread owned by
 * the publisher, while {@link #blockKeys(Iterable)} and {@link #addListener} are called
 * from user threads; the backing collections are not thread-safe. Confirm the intended
 * threading model before relying on concurrent registration.
 *
 * @param <K> key type produced by this reader
 * @param <V> value type of the codec (unused by the reader itself)
 */
public class KeyspaceNotificationItemReader<K, V> extends AbstractItemStreamItemReader<K> implements KeyItemReader<K>, PollableItemReader<K> {

    /** Format of the pub/sub pattern; arguments are the database index and the key match expression. */
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";

    /** Name of the gauge registered on the notification queue when the reader is opened. */
    public static final String QUEUE_SIZE_GAUGE_NAME = "reader.notification.queue.size";

    /** Delimiter between the channel prefix and the key in a keyspace notification channel. */
    private static final String SEPARATOR = ":";

    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();

    /** Lookup table from the event string carried by a notification to its {@link KeyEvent}. */
    private static final Map<String, KeyEvent> keyEvents = Stream.of(KeyEvent.values())
            .collect(Collectors.toMap(KeyEvent::getString, Function.identity()));

    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final List<KeyspaceNotificationListener> listeners = new ArrayList<>();
    private final Set<String> blockedKeys = new HashSet<>();
    private BlockingQueue<KeyspaceNotification> queue;
    private ItemStream publisher;
    private ScanOptions scanOptions = ScanOptions.builder().build();
    private KeyspaceNotificationOptions keyspaceNotificationOptions = KeyspaceNotificationOptions.builder().build();

    /**
     * Fluent builder for {@link KeyspaceNotificationItemReader}.
     *
     * @param <K> key type
     * @param <V> value type
     */
    public static class Builder<K, V> {
        private final AbstractRedisClient client;
        private final RedisCodec<K, V> codec;
        private ScanOptions scanOptions = ScanOptions.builder().build();
        private KeyspaceNotificationOptions keyspaceNotificationOptions = KeyspaceNotificationOptions.builder().build();

        /**
         * @param abstractRedisClient client handed to the reader
         * @param redisCodec codec used by the reader to produce keys of type {@code K}
         */
        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            this.client = abstractRedisClient;
            this.codec = redisCodec;
        }

        /**
         * @param scanOptions options supplying the key match expression and optional type filter
         * @return this builder
         */
        public Builder<K, V> scanOptions(ScanOptions scanOptions) {
            this.scanOptions = scanOptions;
            return this;
        }

        /**
         * @param keyspaceNotificationOptions options supplying database, ordering strategy and queue settings
         * @return this builder
         */
        public Builder<K, V> keyspaceNotificationOptions(KeyspaceNotificationOptions keyspaceNotificationOptions) {
            this.keyspaceNotificationOptions = keyspaceNotificationOptions;
            return this;
        }

        /**
         * Creates a reader configured with the options collected so far.
         *
         * @return a new, unopened reader
         */
        public KeyspaceNotificationItemReader<K, V> build() {
            KeyspaceNotificationItemReader<K, V> reader = new KeyspaceNotificationItemReader<>(this.client, this.codec);
            reader.setScanOptions(this.scanOptions);
            reader.setKeyspaceNotificationOptions(this.keyspaceNotificationOptions);
            return reader;
        }
    }

    /**
     * Creates a reader with default scan and keyspace-notification options.
     *
     * @param abstractRedisClient client used to create the notification publisher
     * @param redisCodec codec used to convert notification keys to {@code K}
     */
    public KeyspaceNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    /**
     * @return the live (mutable) set of keys whose notifications are dropped
     */
    public Set<String> getBlockedKeys() {
        return this.blockedKeys;
    }

    /**
     * Adds keys whose notifications must be ignored.
     *
     * @param strArr keys to block
     */
    public void blockKeys(String... strArr) {
        blockKeys(Arrays.asList(strArr));
    }

    /**
     * Adds keys whose notifications must be ignored.
     *
     * @param iterable keys to block
     */
    public void blockKeys(Iterable<String> iterable) {
        iterable.forEach(this.blockedKeys::add);
    }

    /**
     * Registers a listener that is told when a notification could not be queued.
     *
     * @param keyspaceNotificationListener listener to add
     */
    public void addListener(KeyspaceNotificationListener keyspaceNotificationListener) {
        this.listeners.add(keyspaceNotificationListener);
    }

    public ScanOptions getScanOptions() {
        return this.scanOptions;
    }

    public void setScanOptions(ScanOptions scanOptions) {
        this.scanOptions = scanOptions;
    }

    public KeyspaceNotificationOptions getKeyspaceNotificationOptions() {
        return this.keyspaceNotificationOptions;
    }

    public void setKeyspaceNotificationOptions(KeyspaceNotificationOptions keyspaceNotificationOptions) {
        this.keyspaceNotificationOptions = keyspaceNotificationOptions;
    }

    /**
     * @return the notification queue, or {@code null} if the reader has never been opened
     */
    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    /** Builds the pub/sub pattern for the given database index and key match expression. */
    private String pattern(int i, String str) {
        return String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(i), str);
    }

    /**
     * Creates the notification queue, registers its size gauge and opens the publisher.
     * Does nothing if the reader is already open.
     *
     * @param executionContext context forwarded to the publisher
     * @throws ItemStreamException declared by the {@code ItemStream} contract
     */
    @Override
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (isOpen()) {
            return;
        }
        // The queue must exist before the publisher is opened, since notifications are offered to it.
        this.queue = new SetBlockingQueue<>(notificationQueue());
        Utils.createGaugeCollectionSize(QUEUE_SIZE_GAUGE_NAME, this.queue, new Tag[0]);
        this.publisher = publisher();
        this.publisher.open(executionContext);
    }

    /**
     * @return {@code true} while a publisher is held, i.e. between {@code open} and {@code close}
     */
    public boolean isOpen() {
        return this.publisher != null;
    }

    /** Creates the backing queue according to the configured ordering strategy and capacity. */
    private BlockingQueue<KeyspaceNotification> notificationQueue() {
        int capacity = this.keyspaceNotificationOptions.getQueueOptions().getCapacity();
        if (this.keyspaceNotificationOptions.getOrderingStrategy() == KeyspaceNotificationOrderingStrategy.PRIORITY) {
            return new PriorityBlockingQueue<>(capacity, NOTIFICATION_COMPARATOR);
        }
        return new LinkedBlockingQueue<>(capacity);
    }

    /** Creates the publisher matching the client flavour (cluster or not), wired to {@link #notification}. */
    private ItemStream publisher() {
        String pattern = pattern(this.keyspaceNotificationOptions.getDatabase(), this.scanOptions.getMatch());
        BiConsumer<String, String> consumer = this::notification;
        if (this.client instanceof RedisClusterClient) {
            return new RedisClusterKeyspaceNotificationPublisher(this.client, pattern, consumer);
        }
        return new RedisKeyspaceNotificationPublisher(this.client, pattern, consumer);
    }

    /**
     * Callback invoked by the publisher for each notification.
     *
     * @param str channel the notification arrived on; the key is the part after the first separator
     * @param str2 event string, looked up in {@link #keyEvents}
     */
    private void notification(String str, String str2) {
        String key = key(str);
        if (isBlocked(key)) {
            return;
        }
        KeyEvent event = keyEvent(str2);
        if (!accept(event)) {
            return;
        }
        KeyspaceNotification notification = new KeyspaceNotification();
        notification.setKey(key);
        notification.setEvent(event);
        if (!this.queue.offer(notification)) {
            // The queue refused the notification; let listeners know it was dropped.
            this.listeners.forEach(listener -> listener.queueFull(notification));
        }
    }

    private boolean isBlocked(String str) {
        return this.blockedKeys.contains(str);
    }

    /** Accepts every event when no type filter is configured, otherwise only events of that type. */
    private boolean accept(KeyEvent keyEvent) {
        return this.scanOptions.getType()
                .map(type -> type.equals(keyEvent.getType()))
                .orElse(Boolean.TRUE);
    }

    /** Resolves an event string to its {@link KeyEvent}, falling back to {@link KeyEvent#UNKNOWN}. */
    private KeyEvent keyEvent(String str) {
        return keyEvents.getOrDefault(str, KeyEvent.UNKNOWN);
    }

    /**
     * Closes and discards the publisher if the reader is open, then delegates to the superclass.
     */
    @Override
    public synchronized void close() {
        if (isOpen()) {
            this.publisher.close();
            this.publisher = null;
        }
        super.close();
    }

    /**
     * Polls the queue using the configured poll timeout.
     *
     * @return the next key, or {@code null} if none arrived within the timeout
     * @throws Exception if the wait is interrupted
     */
    @Override
    public K read() throws Exception {
        return poll(this.keyspaceNotificationOptions.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Waits up to the given timeout for a notification and returns its key.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the key converted through the codec, or {@code null} on timeout
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyspaceNotification notification = this.queue.poll(j, timeUnit);
        if (notification == null) {
            return null;
        }
        // Notification keys are Strings; round-trip through UTF-8 bytes to obtain the codec's key type.
        return this.codec.decodeKey(StringCodec.UTF8.encodeKey(notification.getKey()));
    }

    /**
     * Extracts the key from a notification channel name.
     *
     * @param str channel name, may be {@code null}
     * @return everything after the first separator (the whole string if there is none),
     *         or {@code null} for a {@code null} channel
     */
    private static String key(String str) {
        if (str == null) {
            return null;
        }
        return str.substring(str.indexOf(SEPARATOR) + 1);
    }
}
