package org.springframework.batch.item.redis.support;

import com.hybhub.util.concurrent.ConcurrentSetBlockingQueue;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import lombok.Generated;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.redis.support.convert.KeyMaker;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/support/AbstractKeyspaceNotificationItemReader.class */
public abstract class AbstractKeyspaceNotificationItemReader<C extends StatefulRedisPubSubConnection<String, String>> extends ItemStreamSupport implements PollableItemReader<String> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(AbstractKeyspaceNotificationItemReader.class);
    private final Supplier<C> connectionSupplier;
    private final BlockingQueue<String> queue;
    private final List<String> pubSubPatterns;
    private C connection;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyspaceNotificationItemReader(Supplier<C> supplier, List<String> list, int i) {
        this(supplier, list, new ConcurrentSetBlockingQueue(i));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyspaceNotificationItemReader(Supplier<C> supplier, List<String> list, BlockingQueue<String> blockingQueue) {
        setName(ClassUtils.getShortName(getClass()));
        Assert.notNull(supplier, "A pub/sub connection supplier is required");
        Assert.notNull(blockingQueue, "A queue is required");
        Assert.notEmpty(list, "A pub/sub pattern is required");
        this.connectionSupplier = supplier;
        this.queue = blockingQueue;
        this.pubSubPatterns = list;
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public String m5read() throws Exception {
        throw new IllegalAccessException("read() method should not be called");
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.connection == null) {
            MetricsUtils.createGaugeCollectionSize("reader.notification.queue.size", this.queue, new Tag[0]);
            log.debug("Connecting to Redis pub/sub");
            this.connection = this.connectionSupplier.get();
            subscribe(this.connection, this.pubSubPatterns);
        }
    }

    protected abstract void subscribe(C c, List<String> list);

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.springframework.batch.item.redis.support.PollableItemReader
    public String poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public synchronized void close() throws ItemStreamException {
        if (this.connection == null) {
            return;
        }
        unsubscribe(this.connection, this.pubSubPatterns);
        this.connection.close();
        this.connection = null;
    }

    protected abstract void unsubscribe(C c, List<String> list);

    /* JADX INFO: Access modifiers changed from: protected */
    public void add(String str) {
        String substring;
        if (str == null || (substring = str.substring(str.indexOf(KeyMaker.DEFAULT_SEPARATOR) + 1)) == null || this.queue.offer(substring)) {
            return;
        }
        log.debug("Notification queue full for key '{}' (size={})", substring, Integer.valueOf(this.queue.size()));
    }
}
