package com.redis.spring.batch.reader;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/reader/RedisClusterKeyspaceNotificationPublisher.class */
/**
 * {@link KeyspaceNotificationPublisher} for Redis Cluster, backed by a Lettuce
 * {@link StatefulRedisClusterPubSubConnection}.
 *
 * <p>The publisher registers itself as a cluster pub/sub listener on the connection,
 * pattern-subscribes on the nodes selected by {@code upstream()}, and forwards the channel of
 * every received message to the {@link KeyspaceNotificationListener}s added through
 * {@link #addListener(KeyspaceNotificationListener)}.
 *
 * <p>The listener list is a plain {@link ArrayList} with no synchronization of its own.
 * NOTE(review): if listeners can be added while notifications are being delivered, callers must
 * provide their own coordination — confirm against the usage sites.
 *
 * @param <K> key/channel type of the connection
 * @param <V> value/message type of the connection
 */
public class RedisClusterKeyspaceNotificationPublisher<K, V> extends RedisClusterPubSubAdapter<K, V> implements KeyspaceNotificationPublisher<K> {
    private final StatefulRedisClusterPubSubConnection<K, V> connection;
    private final Log log = LogFactory.getLog(getClass());
    private final List<KeyspaceNotificationListener<K>> listeners = new ArrayList<>();

    /**
     * Creates a publisher on top of the given cluster pub/sub connection.
     *
     * @param connection connection used to subscribe and to receive messages; must not be {@code null}
     * @throws IllegalArgumentException if {@code connection} is {@code null}
     */
    public RedisClusterKeyspaceNotificationPublisher(StatefulRedisClusterPubSubConnection<K, V> connection) {
        Assert.notNull(connection, "A pub/sub connection is required");
        this.connection = connection;
    }

    /**
     * Registers a listener that will be called with the channel of each received message.
     *
     * @param listener listener to add; must not be {@code null}
     * @throws IllegalArgumentException if {@code listener} is {@code null}
     */
    @Override // com.redis.spring.batch.reader.KeyspaceNotificationPublisher
    public void addListener(KeyspaceNotificationListener<K> listener) {
        // Fail here rather than with a NullPointerException later, while dispatching a message.
        Assert.notNull(listener, "A listener is required");
        this.listeners.add(listener);
    }

    /**
     * Registers this adapter on the connection, enables node message propagation and
     * pattern-subscribes ({@code PSUBSCRIBE}) to the given patterns on the upstream node selection.
     *
     * <p>Every call registers this adapter on the connection again.
     * NOTE(review): calling this more than once without an intervening
     * {@link #unsubscribe(Object[])} presumably leads to duplicate deliveries — confirm.
     *
     * @param patterns channel patterns to subscribe to
     */
    @Override // com.redis.spring.batch.reader.KeyspaceNotificationPublisher
    public void subscribe(K... patterns) {
        this.log.debug("Adding pub/sub listener");
        this.connection.addListener(this);
        // Messages arrive on per-node connections; propagation hands them to this connection's listeners.
        this.connection.setNodeMessagePropagation(true);
        this.log.debug("Subscribing to keyspace notifications");
        NodeSelectionPubSubCommands<K, V> commands = this.connection.sync().upstream().commands();
        commands.psubscribe(patterns);
    }

    /**
     * Pattern-unsubscribes ({@code PUNSUBSCRIBE}) from the given patterns on the upstream node
     * selection and removes this adapter from the connection's listeners.
     *
     * <p>The adapter is removed even when the unsubscribe command fails, so a failed call does
     * not leave it registered on the connection; the failure is still propagated to the caller.
     *
     * @param patterns channel patterns to unsubscribe from
     */
    @Override // com.redis.spring.batch.reader.KeyspaceNotificationPublisher
    public void unsubscribe(K... patterns) {
        this.log.debug("Unsubscribing from keyspace notifications");
        try {
            NodeSelectionPubSubCommands<K, V> commands = this.connection.sync().upstream().commands();
            commands.punsubscribe(patterns);
        } finally {
            this.log.debug("Removing pub/sub listener");
            this.connection.removeListener(this);
        }
    }

    /** Passes {@code channel} to every registered listener, in registration order. */
    private void notification(K channel) {
        this.listeners.forEach(listener -> listener.notification(channel));
    }

    /**
     * Handles a message received through a channel subscription by forwarding its channel.
     *
     * @param node node the message originated from (unused)
     * @param channel channel the message was published on
     * @param message message payload (unused)
     */
    @Override // io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter
    public void message(RedisClusterNode node, K channel, V message) {
        notification(channel);
    }

    /**
     * Handles a message received through a pattern subscription by forwarding the concrete
     * channel, not the pattern that matched it.
     *
     * @param node node the message originated from (unused)
     * @param pattern subscription pattern that matched (unused)
     * @param channel channel the message was published on
     * @param message message payload (unused)
     */
    @Override // io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter
    public void message(RedisClusterNode node, K pattern, K channel, V message) {
        notification(channel);
    }
}
