package com.redis.spring.batch.reader;

import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.common.SetBlockingQueue;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader.class */
public class KeyspaceNotificationItemReader<K, V> extends AbstractItemCountingItemStreamItemReader<K> implements PollableItemReader<K>, AutoCloseable {
    public static final String QUEUE_SIZE_GAUGE_NAME = "reader.notification.queue.size";
    private static final String SEPARATOR = ":";
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final Map<String, KeyEventType> eventTypes = (Map) Stream.of((Object[]) KeyEventType.values()).collect(Collectors.toMap((v0) -> {
        return v0.getString();
    }, Function.identity()));
    private final QueueOptions queueOptions;
    private final String[] patterns;
    private Publisher publisher;
    private BlockingQueue<KeyspaceNotification> queue;
    private static final Log log = LogFactory.getLog(KeyspaceNotificationItemReader.class);
    protected static final KeyEventType[] EVENT_TYPES_ORDERED = {KeyEventType.DEL, KeyEventType.EXPIRED, KeyEventType.EVICTED, KeyEventType.EXPIRE, KeyEventType.PERSIST, KeyEventType.NEW_KEY, KeyEventType.RESTORE, KeyEventType.RENAME_FROM, KeyEventType.RENAME_TO, KeyEventType.MOVE_FROM, KeyEventType.MOVE_TO, KeyEventType.COPY_TO, KeyEventType.SET, KeyEventType.SETRANGE, KeyEventType.INCRBY, KeyEventType.INCRBYFLOAT, KeyEventType.APPEND, KeyEventType.HSET, KeyEventType.HINCRBY, KeyEventType.HINCRBYFLOAT, KeyEventType.HDEL, KeyEventType.JSON_SET, KeyEventType.LPUSH, KeyEventType.RPUSH, KeyEventType.RPOP, KeyEventType.LPOP, KeyEventType.LINSERT, KeyEventType.LSET, KeyEventType.LREM, KeyEventType.LTRIM, KeyEventType.SORTSTORE, KeyEventType.SADD, KeyEventType.SPOP, KeyEventType.SINTERSTORE, KeyEventType.SUNIONSTORE, KeyEventType.SDIFFSTORE, KeyEventType.ZINCR, KeyEventType.ZADD, KeyEventType.ZREM, KeyEventType.ZREMBYSCORE, KeyEventType.ZREMBYRANK, KeyEventType.ZDIFFSTORE, KeyEventType.ZINTERSTORE, KeyEventType.ZUNIONSTORE, KeyEventType.XADD, KeyEventType.XTRIM, KeyEventType.XDEL, KeyEventType.XGROUP_CREATE, KeyEventType.XGROUP_CREATECONSUMER, KeyEventType.XGROUP_DELCONSUMER, KeyEventType.XGROUP_DESTROY, KeyEventType.XGROUP_SETID, KeyEventType.XSETID, KeyEventType.TS_ADD, KeyEventType.UNKNOWN};
    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();

    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$KeyspaceNotificationComparator.class */
    private static class KeyspaceNotificationComparator implements Comparator<KeyspaceNotification> {
        private final Map<KeyEventType, Integer> ranking = new EnumMap(KeyEventType.class);

        public KeyspaceNotificationComparator() {
            for (int i = 0; i < KeyspaceNotificationItemReader.EVENT_TYPES_ORDERED.length; i++) {
                this.ranking.put(KeyspaceNotificationItemReader.EVENT_TYPES_ORDERED[i], Integer.valueOf(i));
            }
        }

        @Override // java.util.Comparator
        public int compare(KeyspaceNotification keyspaceNotification, KeyspaceNotification keyspaceNotification2) {
            return this.ranking.getOrDefault(keyspaceNotification.getEventType(), Integer.MAX_VALUE).intValue() - this.ranking.getOrDefault(keyspaceNotification2.getEventType(), Integer.MAX_VALUE).intValue();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$Publisher.class */
    private interface Publisher {
        void open(String... strArr);

        void close(String... strArr);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$RedisClusterPublisher.class */
    public class RedisClusterPublisher extends RedisClusterPubSubAdapter<String, String> implements Publisher {
        private final StatefulRedisClusterPubSubConnection<String, String> connection;

        public RedisClusterPublisher(StatefulRedisClusterPubSubConnection<String, String> statefulRedisClusterPubSubConnection) {
            this.connection = statefulRedisClusterPubSubConnection;
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.Publisher
        public void open(String... strArr) {
            this.connection.addListener(this);
            this.connection.setNodeMessagePropagation(true);
            ((NodeSelectionPubSubCommands) this.connection.sync().upstream().commands()).psubscribe(strArr);
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.Publisher
        public void close(String... strArr) {
            ((NodeSelectionPubSubCommands) this.connection.sync().upstream().commands()).punsubscribe(strArr);
            this.connection.removeListener(this);
            this.connection.close();
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2) {
            KeyspaceNotificationItemReader.this.notification(str, str2);
        }

        public void message(RedisClusterNode redisClusterNode, String str, String str2, String str3) {
            KeyspaceNotificationItemReader.this.notification(str2, str3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$RedisPublisher.class */
    public class RedisPublisher extends RedisPubSubAdapter<String, String> implements Publisher {
        private final StatefulRedisPubSubConnection<String, String> connection;

        public RedisPublisher(StatefulRedisPubSubConnection<String, String> statefulRedisPubSubConnection) {
            this.connection = statefulRedisPubSubConnection;
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.Publisher
        public void open(String... strArr) {
            this.connection.addListener(this);
            this.connection.sync().psubscribe(strArr);
        }

        @Override // com.redis.spring.batch.reader.KeyspaceNotificationItemReader.Publisher
        public void close(String... strArr) {
            this.connection.sync().punsubscribe(strArr);
            this.connection.removeListener(this);
            this.connection.close();
        }

        public void message(String str, String str2) {
            KeyspaceNotificationItemReader.this.notification(str, str2);
        }

        public void message(String str, String str2, String str3) {
            KeyspaceNotificationItemReader.this.notification(str2, str3);
        }
    }

    public KeyspaceNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, QueueOptions queueOptions, String[] strArr) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.queueOptions = queueOptions;
        this.patterns = strArr;
    }

    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    protected synchronized void doOpen() {
        if (this.publisher != null) {
            return;
        }
        this.queue = new SetBlockingQueue(new PriorityBlockingQueue(this.queueOptions.getCapacity(), NOTIFICATION_COMPARATOR));
        Utils.createGaugeCollectionSize(QUEUE_SIZE_GAUGE_NAME, this.queue, new Tag[0]);
        this.publisher = publisher();
        log.debug("Subscribing to keyspace notifications");
        this.publisher.open(this.patterns);
    }

    private Publisher publisher() {
        StatefulRedisClusterPubSubConnection pubSubConnection = RedisModulesUtils.pubSubConnection(this.client);
        return pubSubConnection instanceof StatefulRedisClusterPubSubConnection ? new RedisClusterPublisher(pubSubConnection) : new RedisPublisher(pubSubConnection);
    }

    protected synchronized void doClose() {
        if (this.publisher == null) {
            return;
        }
        log.info("Closing");
        log.debug("Unsubscribing from keyspace notifications");
        this.publisher.close(this.patterns);
        this.publisher = null;
    }

    @Override // com.redis.spring.batch.common.Openable
    public boolean isOpen() {
        return this.publisher != null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void notification(String str, String str2) {
        if (str == null) {
            return;
        }
        if (this.queue.offer(new KeyspaceNotification(str.substring(str.indexOf(":") + 1), eventType(str2)))) {
            return;
        }
        log.warn("Could not add key because queue is full");
    }

    private KeyEventType eventType(String str) {
        return this.eventTypes.getOrDefault(str, KeyEventType.UNKNOWN);
    }

    protected K doRead() throws Exception {
        return poll(this.queueOptions.getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyspaceNotification poll = this.queue.poll(j, timeUnit);
        if (poll == null) {
            return null;
        }
        return (K) this.codec.decodeKey(StringCodec.UTF8.encodeKey(poll.getKey()));
    }
}
