package com.redis.spring.batch.reader;

import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.spring.batch.common.SetBlockingQueue;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader.class */
public class KeyspaceNotificationItemReader<K, V> extends AbstractItemStreamItemReader<K> implements PollableItemReader<K> {
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    public static final String QUEUE_SIZE_GAUGE_NAME = "reader.notification.queue.size";
    private static final String SEPARATOR = ":";
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private KeyspaceNotificationOptions options = KeyspaceNotificationOptions.builder().build();
    private BlockingQueue<KeyspaceNotification> queue;
    private KeyspaceNotificationPublisher publisher;
    private static final Log log = LogFactory.getLog(KeyspaceNotificationItemReader.class);
    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();
    private static final Map<String, KeyEventType> eventTypes = (Map) Stream.of((Object[]) KeyEventType.values()).collect(Collectors.toMap((v0) -> {
        return v0.getString();
    }, Function.identity()));

    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$KeyspaceNotificationPublisher.class */
    private interface KeyspaceNotificationPublisher extends AutoCloseable {
        @Override // java.lang.AutoCloseable
        void close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$RedisClusterKeyspaceNotificationPublisher.class */
    public class RedisClusterKeyspaceNotificationPublisher extends RedisClusterPubSubAdapter<String, String> implements KeyspaceNotificationPublisher {
        private final StatefulRedisClusterPubSubConnection<String, String> connection;
        private final String pattern;

        public RedisClusterKeyspaceNotificationPublisher(StatefulRedisClusterPubSubConnection<String, String> statefulRedisClusterPubSubConnection, String str) {
            this.connection = statefulRedisClusterPubSubConnection;
            this.pattern = str;
            statefulRedisClusterPubSubConnection.addListener(this);
            statefulRedisClusterPubSubConnection.setNodeMessagePropagation(true);
            ((NodeSelectionPubSubCommands) statefulRedisClusterPubSubConnection.sync().upstream().commands()).psubscribe(new String[]{str});
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.KeyspaceNotificationPublisher, java.lang.AutoCloseable
        public void close() {
            ((NodeSelectionPubSubCommands) this.connection.sync().upstream().commands()).punsubscribe(new String[]{this.pattern});
            this.connection.removeListener(this);
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2) {
            KeyspaceNotificationItemReader.this.notification(str, str2);
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2, String str3) {
            KeyspaceNotificationItemReader.this.notification(str2, str3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$RedisKeyspaceNotificationPublisher.class */
    public class RedisKeyspaceNotificationPublisher extends RedisPubSubAdapter<String, String> implements KeyspaceNotificationPublisher {
        private final StatefulRedisPubSubConnection<String, String> connection;
        private final String pattern;

        public RedisKeyspaceNotificationPublisher(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection, String str) {
            this.connection = statefulRedisPubSubConnection;
            this.pattern = str;
            statefulRedisPubSubConnection.addListener(this);
            statefulRedisPubSubConnection.sync().psubscribe(new String[]{str});
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.KeyspaceNotificationPublisher, java.lang.AutoCloseable
        public void close() {
            this.connection.sync().punsubscribe(new String[]{this.pattern});
            this.connection.removeListener(this);
            this.connection.close();
        }

        public void message(String str, String str2) {
            KeyspaceNotificationItemReader.this.notification(str, str2);
        }

        public void message(String str, String str2, String str3) {
            KeyspaceNotificationItemReader.this.notification(str2, str3);
        }
    }

    public KeyspaceNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    public KeyspaceNotificationOptions getOptions() {
        return this.options;
    }

    public void setOptions(KeyspaceNotificationOptions keyspaceNotificationOptions) {
        this.options = keyspaceNotificationOptions;
    }

    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    private String pattern(int i, String str) {
        return String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(i), str);
    }

    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.publisher == null) {
            this.queue = new SetBlockingQueue(notificationQueue());
            Utils.createGaugeCollectionSize(QUEUE_SIZE_GAUGE_NAME, this.queue, new Tag[0]);
            this.publisher = publisher();
        }
    }

    private BlockingQueue<KeyspaceNotification> notificationQueue() {
        return this.options.getOrderingStrategy() == KeyspaceNotificationOrderingStrategy.PRIORITY ? new PriorityBlockingQueue(this.options.getQueueOptions().getCapacity(), NOTIFICATION_COMPARATOR) : new LinkedBlockingQueue(this.options.getQueueOptions().getCapacity());
    }

    private KeyspaceNotificationPublisher publisher() {
        String pattern = pattern(this.options.getDatabase(), this.options.getMatch());
        return this.client instanceof RedisModulesClusterClient ? new RedisClusterKeyspaceNotificationPublisher(this.client.connectPubSub(), pattern) : new RedisKeyspaceNotificationPublisher(this.client.connectPubSub(), pattern);
    }

    public void close() {
        if (this.publisher != null) {
            log.debug("Unsubscribing from keyspace notifications");
            this.publisher.close();
            this.publisher = null;
        }
        super.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notification(String str, String str2) {
        if (str != null) {
            String substring = str.substring(str.indexOf(":") + 1);
            KeyEventType eventType = eventType(str2);
            Optional<String> type = this.options.getType();
            if (!type.isPresent() || type.get().equals(eventType.getType())) {
                KeyspaceNotification keyspaceNotification = new KeyspaceNotification(substring, eventType);
                if (this.queue.size() >= this.options.getQueueOptions().getCapacity() || !this.queue.offer(keyspaceNotification)) {
                    log.warn("Could not add key because queue is full");
                }
            }
        }
    }

    private KeyEventType eventType(String str) {
        return eventTypes.getOrDefault(str, KeyEventType.UNKNOWN);
    }

    public K read() throws Exception {
        return poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyspaceNotification poll = this.queue.poll(j, timeUnit);
        if (poll == null) {
            return null;
        }
        return (K) this.codec.decodeKey(StringCodec.UTF8.encodeKey(poll.getKey()));
    }
}
