package com.redis.spring.batch.reader;

import com.redis.spring.batch.support.ConnectionPoolItemStream;
import com.redis.spring.batch.support.Utils;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader.class */
/**
 * Item reader that consumes messages from a Redis stream through a consumer group.
 *
 * <p>On {@link #open(ExecutionContext)} the reader issues an {@code XGROUP CREATE} (with
 * {@code MKSTREAM}) for the configured stream offset and consumer group. Messages are then
 * fetched in batches with {@code XREADGROUP} and handed out one at a time by
 * {@link #poll(long, TimeUnit)}. Depending on the {@link AckPolicy}, fetched messages are
 * acknowledged right after being read or left for the caller to acknowledge via
 * {@link #ack(List)}.
 *
 * <p>The batch iterator is plain mutable state with no synchronization of its own.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 */
public class StreamItemReader<K, V> extends ConnectionPoolItemStream<K, V> implements PollableItemReader<StreamMessage<K, V>> {

    private static final Logger log = LoggerFactory.getLogger(StreamItemReader.class);

    /** Default maximum number of messages requested per {@code XREADGROUP} call. */
    public static final long DEFAULT_COUNT = 50;

    /** Default block duration used by {@link #readMessages()}. */
    public static final Duration DEFAULT_BLOCK = Duration.ofMillis(100);

    /** Default acknowledgment policy. */
    public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.AUTO;

    /** Controls who acknowledges messages that have been read from the stream. */
    public enum AckPolicy {
        /** The reader acknowledges every batch immediately after reading it. */
        AUTO,
        /** The reader does not acknowledge; callers are expected to use {@link StreamItemReader#ack(List)}. */
        MANUAL
    }

    private final Function<StatefulConnection<K, V>, BaseRedisCommands<K, V>> sync;
    private final XReadArgs.StreamOffset<K> offset;
    private long count = DEFAULT_COUNT;
    private Duration block = DEFAULT_BLOCK;
    private K consumerGroup;
    private K consumer;
    private AckPolicy ackPolicy = DEFAULT_ACK_POLICY;
    /** Messages of the most recently fetched batch that {@link #poll} has not handed out yet. */
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();

    /**
     * Creates a reader for the given stream offset.
     *
     * @param connectionSupplier supplies connections for the pool managed by the superclass
     * @param poolConfig         connection pool configuration, passed to the superclass
     * @param sync               maps a connection to its synchronous command API; must not be null
     * @param offset             stream (and offset) to create the consumer group on and read from;
     *                           must not be null
     */
    public StreamItemReader(Supplier<StatefulConnection<K, V>> connectionSupplier, GenericObjectPoolConfig<StatefulConnection<K, V>> poolConfig, Function<StatefulConnection<K, V>, BaseRedisCommands<K, V>> sync, XReadArgs.StreamOffset<K> offset) {
        super(connectionSupplier, poolConfig);
        Assert.notNull(sync, "A command provider is required");
        Assert.notNull(offset, "A stream offset is required");
        this.sync = sync;
        this.offset = offset;
    }

    /**
     * Sets the consumer name used when reading from the group.
     *
     * @param consumer consumer name
     */
    public void setConsumer(K consumer) {
        this.consumer = consumer;
    }

    /**
     * Sets the consumer group that is created on open and used for reads and acknowledgments.
     *
     * @param consumerGroup consumer group name
     */
    public void setConsumerGroup(K consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    /**
     * Sets the maximum number of messages requested per read.
     *
     * @param count value passed as the {@code COUNT} argument of each read
     */
    public void setCount(long count) {
        this.count = count;
    }

    /**
     * Sets the acknowledgment policy.
     *
     * @param ackPolicy policy to apply after each read
     */
    public void setAckPolicy(AckPolicy ackPolicy) {
        this.ackPolicy = ackPolicy;
    }

    /**
     * Sets how long {@link #readMessages()} blocks waiting for messages.
     *
     * @param duration block duration, validated with {@code Utils.assertPositive}
     */
    public void setBlock(Duration duration) {
        Utils.assertPositive(duration, "Block");
        // Reject null here, as before, instead of failing later inside readMessages().
        this.block = Objects.requireNonNull(duration);
    }

    /**
     * Opens the underlying stream and creates the consumer group (creating the stream too if
     * needed).
     *
     * @param executionContext execution context forwarded to the superclass
     * @throws ItemStreamException if the consumer group cannot be created for any reason other
     *                             than a {@link RedisBusyException}
     */
    @Override
    public void open(ExecutionContext executionContext) {
        super.open(executionContext);
        // NOTE(review): the lock on the command function is kept from the original code;
        // presumably it serializes group creation across readers sharing it - confirm.
        synchronized (this.sync) {
            try (StatefulConnection<K, V> connection = borrowConnection()) {
                streamCommands(connection).xgroupCreate(this.offset, this.consumerGroup, XGroupCreateArgs.Builder.mkstream(true));
            } catch (RedisBusyException ignored) {
                // Deliberately ignored. Presumably this is the BUSYGROUP reply Redis sends when
                // the consumer group already exists - confirm. It must be caught before the
                // generic handler below, otherwise it would be wrapped and rethrown.
                log.debug("Consumer group creation was rejected as busy, continuing");
            } catch (Exception e) {
                throw new ItemStreamException("Failed to initialize the reader", e);
            }
        }
    }

    /**
     * Always fails: this reader is meant to be driven through {@link #poll(long, TimeUnit)}.
     *
     * <p>NOTE(review): the name carries a decompiler-generated prefix (the method was originally
     * {@code read}); it is kept as-is so existing references keep resolving.
     *
     * @return never returns normally
     * @throws Exception always, as an {@link IllegalAccessException}
     */
    public StreamMessage<K, V> m26read() throws Exception {
        throw new IllegalAccessException("read() method is not supposed to be called");
    }

    /**
     * Returns the next message, fetching a new batch when the current one is exhausted.
     *
     * @param timeout how long the fetch may block
     * @param unit    unit of {@code timeout}
     * @return the next message, or {@code null} if the fetch produced no messages
     * @throws Exception if borrowing a connection or reading from Redis fails
     */
    @Override
    public StreamMessage<K, V> poll(long timeout, TimeUnit unit) throws Exception {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<K, V>> messages = readMessages(unit.toMillis(timeout));
            if (messages == null || messages.isEmpty()) {
                return null;
            }
            this.iterator = messages.iterator();
        }
        return this.iterator.next();
    }

    /**
     * Reads one batch of messages, blocking for the configured block duration.
     *
     * @return the messages returned by the read
     * @throws Exception if borrowing a connection or reading from Redis fails
     */
    public List<StreamMessage<K, V>> readMessages() throws Exception {
        return readMessages(this.block.toMillis());
    }

    /**
     * Reads up to {@code count} messages for the configured consumer and, under
     * {@link AckPolicy#AUTO}, acknowledges them on the same connection before returning.
     *
     * @param blockMillis value passed as the {@code BLOCK} argument, in milliseconds
     * @return the messages returned by the read
     * @throws Exception if borrowing a connection, reading or acknowledging fails
     */
    @SuppressWarnings("unchecked") // generic varargs array for the single stream offset
    private List<StreamMessage<K, V>> readMessages(long blockMillis) throws Exception {
        XReadArgs args = XReadArgs.Builder.count(this.count).block(blockMillis);
        try (StatefulConnection<K, V> connection = borrowConnection()) {
            RedisStreamCommands<K, V> commands = streamCommands(connection);
            List<StreamMessage<K, V>> messages = commands.xreadgroup(Consumer.from(this.consumerGroup, this.consumer), args, XReadArgs.StreamOffset.lastConsumed(this.offset.getName()));
            if (this.ackPolicy == AckPolicy.AUTO) {
                ack(messages, commands);
            }
            return messages;
        }
    }

    /**
     * Acknowledges the given messages against the configured consumer group.
     *
     * @param list messages to acknowledge; a {@code null} or empty list is a no-op
     * @throws Exception if borrowing a connection or acknowledging fails
     */
    public void ack(List<? extends StreamMessage<K, V>> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return;
        }
        try (StatefulConnection<K, V> connection = borrowConnection()) {
            ack(list, streamCommands(connection));
        }
    }

    /**
     * Acknowledges messages with one {@code XACK} per stream they belong to.
     *
     * @param messages messages to acknowledge
     * @param commands stream commands to issue the acknowledgments on
     */
    private void ack(List<? extends StreamMessage<K, V>> messages, RedisStreamCommands<K, V> commands) {
        // XACK targets a single stream key, so the ids are grouped per stream first.
        Map<K, List<StreamMessage<K, V>>> messagesByStream = messages.stream().collect(Collectors.groupingBy(StreamMessage::getStream));
        for (Map.Entry<K, List<StreamMessage<K, V>>> entry : messagesByStream.entrySet()) {
            String[] messageIds = entry.getValue().stream().map(StreamMessage::getId).toArray(String[]::new);
            log.debug("Ack'ing message ids: {}", Arrays.asList(messageIds));
            commands.xack(entry.getKey(), this.consumerGroup, messageIds);
        }
    }

    /**
     * Resolves the synchronous command API for a connection and views it as stream commands.
     *
     * @param connection connection to resolve commands for
     * @return the stream command API
     * @throws ClassCastException if the configured command function returns an object that does
     *                            not implement {@link RedisStreamCommands}
     */
    @SuppressWarnings("unchecked") // the command function is typed as BaseRedisCommands only
    private RedisStreamCommands<K, V> streamCommands(StatefulConnection<K, V> connection) {
        return (RedisStreamCommands<K, V>) this.sync.apply(connection);
    }
}
