package com.redis.spring.batch.reader;

import com.redis.spring.batch.support.Utils;
import io.micrometer.core.instrument.Tag;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/AbstractKeyspaceNotificationItemReader.class */
public abstract class AbstractKeyspaceNotificationItemReader<K> extends ItemStreamSupport implements PollableItemReader<K> {
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    private final Converter<K, K> keyExtractor;
    protected final K[] patterns;
    private boolean open;
    private BlockingQueue<K> queue;
    private static final Logger log = LoggerFactory.getLogger(AbstractKeyspaceNotificationItemReader.class);
    public static final Duration DEFAULT_DEFAULT_QUEUE_POLL_TIMEOUT = Duration.ofMillis(100);
    private final Collection<Listener<K>> listeners = new ArrayList();
    private int queueCapacity = 10000;
    private Duration defaultQueuePollTimeout = DEFAULT_DEFAULT_QUEUE_POLL_TIMEOUT;

    /* loaded from: input_file:com/redis/spring/batch/reader/AbstractKeyspaceNotificationItemReader$Listener.class */
    public interface Listener<K> {
        void key(K k);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractKeyspaceNotificationItemReader(Converter<K, K> converter, K[] kArr) {
        Assert.notNull(kArr, "Patterns must not be null");
        setName(ClassUtils.getShortName(getClass()));
        this.keyExtractor = converter;
        this.patterns = kArr;
    }

    public void setQueueCapacity(int i) {
        Utils.assertPositive(Integer.valueOf(i), "Queue capacity");
        this.queueCapacity = i;
    }

    public void setDefaultQueuePollTimeout(Duration duration) {
        Utils.assertPositive(duration, "Default queue poll timeout");
        this.defaultQueuePollTimeout = duration;
    }

    public void addListener(Listener<K> listener) {
        this.listeners.add(listener);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void message(K k) {
        Object convert;
        if (k == null || (convert = this.keyExtractor.convert(k)) == null) {
            return;
        }
        this.listeners.forEach(listener -> {
            listener.key(convert);
        });
        if (this.queue.offer(convert)) {
            return;
        }
        log.warn("Could not add key because queue is full. Queue size: {}", Integer.valueOf(this.queue.size()));
    }

    public K read() throws Exception {
        return poll(this.defaultQueuePollTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.queue == null) {
            this.queue = new LinkedBlockingQueue(this.queueCapacity);
            Utils.createGaugeCollectionSize("reader.notification.queue.size", this.queue, new Tag[0]);
            doOpen();
            this.open = true;
        }
    }

    protected abstract void doOpen();

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public synchronized void close() throws ItemStreamException {
        if (this.queue == null) {
            return;
        }
        if (!this.queue.isEmpty()) {
            log.warn("Closing {} with {} items still in queue", ClassUtils.getShortName(getClass()), Integer.valueOf(this.queue.size()));
        }
        doClose();
        this.queue = null;
        this.open = false;
    }

    public boolean isOpen() {
        return this.open;
    }

    protected abstract void doClose();
}
