package org.springframework.batch.item.redis;

import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.RedisClient;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.BaseRedisCommands;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.redis.support.CommandBuilder;
import org.springframework.batch.item.redis.support.ConnectionPoolItemStream;
import org.springframework.batch.item.redis.support.PollableItemReader;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader.class */
public class StreamItemReader extends ConnectionPoolItemStream<String, String> implements PollableItemReader<StreamMessage<String, String>> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(StreamItemReader.class);
    private final Function<StatefulConnection<String, String>, BaseRedisCommands<String, String>> sync;
    private final Long count;
    private final Duration block;
    private final XReadArgs.StreamOffset<String> offset;
    private final String consumerGroup;
    private final String consumer;
    private final AckPolicy ackPolicy;
    private Iterator<StreamMessage<String, String>> iterator;

    /* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader$AckPolicy.class */
    public enum AckPolicy {
        AUTO,
        MANUAL
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader$RedisClientStreamItemReaderBuilder.class */
    public static class RedisClientStreamItemReaderBuilder {
        private final RedisClient client;

        public RedisClientStreamItemReaderBuilder(RedisClient redisClient) {
            this.client = redisClient;
        }

        public StreamItemReaderBuilder offset(XReadArgs.StreamOffset<String> streamOffset) {
            return new StreamItemReaderBuilder(this.client, streamOffset);
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader$RedisClusterClientStreamItemReaderBuilder.class */
    public static class RedisClusterClientStreamItemReaderBuilder {
        private final RedisClusterClient client;

        public RedisClusterClientStreamItemReaderBuilder(RedisClusterClient redisClusterClient) {
            this.client = redisClusterClient;
        }

        public StreamItemReaderBuilder offset(XReadArgs.StreamOffset<String> streamOffset) {
            return new StreamItemReaderBuilder(this.client, streamOffset);
        }
    }

    /* loaded from: input_file:org/springframework/batch/item/redis/StreamItemReader$StreamItemReaderBuilder.class */
    public static class StreamItemReaderBuilder extends CommandBuilder<String, String, StreamItemReaderBuilder> {
        public static final long DEFAULT_COUNT = 50;
        public static final String DEFAULT_CONSUMER = "consumer1";
        private final XReadArgs.StreamOffset<String> offset;
        private Duration block;
        private Long count;
        private String consumerGroup;
        private String consumer;
        private AckPolicy ackPolicy;
        public static final Duration DEFAULT_BLOCK = Duration.ofMillis(100);
        public static final String DEFAULT_CONSUMER_GROUP = ClassUtils.getShortName(StreamItemReader.class);
        public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.AUTO;

        public StreamItemReaderBuilder(RedisClient redisClient, XReadArgs.StreamOffset<String> streamOffset) {
            super(redisClient, (RedisCodec) StringCodec.UTF8);
            this.block = DEFAULT_BLOCK;
            this.count = 50L;
            this.consumerGroup = DEFAULT_CONSUMER_GROUP;
            this.consumer = DEFAULT_CONSUMER;
            this.ackPolicy = DEFAULT_ACK_POLICY;
            this.offset = streamOffset;
        }

        public StreamItemReaderBuilder(RedisClusterClient redisClusterClient, XReadArgs.StreamOffset<String> streamOffset) {
            super(redisClusterClient, (RedisCodec) StringCodec.UTF8);
            this.block = DEFAULT_BLOCK;
            this.count = 50L;
            this.consumerGroup = DEFAULT_CONSUMER_GROUP;
            this.consumer = DEFAULT_CONSUMER;
            this.ackPolicy = DEFAULT_ACK_POLICY;
            this.offset = streamOffset;
        }

        public StreamItemReader build() {
            return new StreamItemReader(this.connectionSupplier, this.poolConfig, this.sync, this.count, this.block, this.consumerGroup, this.consumer, this.offset, this.ackPolicy);
        }

        @Generated
        public StreamItemReaderBuilder block(Duration duration) {
            this.block = duration;
            return this;
        }

        @Generated
        public StreamItemReaderBuilder count(Long l) {
            this.count = l;
            return this;
        }

        @Generated
        public StreamItemReaderBuilder consumerGroup(String str) {
            this.consumerGroup = str;
            return this;
        }

        @Generated
        public StreamItemReaderBuilder consumer(String str) {
            this.consumer = str;
            return this;
        }

        @Generated
        public StreamItemReaderBuilder ackPolicy(AckPolicy ackPolicy) {
            this.ackPolicy = ackPolicy;
            return this;
        }
    }

    public StreamItemReader(Supplier<StatefulConnection<String, String>> supplier, GenericObjectPoolConfig<StatefulConnection<String, String>> genericObjectPoolConfig, Function<StatefulConnection<String, String>, BaseRedisCommands<String, String>> function, Long l, Duration duration, String str, String str2, XReadArgs.StreamOffset<String> streamOffset, AckPolicy ackPolicy) {
        super(supplier, genericObjectPoolConfig);
        this.iterator = Collections.emptyIterator();
        Assert.notNull(function, "A command provider is required");
        this.sync = function;
        this.count = l;
        this.block = duration;
        this.consumerGroup = str;
        this.consumer = str2;
        this.offset = streamOffset;
        this.ackPolicy = ackPolicy;
    }

    @Override // org.springframework.batch.item.redis.support.ConnectionPoolItemStream
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        try {
            StatefulConnection<String, String> statefulConnection = (StatefulConnection) this.pool.borrowObject();
            try {
                try {
                    this.sync.apply(statefulConnection).xgroupCreate(this.offset, this.consumerGroup, XGroupCreateArgs.Builder.mkstream(true));
                } catch (RedisBusyException e) {
                }
                if (statefulConnection != null) {
                    statefulConnection.close();
                }
            } finally {
            }
        } catch (Exception e2) {
            throw new ItemStreamException("Failed to initialize the reader", e2);
        }
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public StreamMessage<String, String> m1read() throws Exception {
        throw new IllegalAccessException("read() method is not supposed to be called");
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.springframework.batch.item.redis.support.PollableItemReader
    public StreamMessage<String, String> poll(long j, TimeUnit timeUnit) throws Exception {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<String, String>> readMessages = readMessages(Duration.ofMillis(timeUnit.toMillis(j)));
            if (readMessages == null || readMessages.isEmpty()) {
                return null;
            }
            this.iterator = readMessages.iterator();
        }
        return this.iterator.next();
    }

    public List<StreamMessage<String, String>> readMessages() throws Exception {
        return readMessages(this.block);
    }

    private List<StreamMessage<String, String>> readMessages(Duration duration) throws Exception {
        XReadArgs count = XReadArgs.Builder.count(this.count.longValue());
        if (duration != null) {
            count.block(duration);
        }
        StatefulConnection<String, String> statefulConnection = (StatefulConnection) this.pool.borrowObject();
        try {
            List<StreamMessage<String, String>> xreadgroup = this.sync.apply(statefulConnection).xreadgroup(Consumer.from(this.consumerGroup, this.consumer), count, new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.lastConsumed((String) this.offset.getName())});
            if (this.ackPolicy == AckPolicy.AUTO) {
                ack(xreadgroup);
            }
            if (statefulConnection != null) {
                statefulConnection.close();
            }
            return xreadgroup;
        } catch (Throwable th) {
            if (statefulConnection != null) {
                try {
                    statefulConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void ack(List<? extends StreamMessage<String, String>> list) throws Exception {
        if (list.isEmpty()) {
            return;
        }
        StatefulConnection<String, String> statefulConnection = (StatefulConnection) this.pool.borrowObject();
        try {
            RedisStreamCommands apply = this.sync.apply(statefulConnection);
            for (Map.Entry entry : ((Map) list.stream().collect(Collectors.groupingBy((v0) -> {
                return v0.getStream();
            }))).entrySet()) {
                String[] strArr = (String[]) ((List) entry.getValue()).stream().map((v0) -> {
                    return v0.getId();
                }).toArray(i -> {
                    return new String[i];
                });
                log.info("Ack'ing message ids: {}", Arrays.asList(strArr));
                apply.xack((String) entry.getKey(), this.consumerGroup, strArr);
            }
            if (statefulConnection != null) {
                statefulConnection.close();
            }
        } catch (Throwable th) {
            if (statefulConnection != null) {
                try {
                    statefulConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public static RedisClientStreamItemReaderBuilder client(RedisClient redisClient) {
        return new RedisClientStreamItemReaderBuilder(redisClient);
    }

    public static RedisClusterClientStreamItemReaderBuilder client(RedisClusterClient redisClusterClient) {
        return new RedisClusterClientStreamItemReaderBuilder(redisClusterClient);
    }
}
