package com.redis.spring.batch.item.redis.reader;

import com.redis.spring.batch.item.AbstractPollableItemReader;
import com.redis.spring.batch.item.redis.common.BatchUtils;
import com.redis.spring.batch.item.redis.common.DataType;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.codec.RedisCodec;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/item/redis/reader/KeyNotificationItemReader.class */
public class KeyNotificationItemReader<K, V> extends AbstractPollableItemReader<K> {
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    private static final String KEYSPACE_PATTERN = "__keyspace@%s__:%s";
    private static final String KEYEVENT_PATTERN = "__keyevent@%s__:*";
    private static final String SEPARATOR = ":";
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final BiPredicate<K, K> keyEquals;
    private final Function<String, K> keyEncoder;
    private final Function<K, String> keyDecoder;
    private final Function<V, String> valueDecoder;
    private int database;
    private String keyPattern;
    private String keyType;
    protected BlockingQueue<K> queue;
    private KeyNotificationPublisher publisher;
    private final Map<KeyNotificationStatus, AtomicLong> statusCounts = (Map) Stream.of((Object[]) KeyNotificationStatus.values()).collect(Collectors.toMap(Function.identity(), keyNotificationStatus -> {
        return new AtomicLong();
    }));
    private int queueCapacity = 10000;
    private Set<KeyNotificationListener<K>> listeners = new LinkedHashSet();

    public KeyNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.keyEquals = BatchUtils.keyEqualityPredicate(redisCodec);
        this.keyEncoder = BatchUtils.stringKeyFunction(redisCodec);
        this.keyDecoder = BatchUtils.toStringKeyFunction(redisCodec);
        this.valueDecoder = BatchUtils.toStringValueFunction(redisCodec);
    }

    public void addListener(KeyNotificationListener<K> keyNotificationListener) {
        this.listeners.add(keyNotificationListener);
    }

    public String pubSubPattern() {
        return isKeyEvents() ? String.format(KEYEVENT_PATTERN, Integer.valueOf(this.database)) : String.format(KEYSPACE_PATTERN, Integer.valueOf(this.database), this.keyPattern);
    }

    private boolean isKeyEvents() {
        return this.keyPattern == null;
    }

    public boolean isComplete() {
        return this.publisher == null;
    }

    protected synchronized void doOpen() throws Exception {
        if (this.queue == null) {
            this.queue = new LinkedBlockingQueue(this.queueCapacity);
        }
        if (this.publisher == null) {
            this.publisher = publisher();
            this.publisher.open();
        }
    }

    public boolean isOpen() {
        return this.publisher != null;
    }

    protected synchronized void doClose() throws Exception {
        if (this.publisher != null) {
            this.publisher.close();
            this.publisher = null;
        }
        this.queue = null;
    }

    private void keySpaceNotification(K k, V v) {
        notification(this.keyEncoder.apply(suffix(k)), this.valueDecoder.apply(v));
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void keyEventNotification(K k, V v) {
        notification(v, suffix(k));
    }

    private void notification(K k, String str) {
        KeyNotification<K> keyNotification = new KeyNotification<>();
        keyNotification.setKey(k);
        keyNotification.setEvent(str);
        keyNotification.setTime(System.currentTimeMillis());
        keyNotification.setType(eventDataType(str).getString());
        KeyNotificationStatus process = process(keyNotification);
        this.statusCounts.get(process).incrementAndGet();
        Iterator<KeyNotificationListener<K>> it = this.listeners.iterator();
        while (it.hasNext()) {
            it.next().notification(keyNotification, process);
        }
    }

    private boolean accept(String str) {
        return this.keyType == null || this.keyType.equalsIgnoreCase(str);
    }

    private KeyNotificationStatus process(KeyNotification<K> keyNotification) {
        return !accept(keyNotification.getType()) ? KeyNotificationStatus.REJECTED : this.queue.removeIf(obj -> {
            return this.keyEquals.test(obj, keyNotification.getKey());
        }) ? KeyNotificationStatus.DEBOUNCED : this.queue.offer(keyNotification.getKey()) ? KeyNotificationStatus.ACCEPTED : KeyNotificationStatus.DROPPED;
    }

    private DataType eventDataType(String str) {
        if (str == null) {
            return DataType.NONE;
        }
        String lowerCase = str.toLowerCase();
        if (lowerCase.startsWith("xgroup-")) {
            return DataType.STREAM;
        }
        if (lowerCase.startsWith("ts.")) {
            return DataType.TIMESERIES;
        }
        if (lowerCase.startsWith("json.")) {
            return DataType.JSON;
        }
        boolean z = -1;
        switch (lowerCase.hashCode()) {
            case -2076931528:
                if (lowerCase.equals("sinterstore")) {
                    z = 19;
                    break;
                }
                break;
            case -1960585179:
                if (lowerCase.equals("sunionstore")) {
                    z = 20;
                    break;
                }
                break;
            case -1934506309:
                if (lowerCase.equals("zrembyscore")) {
                    z = 25;
                    break;
                }
                break;
            case -1770689153:
                if (lowerCase.equals("zinterstore")) {
                    z = 28;
                    break;
                }
                break;
            case -1654342804:
                if (lowerCase.equals("zunionstore")) {
                    z = 29;
                    break;
                }
                break;
            case -1586455837:
                if (lowerCase.equals("zrembyrank")) {
                    z = 26;
                    break;
                }
                break;
            case -1411068134:
                if (lowerCase.equals("append")) {
                    z = 4;
                    break;
                }
                break;
            case -1232729711:
                if (lowerCase.equals("incrbyfloat")) {
                    z = 3;
                    break;
                }
                break;
            case -1184257109:
                if (lowerCase.equals("incrby")) {
                    z = 2;
                    break;
                }
                break;
            case -1184132926:
                if (lowerCase.equals("zdiffstore")) {
                    z = 27;
                    break;
                }
                break;
            case -750140539:
                if (lowerCase.equals("xsetid")) {
                    z = 33;
                    break;
                }
                break;
            case -693028823:
                if (lowerCase.equals("hincrbyfloat")) {
                    z = 15;
                    break;
                }
                break;
            case 113762:
                if (lowerCase.equals("set")) {
                    z = false;
                    break;
                }
                break;
            case 3197603:
                if (lowerCase.equals("hdel")) {
                    z = 16;
                    break;
                }
                break;
            case 3212026:
                if (lowerCase.equals("hset")) {
                    z = 13;
                    break;
                }
                break;
            case 3328613:
                if (lowerCase.equals("lpop")) {
                    z = 8;
                    break;
                }
                break;
            case 3330222:
                if (lowerCase.equals("lrem")) {
                    z = 11;
                    break;
                }
                break;
            case 3331190:
                if (lowerCase.equals("lset")) {
                    z = 10;
                    break;
                }
                break;
            case 3507359:
                if (lowerCase.equals("rpop")) {
                    z = 7;
                    break;
                }
                break;
            case 3522382:
                if (lowerCase.equals("sadd")) {
                    z = 17;
                    break;
                }
                break;
            case 3537150:
                if (lowerCase.equals("spop")) {
                    z = 18;
                    break;
                }
                break;
            case 3671337:
                if (lowerCase.equals("xadd")) {
                    z = 30;
                    break;
                }
                break;
            case 3674259:
                if (lowerCase.equals("xdel")) {
                    z = 32;
                    break;
                }
                break;
            case 3730919:
                if (lowerCase.equals("zadd")) {
                    z = 23;
                    break;
                }
                break;
            case 3747296:
                if (lowerCase.equals("zrem")) {
                    z = 24;
                    break;
                }
                break;
            case 103192966:
                if (lowerCase.equals("lpush")) {
                    z = 5;
                    break;
                }
                break;
            case 103308942:
                if (lowerCase.equals("ltrim")) {
                    z = 12;
                    break;
                }
                break;
            case 108734092:
                if (lowerCase.equals("rpush")) {
                    z = 6;
                    break;
                }
                break;
            case 114391194:
                if (lowerCase.equals("xtrim")) {
                    z = 31;
                    break;
                }
                break;
            case 115906510:
                if (lowerCase.equals("zincr")) {
                    z = 22;
                    break;
                }
                break;
            case 177324581:
                if (lowerCase.equals("linsert")) {
                    z = 9;
                    break;
                }
                break;
            case 191461609:
                if (lowerCase.equals("sdiffstore")) {
                    z = 21;
                    break;
                }
                break;
            case 921812499:
                if (lowerCase.equals("hincrby")) {
                    z = 14;
                    break;
                }
                break;
            case 1432545819:
                if (lowerCase.equals("setrange")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
            case true:
            case true:
            case true:
            case true:
                return DataType.STRING;
            case KeyValueRead.DEFAULT_MEM_USAGE_SAMPLES /* 5 */:
            case true:
            case true:
            case true:
            case true:
            case true:
            case true:
            case true:
                return DataType.LIST;
            case true:
            case true:
            case true:
            case true:
                return DataType.HASH;
            case true:
            case true:
            case true:
            case true:
            case true:
                return DataType.SET;
            case true:
            case true:
            case true:
            case true:
            case true:
            case true:
            case true:
            case true:
                return DataType.ZSET;
            case true:
            case true:
            case true:
            case true:
                return DataType.STREAM;
            default:
                return DataType.NONE;
        }
    }

    private KeyNotificationConsumer<K, V> notificationConsumer() {
        return isKeyEvents() ? this::keyEventNotification : this::keySpaceNotification;
    }

    private String suffix(K k) {
        String apply = this.keyDecoder.apply(k);
        int indexOf = apply.indexOf(":");
        if (indexOf > 0) {
            return apply.substring(indexOf + 1);
        }
        return null;
    }

    private KeyNotificationPublisher publisher() {
        K apply = this.keyEncoder.apply(pubSubPattern());
        KeyNotificationConsumer<K, V> notificationConsumer = notificationConsumer();
        if (this.client instanceof RedisClusterClient) {
            return new RedisClusterKeyNotificationPublisher(this.client, this.codec, new RedisClusterKeyNotificationListener(notificationConsumer), apply);
        }
        return new RedisKeyNotificationPublisher(this.client, this.codec, new RedisKeyNotificationListener(notificationConsumer), apply);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public K doPoll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public BlockingQueue<K> getQueue() {
        return this.queue;
    }

    public List<KeyNotificationStatusCount> statusCounts() {
        return (List) this.statusCounts.entrySet().stream().map(this::statusCount).collect(Collectors.toList());
    }

    private KeyNotificationStatusCount statusCount(Map.Entry<KeyNotificationStatus, AtomicLong> entry) {
        return new KeyNotificationStatusCount(entry.getKey(), entry.getValue().get());
    }

    public long count(KeyNotificationStatus keyNotificationStatus) {
        return this.statusCounts.get(keyNotificationStatus).get();
    }

    public int getQueueCapacity() {
        return this.queueCapacity;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public int getDatabase() {
        return this.database;
    }

    public void setDatabase(int i) {
        this.database = i;
    }

    public String getKeyPattern() {
        return this.keyPattern;
    }

    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    public String getKeyType() {
        return this.keyType;
    }

    public void setKeyType(String str) {
        this.keyType = str;
    }
}
