package com.redis.spring.batch.reader;

import io.lettuce.core.RedisClient;
import io.lettuce.core.pubsub.RedisPubSubAdapter;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.util.function.BiConsumer;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamSupport;

/* loaded from: input_file:com/redis/spring/batch/reader/RedisKeyspaceNotificationPublisher.class */
/**
 * Subscribes to a Redis pub/sub channel pattern and forwards every received message to a
 * {@link BiConsumer}.
 *
 * <p>{@link #open(ExecutionContext)} creates a dedicated pub/sub connection from the supplied
 * {@link RedisClient} and pattern-subscribes to the configured pattern; {@link #close()}
 * unsubscribes and releases that connection. Both methods are {@code synchronized} on this
 * instance, and {@link #isOpen()} may be called without holding the lock.
 */
public class RedisKeyspaceNotificationPublisher extends ItemStreamSupport {
    private final RedisClient client;
    private final String pattern;
    private final BiConsumer<String, String> consumer;
    // volatile because isOpen() reads this field without synchronization.
    private volatile StatefulRedisPubSubConnection<String, String> connection;
    private Listener<String, String> listener;

    /**
     * Pub/sub listener that hands each received message to a {@link BiConsumer} as a
     * (channel, message) pair.
     *
     * @param <K> channel/pattern type
     * @param <V> message type
     */
    public static class Listener<K, V> extends RedisPubSubAdapter<K, V> {
        private final BiConsumer<K, V> consumer;

        /**
         * @param consumer callback invoked with the channel and the message payload
         */
        public Listener(BiConsumer<K, V> consumer) {
            this.consumer = consumer;
        }

        /**
         * Forwards a message received through a channel subscription.
         *
         * @param channel channel the message was published on
         * @param message message payload
         */
        public void message(K channel, V message) {
            this.consumer.accept(channel, message);
        }

        /**
         * Forwards a message received through a pattern subscription. The matching pattern
         * itself is not passed on; only the concrete channel and the payload are.
         *
         * @param pattern pattern that matched the channel (ignored)
         * @param channel channel the message was published on
         * @param message message payload
         */
        public void message(K pattern, K channel, V message) {
            this.consumer.accept(channel, message);
        }
    }

    /**
     * Creates a publisher; no connection is made until {@link #open(ExecutionContext)}.
     *
     * @param client   Redis client used to create the pub/sub connection
     * @param pattern  channel pattern passed to {@code PSUBSCRIBE}
     * @param consumer callback receiving (channel, message) for every notification
     */
    public RedisKeyspaceNotificationPublisher(RedisClient client, String pattern, BiConsumer<String, String> consumer) {
        this.client = client;
        this.pattern = pattern;
        this.consumer = consumer;
    }

    /**
     * Opens the pub/sub connection and subscribes to the pattern. Does nothing beyond
     * delegating to the superclass if the publisher is already open.
     *
     * @param executionContext execution context passed through to the superclass
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (isOpen()) {
            return;
        }
        doOpen();
    }

    /**
     * Connects, registers the listener and subscribes. State is published to the fields only
     * once everything succeeded, so a failed attempt leaves the publisher closed and can be
     * retried.
     */
    private void doOpen() {
        StatefulRedisPubSubConnection<String, String> newConnection = this.client.connectPubSub();
        Listener<String, String> newListener = new Listener<>(this.consumer);
        try {
            // Register the listener before subscribing so that no message delivered right
            // after the subscription takes effect is missed.
            newConnection.addListener(newListener);
            newConnection.sync().psubscribe(this.pattern);
        } catch (RuntimeException e) {
            // Do not leak the connection when the subscription could not be established.
            try {
                newConnection.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.listener = newListener;
        this.connection = newConnection;
    }

    /**
     * @return {@code true} while a pub/sub connection is held, i.e. between a successful
     *         {@link #open(ExecutionContext)} and the next {@link #close()}
     */
    public boolean isOpen() {
        return this.connection != null;
    }

    /**
     * Unsubscribes and closes the pub/sub connection if one is open, then delegates to the
     * superclass. The superclass is notified even if releasing the connection fails.
     */
    public synchronized void close() {
        try {
            if (isOpen()) {
                doClose();
            }
        } finally {
            super.close();
        }
    }

    /**
     * Unsubscribes, removes the listener and closes the connection. The fields are cleared
     * up front and the connection is closed in a {@code finally} block, so a failing
     * unsubscribe can neither leak the connection nor leave the publisher reported as open.
     */
    private void doClose() {
        StatefulRedisPubSubConnection<String, String> current = this.connection;
        Listener<String, String> registered = this.listener;
        this.connection = null;
        this.listener = null;
        try {
            current.sync().punsubscribe(this.pattern);
            current.removeListener(registered);
        } finally {
            current.close();
        }
    }
}
