package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.OrPredicate;
import com.redis.spring.batch.common.Utils;
import io.micrometer.core.instrument.Tag;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.core.convert.converter.Converter;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader.class */
/**
 * Item reader that buffers keys received through keyspace notifications in a bounded
 * {@link BlockingQueue} and hands them out via {@link #read()} / {@link #poll(long, TimeUnit)}.
 *
 * <p>{@link #notification(Object)} is the producer side (invoked by the publisher this reader
 * registers itself with), while {@code read}/{@code poll} are the consumer side. The queue only
 * exists between {@link #open(ExecutionContext)} and {@link #close()}.
 *
 * @param <K> key type
 */
public class KeyspaceNotificationItemReader<K> extends ItemStreamSupport implements PollableItemReader<K>, KeyspaceNotificationListener<K> {
    public static final String QUEUE_SIZE_GAUGE_NAME = "reader.notification.queue.size";
    private final KeyspaceNotificationPublisher<K> publisher;
    private final Converter<K, K> keyExtractor;
    protected final K[] patterns;
    private final QueueOptions queueOptions;
    // Written under the monitor in open()/close() but read without it by isOpen(),
    // notification() and poll(); volatile makes those reads see the latest value.
    private volatile boolean open;
    private volatile BlockingQueue<K> queue;
    private final Log log = LogFactory.getLog(getClass());
    // Keys for which this predicate returns true are dropped; by default only null keys are.
    private Predicate<K> filter = Objects::isNull;

    /**
     * Creates a reader; nothing is subscribed until {@link #open(ExecutionContext)} is called.
     *
     * @param publisher    source of notifications this reader registers with and subscribes to
     * @param keyExtractor converter applied to each incoming notification before filtering and queueing
     * @param patterns     patterns passed to the publisher on subscribe/unsubscribe; must not be null
     * @param queueOptions supplies the queue capacity and the poll timeout used by {@link #read()}
     */
    public KeyspaceNotificationItemReader(KeyspaceNotificationPublisher<K> publisher, Converter<K, K> keyExtractor, K[] patterns, QueueOptions queueOptions) {
        Assert.notNull(patterns, "Patterns must not be null");
        setName(ClassUtils.getShortName(getClass()));
        this.publisher = publisher;
        this.keyExtractor = keyExtractor;
        this.patterns = patterns;
        this.queueOptions = queueOptions;
    }

    /**
     * Combines the given predicate with the current filter via {@link OrPredicate#of}.
     * The existing filter is kept, so repeated calls accumulate predicates rather than replace them.
     * Keys matching the resulting filter are discarded by {@link #notification(Object)}.
     *
     * @param predicate additional exclusion predicate
     */
    public void setFilter(Predicate<K> predicate) {
        // NOTE(review): presumably OrPredicate.of yields a logical OR of both predicates - confirm.
        this.filter = OrPredicate.of(this.filter, predicate);
    }

    /**
     * Handles one incoming notification: extracts the key, drops it if the filter matches,
     * removes any equal key already waiting in the queue, then offers the key to the queue.
     * A warning is logged when the queue rejects the key. Notifications that arrive while the
     * reader has no queue (before {@code open()} or after {@code close()}) are ignored.
     *
     * @param k raw notification; ignored when null
     */
    @Override // com.redis.spring.batch.reader.KeyspaceNotificationListener
    public void notification(K k) {
        if (k == null) {
            return;
        }
        // Read the field once: close() may null it concurrently, and this listener is not
        // removed from the publisher on close(), so late notifications must not blow up.
        BlockingQueue<K> target = this.queue;
        if (target == null) {
            return;
        }
        K key = this.keyExtractor.convert(k);
        if (this.filter.test(key)) {
            return;
        }
        // De-duplicate: an equal key already queued is removed so the new one goes to the tail.
        target.removeIf(queued -> queued.equals(key));
        if (!target.offer(key)) {
            this.log.warn("Could not add key because queue is full");
        }
    }

    /**
     * Polls the queue using the timeout configured in the queue options.
     *
     * @return the next key, or null if none became available within the timeout
     * @throws Exception if the underlying poll fails (e.g. the thread is interrupted)
     */
    public K read() throws Exception {
        return poll(this.queueOptions.getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the queue, registers a size gauge for it, registers this reader as a listener on
     * the publisher and subscribes to the configured patterns. Does nothing if a queue already exists.
     *
     * @param executionContext not used by this implementation
     */
    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.queue != null) {
            return;
        }
        BlockingQueue<K> created = new LinkedBlockingQueue<>(this.queueOptions.getCapacity());
        // Publish the queue before registering the listener so notification() can use it right away.
        this.queue = created;
        Utils.createGaugeCollectionSize(QUEUE_SIZE_GAUGE_NAME, created, new Tag[0]);
        this.publisher.addListener(this);
        this.publisher.subscribe(this.patterns);
        this.open = true;
    }

    /**
     * Waits up to the given time for a key to become available.
     * Must only be called while the reader is open; otherwise there is no queue to poll
     * and a {@link NullPointerException} results.
     *
     * @param timeout maximum time to wait
     * @param unit    unit of {@code timeout}
     * @return the next key, or null if the timeout elapsed first
     * @throws InterruptedException if interrupted while waiting
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.queue.poll(timeout, unit);
    }

    /**
     * Unsubscribes from the configured patterns and discards the queue, logging a warning if
     * keys are still pending. Does nothing if the reader has no queue.
     */
    public synchronized void close() throws ItemStreamException {
        BlockingQueue<K> current = this.queue;
        if (current == null) {
            return;
        }
        if (!current.isEmpty()) {
            this.log.warn("Closing with items still in queue");
        }
        this.publisher.unsubscribe(this.patterns);
        // NOTE(review): the listener added in open() is not removed here; verify whether the
        // publisher de-duplicates listeners when the reader is reopened.
        this.queue = null;
        this.open = false;
    }

    /**
     * @return true between a successful {@code open()} and the following {@code close()}
     */
    public boolean isOpen() {
        return this.open;
    }
}
