package com.redis.spring.batch.reader;

import com.redis.spring.batch.KeyValue;
import com.redis.spring.batch.RedisItemReader;
import com.redis.spring.batch.operation.KeyValueRead;
import com.redis.spring.batch.util.BatchUtils;
import com.redis.spring.batch.util.SetBlockingQueue;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubListener;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.RedisPubSubListener;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyNotificationItemReader.class */
public class KeyNotificationItemReader<K, V> extends AbstractPollableItemReader<K> {
    /** Default capacity of the notification queue. */
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;

    /** Channel pattern for keyspace notifications: database index, then key pattern. */
    private static final String KEYSPACE_PATTERN = "__keyspace@%s__:%s";
    /** Channel pattern for keyevent notifications: database index, any event. */
    private static final String KEYEVENT_PATTERN = "__keyevent@%s__:*";
    /** Separator between the channel prefix and its suffix. */
    private static final String SEPARATOR = ":";

    private final Log log = LogFactory.getLog(KeyNotificationItemReader.class);
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    /** Converts a String into a key of type K. */
    private final Function<String, K> keyEncoder;
    /** Converts a key of type K into a String. */
    private final Function<K, String> keyDecoder;
    /** Converts a value of type V into a String. */
    private final Function<V, String> valueDecoder;

    /** Database index formatted into the subscription pattern. */
    private int database;
    /** Key pattern for keyspace notifications; {@code null} selects keyevent notifications. */
    private String keyPattern;
    /** Optional type-code filter; {@code null} accepts every event. */
    private String keyType;
    /** Capacity used when the queue is created on open. */
    private int queueCapacity = DEFAULT_QUEUE_CAPACITY;
    /** Pending key events; created on open, cleared on close. */
    private BlockingQueue<KeyEvent<K>> queue;
    /** Active subscription handle; non-null while the reader is running. */
    private AutoCloseable publisher;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyNotificationItemReader$ClusterKeyNotificationListener.class */
    public static class ClusterKeyNotificationListener<K, V> extends RedisClusterPubSubAdapter<K, V> {
        private final NotificationConsumer<K, V> consumer;

        public ClusterKeyNotificationListener(NotificationConsumer<K, V> notificationConsumer) {
            this.consumer = notificationConsumer;
        }

        public void message(RedisClusterNode redisClusterNode, K k, K k2, V v) {
            this.consumer.accept(k2, v);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/KeyNotificationItemReader$KeyNotificationListener.class */
    public static class KeyNotificationListener<K, V> extends RedisPubSubAdapter<K, V> {
        private final NotificationConsumer<K, V> consumer;

        public KeyNotificationListener(NotificationConsumer<K, V> notificationConsumer) {
            this.consumer = notificationConsumer;
        }

        public void message(K k, K k2, V v) {
            this.consumer.accept(k2, v);
        }
    }

    /**
     * Callback receiving the channel and message of a pub/sub notification.
     *
     * @param <K> channel type
     * @param <V> message type
     */
    @FunctionalInterface
    public interface NotificationConsumer<K, V> {
        /**
         * Handles one notification.
         *
         * @param channel channel the notification was received on
         * @param message notification payload
         */
        void accept(K channel, V message);
    }

    /**
     * Owns a cluster pub/sub connection subscribed to a single pattern and
     * releases it on {@link #close()}.
     *
     * @param <K> key (pattern/channel) type
     * @param <V> value (message) type
     */
    public static class RedisClusterKeyNotificationPublisher<K, V> implements AutoCloseable {
        private final StatefulRedisClusterPubSubConnection<K, V> connection;
        private final RedisClusterPubSubListener<K, V> listener;
        private final K pattern;

        /**
         * Opens a pub/sub connection, enables node message propagation, registers
         * the listener and pattern-subscribes on the node selection returned by
         * {@code upstream()}.
         *
         * @param client   cluster client used to open the connection
         * @param codec    codec for the connection
         * @param listener listener registered on the connection
         * @param pattern  pattern to subscribe to
         */
        @SuppressWarnings("unchecked") // generic varargs array for psubscribe(K...)
        public RedisClusterKeyNotificationPublisher(RedisClusterClient client, RedisCodec<K, V> codec, RedisClusterPubSubListener<K, V> listener, K pattern) {
            this.connection = client.connectPubSub(codec);
            this.listener = listener;
            this.pattern = pattern;
            try {
                this.connection.setNodeMessagePropagation(true);
                this.connection.addListener(listener);
                this.connection.sync().upstream().commands().psubscribe(pattern);
            } catch (RuntimeException e) {
                // Do not leak the freshly opened connection when setup fails.
                this.connection.close();
                throw e;
            }
        }

        /**
         * Unsubscribes from the pattern, removes the listener and closes the
         * connection. Does nothing if the connection is no longer open.
         */
        @Override
        @SuppressWarnings("unchecked") // generic varargs array for punsubscribe(K...)
        public synchronized void close() throws Exception {
            if (!this.connection.isOpen()) {
                return;
            }
            try {
                this.connection.sync().upstream().commands().punsubscribe(this.pattern);
                this.connection.removeListener(this.listener);
            } finally {
                // Always release the connection, even if unsubscribing failed.
                this.connection.close();
            }
        }
    }

    /**
     * Owns a pub/sub connection subscribed to a single pattern and releases it
     * on {@link #close()}.
     *
     * @param <K> key (pattern/channel) type
     * @param <V> value (message) type
     */
    public static class RedisKeyNotificationPublisher<K, V> implements AutoCloseable {
        private final StatefulRedisPubSubConnection<K, V> connection;
        private final K pattern;
        private final RedisPubSubListener<K, V> listener;

        /**
         * Opens a pub/sub connection, registers the listener and subscribes to
         * the pattern.
         *
         * @param client   client used to open the connection
         * @param codec    codec for the connection
         * @param listener listener registered on the connection
         * @param pattern  pattern to subscribe to
         */
        @SuppressWarnings("unchecked") // generic varargs array for psubscribe(K...)
        public RedisKeyNotificationPublisher(RedisClient client, RedisCodec<K, V> codec, RedisPubSubListener<K, V> listener, K pattern) {
            this.connection = client.connectPubSub(codec);
            this.listener = listener;
            this.pattern = pattern;
            try {
                this.connection.addListener(listener);
                this.connection.sync().psubscribe(pattern);
            } catch (RuntimeException e) {
                // Do not leak the freshly opened connection when setup fails.
                this.connection.close();
                throw e;
            }
        }

        /**
         * Unsubscribes from the pattern, removes the listener and closes the
         * connection. Does nothing if the connection is no longer open.
         */
        @Override
        @SuppressWarnings("unchecked") // generic varargs array for punsubscribe(K...)
        public synchronized void close() {
            if (!this.connection.isOpen()) {
                return;
            }
            try {
                this.connection.sync().punsubscribe(this.pattern);
                this.connection.removeListener(this.listener);
            } finally {
                // Always release the connection, even if unsubscribing failed.
                this.connection.close();
            }
        }
    }

    /**
     * Creates a reader named after its short class name and derives the key
     * encoder and the key/value decoders from the codec.
     *
     * @param client Redis client used to open the pub/sub connection
     * @param codec  codec for keys and values
     */
    public KeyNotificationItemReader(AbstractRedisClient client, RedisCodec<K, V> codec) {
        String shortName = ClassUtils.getShortName(getClass());
        setName(shortName);
        this.client = client;
        this.codec = codec;
        this.keyEncoder = BatchUtils.stringKeyFunction(codec);
        this.keyDecoder = BatchUtils.toStringKeyFunction(codec);
        this.valueDecoder = BatchUtils.toStringValueFunction(codec);
    }

    /**
     * @return the queue of pending key events, or {@code null} when the reader is not open
     */
    public BlockingQueue<KeyEvent<K>> getQueue() {
        return queue;
    }

    /**
     * Builds the pub/sub pattern to subscribe to: the keyevent pattern for the
     * configured database when no key pattern is set, otherwise the keyspace
     * pattern for the configured database and key pattern.
     *
     * @return the subscription pattern
     */
    public String pubSubPattern() {
        if (isKeyEvents()) {
            return String.format(KEYEVENT_PATTERN, this.database);
        }
        return String.format(KEYSPACE_PATTERN, this.database, this.keyPattern);
    }

    /**
     * @return {@code true} when no key pattern is configured, i.e. keyevent
     *         notifications are used instead of keyspace notifications
     */
    private boolean isKeyEvents() {
        return null == keyPattern;
    }

    /**
     * @return {@code true} while a publisher (subscription) is held
     */
    @Override
    public boolean isRunning() {
        return null != publisher;
    }

    /**
     * Creates the event queue and the notification subscription if they do not
     * exist yet. The queue is created first because the listeners enqueue into
     * it.
     *
     * @throws Exception if the subscription cannot be established
     */
    protected synchronized void doOpen() throws Exception {
        Assert.notNull(this.client, "Redis client not set");
        if (this.queue == null) {
            this.queue = new SetBlockingQueue<>(new LinkedBlockingQueue<>(this.queueCapacity), this.queueCapacity);
        }
        if (this.publisher == null) {
            this.publisher = publisher();
        }
    }

    /**
     * Handles a keyspace notification: the key is the channel suffix and the
     * event name is the message payload.
     *
     * @param channel notification channel
     * @param message notification payload
     */
    private void keySpaceNotification(K channel, V message) {
        K key = this.keyEncoder.apply(suffix(channel));
        KeyValue.Type type = keyType(this.valueDecoder.apply(message));
        addEvent(new KeyEvent<>(key, type));
    }

    /**
     * Handles a keyevent notification: the key is the message payload and the
     * event name is the channel suffix.
     *
     * @param channel notification channel
     * @param message notification payload, used as the key
     */
    @SuppressWarnings("unchecked")
    private void keyEventNotification(K channel, V message) {
        // The payload is used as the key, so it is reinterpreted as K.
        // NOTE(review): assumes the codec uses the same representation for keys and values.
        K key = (K) message;
        addEvent(new KeyEvent<>(key, keyType(suffix(channel))));
    }

    /**
     * Enqueues the event unless a key type filter is configured and the event
     * does not match it. Blocks while the queue is full.
     *
     * @param keyEvent event to enqueue
     * @throws ItemStreamException if the thread is interrupted while waiting for queue space
     */
    private void addEvent(KeyEvent<K> keyEvent) {
        if (this.keyType != null) {
            // Unmapped events carry a null type; with a filter set they cannot
            // match, and dereferencing the type would throw a NullPointerException.
            if (keyEvent.getType() == null || !this.keyType.equals(keyEvent.getType().getCode())) {
                return;
            }
        }
        try {
            this.queue.put(keyEvent);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted while queueing key event", e);
        }
    }

    /**
     * @return the handler matching the subscription kind: keyevent when no key
     *         pattern is configured, keyspace otherwise
     */
    private NotificationConsumer<K, V> notificationConsumer() {
        if (isKeyEvents()) {
            return this::keyEventNotification;
        }
        return this::keySpaceNotification;
    }

    /**
     * Returns the part of the decoded channel after its first separator.
     *
     * @param k channel to decode
     * @return the text after the first {@code ":"}, or {@code null} when there
     *         is no separator past the first character
     */
    private String suffix(K k) {
        String channel = this.keyDecoder.apply(k);
        int separatorIndex = channel.indexOf(SEPARATOR);
        return separatorIndex > 0 ? channel.substring(separatorIndex + 1) : null;
    }

    /**
     * Creates the subscription for the configured pattern, using the cluster
     * publisher when the client is a {@link RedisClusterClient} and the
     * standalone publisher otherwise.
     *
     * @return a handle that unsubscribes and closes the connection when closed
     */
    private AutoCloseable publisher() {
        K pattern = this.keyEncoder.apply(pubSubPattern());
        NotificationConsumer<K, V> consumer = notificationConsumer();
        if (this.client instanceof RedisClusterClient) {
            ClusterKeyNotificationListener<K, V> clusterListener = new ClusterKeyNotificationListener<>(consumer);
            return new RedisClusterKeyNotificationPublisher<>((RedisClusterClient) this.client, this.codec, clusterListener, pattern);
        }
        KeyNotificationListener<K, V> listener = new KeyNotificationListener<>(consumer);
        return new RedisKeyNotificationPublisher<>((RedisClient) this.client, this.codec, listener, pattern);
    }

    /**
     * Closes the subscription, if any, then discards the queue, logging a
     * warning when it still holds events.
     *
     * @throws Exception if closing the publisher fails
     */
    protected synchronized void doClose() throws Exception {
        AutoCloseable current = this.publisher;
        if (current != null) {
            current.close();
            this.publisher = null;
        }
        if (this.queue != null && !this.queue.isEmpty()) {
            this.log.warn("Queue still contains elements");
        }
        this.queue = null;
    }

    /**
     * Waits up to the given timeout for the next key event.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the key of the next event, or {@code null} if none arrived in time
     * @throws InterruptedException if interrupted while waiting
     */
    @Override
    protected K doPoll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyEvent<K> event = this.queue.poll(j, timeUnit);
        return event == null ? null : event.getKey();
    }

    /**
     * Maps a notification event name to the type of the key it affects.
     *
     * @param str event name, matched case-insensitively; may be {@code null}
     * @return the corresponding type, or {@code null} when the event is
     *         {@code null} or not mapped to a type
     */
    private KeyValue.Type keyType(String str) {
        if (str == null) {
            return null;
        }
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale.
        String event = str.toLowerCase(java.util.Locale.ROOT);
        if (event.startsWith("xgroup-")) {
            return KeyValue.Type.STREAM;
        }
        if (event.startsWith("ts.")) {
            return KeyValue.Type.TIMESERIES;
        }
        if (event.startsWith("json.")) {
            return KeyValue.Type.JSON;
        }
        switch (event) {
            case "set":
            case "setrange":
            case "incrby":
            case "incrbyfloat":
            case "append":
                return KeyValue.Type.STRING;
            case "lpush":
            case "rpush":
            case "rpop":
            case "lpop":
            case "linsert":
            case "lset":
            case "lrem":
            case "ltrim":
                return KeyValue.Type.LIST;
            case "hset":
            case "hincrby":
            case "hincrbyfloat":
            case "hdel":
                return KeyValue.Type.HASH;
            case "sadd":
            case "spop":
            case "sinterstore":
            case "sunionstore":
            case "sdiffstore":
                return KeyValue.Type.SET;
            case "zincr":
            case "zadd":
            case "zrem":
            case "zrembyscore":
            case "zrembyrank":
            case "zdiffstore":
            case "zinterstore":
            case "zunionstore":
                return KeyValue.Type.ZSET;
            case "xadd":
            case "xtrim":
            case "xdel":
            case "xsetid":
                return KeyValue.Type.STREAM;
            default:
                return null;
        }
    }

    /**
     * @return the capacity used when the notification queue is created
     */
    public int getQueueCapacity() {
        return queueCapacity;
    }

    /**
     * @param capacity capacity to use when the notification queue is created
     */
    public void setQueueCapacity(int capacity) {
        queueCapacity = capacity;
    }

    /**
     * @return the database index formatted into the subscription pattern
     */
    public int getDatabase() {
        return database;
    }

    /**
     * @param index database index to format into the subscription pattern
     */
    public void setDatabase(int index) {
        database = index;
    }

    /**
     * @return the key pattern for keyspace notifications, or {@code null} when
     *         keyevent notifications are used
     */
    public String getKeyPattern() {
        return keyPattern;
    }

    /**
     * @param pattern key pattern for keyspace notifications; {@code null}
     *                selects keyevent notifications
     */
    public void setKeyPattern(String pattern) {
        keyPattern = pattern;
    }

    /**
     * @return the type code events are filtered on, or {@code null} when
     *         events are not filtered
     */
    public String getKeyType() {
        return keyType;
    }

    /**
     * @param type type code to filter events on; {@code null} disables filtering
     */
    public void setKeyType(String type) {
        keyType = type;
    }
}
