package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.SetBlockingQueue;
import com.redis.spring.batch.util.CodecUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Metrics;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader.class */
public class KeyspaceNotificationItemReader<K> extends AbstractItemStreamItemReader<K> implements KeyItemReader<K>, PollableItemReader<K> {
    public static final String MATCH_ALL = "*";
    public static final String PUBSUB_PATTERN_FORMAT = "__keyspace@%s__:%s";
    public static final String QUEUE_METER = "redis.batch.notification.queue.size";
    public static final String QUEUE_MISS_COUNTER = "redis.batch.notification.queue.misses";
    private final AbstractRedisClient client;
    private final Function<String, K> stringKeyEncoder;
    private int database;
    private String keyPattern;
    private DataType keyType;
    private BlockingQueue<KeyspaceNotification> queue;
    private Counter queueMissCounter;
    private KeyspaceNotificationPublisher notificationPublisher;
    private String name;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    public static final OrderingStrategy DEFAULT_ORDERING = OrderingStrategy.PRIORITY;
    private static final KeyspaceNotificationComparator NOTIFICATION_COMPARATOR = new KeyspaceNotificationComparator();
    private final Log log = LogFactory.getLog(getClass());
    private OrderingStrategy orderingStrategy = DEFAULT_ORDERING;
    private int queueCapacity = 10000;
    private Duration pollTimeout = DEFAULT_POLL_TIMEOUT;

    /* loaded from: input_file:com/redis/spring/batch/reader/KeyspaceNotificationItemReader$OrderingStrategy.class */
    public enum OrderingStrategy {
        FIFO,
        PRIORITY
    }

    public KeyspaceNotificationItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, ?> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.stringKeyEncoder = CodecUtils.stringKeyFunction(redisCodec);
    }

    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    public BlockingQueue<KeyspaceNotification> getQueue() {
        return this.queue;
    }

    public void setDatabase(int i) {
        this.database = i;
    }

    public void setKeyPattern(String str) {
        this.keyPattern = str;
    }

    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public void setKeyType(DataType dataType) {
        this.keyType = dataType;
    }

    public void setOrderingStrategy(OrderingStrategy orderingStrategy) {
        this.orderingStrategy = orderingStrategy;
    }

    public synchronized void open(ExecutionContext executionContext) throws ItemStreamException {
        if (this.notificationPublisher == null) {
            this.log.debug(String.format("Opening %s", this.name));
            this.queue = new SetBlockingQueue(notificationQueue(), this.queueCapacity);
            Metrics.globalRegistry.gaugeCollectionSize(QUEUE_METER, Collections.emptyList(), this.queue);
            this.queueMissCounter = Metrics.globalRegistry.counter(QUEUE_MISS_COUNTER, new String[0]);
            this.notificationPublisher = publisher();
            this.log.debug(String.format("Opened %s", this.name));
        }
    }

    private KeyspaceNotificationPublisher publisher() {
        AbstractKeyspaceNotificationPublisher publisher = publisher(String.format(PUBSUB_PATTERN_FORMAT, Integer.valueOf(this.database), keyPattern()));
        publisher.addConsumer(this::acceptKeyspaceNotification);
        publisher.open();
        return publisher;
    }

    private void acceptKeyspaceNotification(KeyspaceNotification keyspaceNotification) {
        if ((this.keyType == null || keyspaceNotification.getEvent().getType() == this.keyType) && this.queue.remainingCapacity() > 0 && !this.queue.offer(keyspaceNotification)) {
            this.queueMissCounter.increment();
        }
    }

    private AbstractKeyspaceNotificationPublisher publisher(String str) {
        return this.client instanceof RedisClusterClient ? new RedisClusterKeyspaceNotificationPublisher(this.client, str) : new RedisKeyspaceNotificationPublisher(this.client, str);
    }

    private String keyPattern() {
        return this.keyPattern == null ? MATCH_ALL : this.keyPattern;
    }

    private BlockingQueue<KeyspaceNotification> notificationQueue() {
        return this.orderingStrategy == OrderingStrategy.PRIORITY ? new PriorityBlockingQueue(this.queueCapacity, NOTIFICATION_COMPARATOR) : new LinkedBlockingQueue(this.queueCapacity);
    }

    @Override // com.redis.spring.batch.reader.KeyItemReader
    public boolean isOpen() {
        return this.notificationPublisher != null;
    }

    public synchronized void close() {
        if (this.notificationPublisher != null) {
            this.log.debug(String.format("Closing %s", this.name));
            this.notificationPublisher.close();
            this.notificationPublisher = null;
            this.log.debug(String.format("Closed %s", this.name));
        }
        super.close();
    }

    public K read() throws Exception {
        return poll(this.pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public K poll(long j, TimeUnit timeUnit) throws InterruptedException {
        KeyspaceNotification poll = this.queue.poll(j, timeUnit);
        if (poll == null) {
            return null;
        }
        return this.stringKeyEncoder.apply(poll.getKey());
    }
}
