package com.redis.spring.batch.reader;

import io.lettuce.core.cluster.models.partitions.RedisClusterNode;
import io.lettuce.core.cluster.pubsub.RedisClusterPubSubAdapter;
import io.lettuce.core.cluster.pubsub.StatefulRedisClusterPubSubConnection;
import io.lettuce.core.cluster.pubsub.api.sync.NodeSelectionPubSubCommands;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/reader/RedisClusterKeyspaceNotificationItemReader.class */
/**
 * Keyspace-notification reader for Redis Cluster.
 *
 * <p>On open it obtains a cluster pub/sub connection from the configured supplier, registers a
 * listener on it and pattern-subscribes on the upstream nodes using the patterns held by the
 * superclass. Every received notification is handed to the inherited {@code message(K)} method.
 * On close the subscription, the listener and the connection are released again.
 *
 * <p>{@link #doOpen()} and {@link #doClose()} are synchronized on this instance.
 *
 * @param <K> key/channel type of the pub/sub connection
 * @param <V> message type of the pub/sub connection
 */
public class RedisClusterKeyspaceNotificationItemReader<K, V> extends AbstractKeyspaceNotificationItemReader<K> {
    private final Log log = LogFactory.getLog(getClass());
    private final Listener listener = new Listener();
    private final Supplier<StatefulRedisClusterPubSubConnection<K, V>> connectionSupplier;

    /** Connection currently in use; {@code null} while the reader is not open. */
    private StatefulRedisClusterPubSubConnection<K, V> connection;

    /**
     * Pub/sub callback that forwards notifications to the enclosing reader.
     *
     * <p>Both {@code message} methods override the corresponding
     * {@link RedisClusterPubSubAdapter} callbacks.
     */
    private class Listener extends RedisClusterPubSubAdapter<K, V> {
        private Listener() {
        }

        /**
         * Channel-subscription callback: forwards the channel the message arrived on.
         *
         * @param node    cluster node the message originated from (unused)
         * @param channel channel the message was published to
         * @param message message payload (unused)
         */
        public void message(RedisClusterNode node, K channel, V message) {
            RedisClusterKeyspaceNotificationItemReader.this.message(channel);
        }

        /**
         * Pattern-subscription callback: forwards the concrete channel, not the matched pattern.
         *
         * @param node    cluster node the message originated from (unused)
         * @param pattern subscription pattern that matched (unused)
         * @param channel channel the message was published to
         * @param message message payload (unused)
         */
        public void message(RedisClusterNode node, K pattern, K channel, V message) {
            RedisClusterKeyspaceNotificationItemReader.this.message(channel);
        }
    }

    /**
     * Creates a reader that lazily obtains its pub/sub connection when opened.
     *
     * @param connectionSupplier source of the cluster pub/sub connection; must not be {@code null}
     * @param keyConverter       converter passed through to the superclass
     * @param patterns           subscription patterns passed through to the superclass
     * @throws IllegalArgumentException if {@code connectionSupplier} is {@code null}
     */
    public RedisClusterKeyspaceNotificationItemReader(Supplier<StatefulRedisClusterPubSubConnection<K, V>> connectionSupplier, Converter<K, K> keyConverter, K[] patterns) {
        super(keyConverter, patterns);
        Assert.notNull(connectionSupplier, "A pub/sub connection supplier is required");
        this.connectionSupplier = connectionSupplier;
    }

    /**
     * Obtains a connection, registers the listener and pattern-subscribes on the upstream nodes.
     *
     * <p>If any step after obtaining the connection fails, the connection is released again
     * before the failure is rethrown, so a failed open does not leak it.
     */
    @Override // com.redis.spring.batch.reader.AbstractKeyspaceNotificationItemReader
    protected synchronized void doOpen() {
        StatefulRedisClusterPubSubConnection<K, V> opened = this.connectionSupplier.get();
        this.connection = opened;
        try {
            this.log.debug("Adding pub/sub listener");
            opened.addListener(this.listener);
            // Have messages received on individual node connections delivered to this
            // connection's listeners; without it the listener above would not see them.
            opened.setNodeMessagePropagation(true);
            this.log.debug("Subscribing to keyspace notifications");
            upstreamCommands(opened).psubscribe(this.patterns);
        } catch (RuntimeException | Error e) {
            this.connection = null;
            releaseAfterFailure(opened, e);
            throw e;
        }
    }

    /**
     * Unsubscribes, removes the listener and closes the connection; a no-op when not open.
     *
     * <p>The listener is removed and the connection closed even if unsubscribing fails; in that
     * case the unsubscribe failure is rethrown with any cleanup failure attached as suppressed.
     */
    @Override // com.redis.spring.batch.reader.AbstractKeyspaceNotificationItemReader
    protected synchronized void doClose() {
        StatefulRedisClusterPubSubConnection<K, V> current = this.connection;
        if (current == null) {
            return;
        }
        // Drop the reference up front: whatever happens below, this connection is being
        // released and must not be reused or released a second time.
        this.connection = null;
        try {
            this.log.debug("Unsubscribing from keyspace notifications");
            upstreamCommands(current).punsubscribe(this.patterns);
        } catch (RuntimeException | Error e) {
            releaseAfterFailure(current, e);
            throw e;
        }
        release(current);
    }

    /** Returns the typed pub/sub command facade for the upstream node selection of {@code conn}. */
    private NodeSelectionPubSubCommands<K, V> upstreamCommands(StatefulRedisClusterPubSubConnection<K, V> conn) {
        return conn.sync().upstream().commands();
    }

    /** Removes the listener from {@code conn} and closes it; the close runs even if removal fails. */
    private void release(StatefulRedisClusterPubSubConnection<K, V> conn) {
        if (conn == null) {
            return;
        }
        try {
            this.log.debug("Removing pub/sub listener");
            conn.removeListener(this.listener);
        } finally {
            conn.close();
        }
    }

    /** Releases {@code conn} while keeping {@code primary} as the failure that gets reported. */
    private void releaseAfterFailure(StatefulRedisClusterPubSubConnection<K, V> conn, Throwable primary) {
        try {
            release(conn);
        } catch (RuntimeException | Error cleanupFailure) {
            primary.addSuppressed(cleanupFailure);
        }
    }
}
