package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.StreamReaderOptions;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader.class */
/**
 * Reads messages from a single Redis stream on behalf of one consumer of a consumer group.
 *
 * <p>Connections are borrowed from the supplied pool for the duration of each Redis call and
 * closed afterwards. Messages are fetched in batches with {@code XREADGROUP} and handed out one
 * at a time through {@link #poll(long, TimeUnit)}; when the configured ack policy is
 * {@code AUTO} every fetched batch is acknowledged immediately after it is read.
 *
 * <p>NOTE(review): only {@link #open(ExecutionContext)} synchronizes (on the pool); the batch
 * iterator used by {@code poll} is plain mutable state, so concurrent polling looks unsupported —
 * confirm against callers.
 *
 * @param <K> stream key type
 * @param <V> message field value type
 */
public class StreamItemReader<K, V> implements PollableItemReader<StreamMessage<K, V>> {
    private final GenericObjectPool<StatefulConnection<K, V>> pool;
    private final K stream;
    private final Consumer<K> consumer;
    private final StreamReaderOptions options;
    /** Remaining messages of the most recently fetched batch. */
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();
    private boolean open;

    /**
     * Creates a reader for the given stream and consumer.
     *
     * @param genericObjectPool pool the reader borrows connections from; must not be {@code null}
     * @param k key of the stream to read
     * @param consumer consumer (group and name) used for group creation, reads and acks
     * @param streamReaderOptions supplies the initial group offset, batch count, block duration
     *     and ack policy
     * @throws IllegalArgumentException if {@code genericObjectPool} is {@code null}
     */
    public StreamItemReader(GenericObjectPool<StatefulConnection<K, V>> genericObjectPool, K k, Consumer<K> consumer, StreamReaderOptions streamReaderOptions) {
        Assert.notNull(genericObjectPool, "A connection pool is required");
        this.pool = genericObjectPool;
        this.stream = k;
        this.consumer = consumer;
        this.options = streamReaderOptions;
    }

    /**
     * Creates the consumer group (and the stream itself, via {@code MKSTREAM}) at the configured
     * offset and marks the reader as open. A group that already exists is not treated as an
     * error.
     *
     * @param executionContext not used
     * @throws ItemStreamException if a connection cannot be borrowed or group creation fails for
     *     any reason other than the group already existing
     */
    public void open(ExecutionContext executionContext) {
        synchronized (this.pool) {
            try (StatefulConnection<K, V> connection = this.pool.borrowObject()) {
                createConsumerGroup(streamCommands(connection));
                // Reached both when the group was just created and when it already existed.
                this.open = true;
            } catch (Exception e) {
                throw new ItemStreamException("Failed to initialize the reader", e);
            }
        }
    }

    /** Issues {@code XGROUP CREATE ... MKSTREAM}, tolerating a group that is already present. */
    private void createConsumerGroup(RedisStreamCommands<K, V> commands) {
        XReadArgs.StreamOffset<K> offset = XReadArgs.StreamOffset.from(this.stream, this.options.getOffset());
        try {
            commands.xgroupCreate(offset, this.consumer.getGroup(), XGroupCreateArgs.Builder.mkstream(true));
        } catch (RedisBusyException ignored) {
            // Presumably the BUSYGROUP reply ("Consumer Group name already exists"): the group is
            // already there, which is exactly the state we want, so carry on.
        }
    }

    /**
     * Returns the synchronous stream command API for a borrowed connection.
     *
     * <p>The cast is kept explicit (as in the original call sites) because the declared return
     * type of {@code Utils.sync} is not visible from this file.
     */
    @SuppressWarnings("unchecked")
    private RedisStreamCommands<K, V> streamCommands(StatefulConnection<K, V> connection) {
        return (RedisStreamCommands<K, V>) Utils.sync(connection);
    }

    /**
     * @return {@code true} once {@link #open(ExecutionContext)} has completed successfully and
     *     until {@link #close()} is called
     */
    public boolean isOpen() {
        return this.open;
    }

    /** No state is saved to the execution context; this method does nothing. */
    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    /**
     * Not supported: items are obtained through {@link #poll(long, TimeUnit)} instead.
     *
     * @return never returns normally
     * @throws IllegalAccessException always
     */
    @Override
    public StreamMessage<K, V> read() throws Exception {
        throw new IllegalAccessException("read() method is not supposed to be called");
    }

    /**
     * Alias retained for code that references the decompiler-assigned name of {@link #read()}.
     *
     * @return never returns normally
     * @throws IllegalAccessException always
     * @deprecated use {@link #read()}
     */
    @Deprecated
    public StreamMessage<K, V> m20read() throws Exception {
        return read();
    }

    /**
     * Returns the next message, fetching a new batch from Redis when the current one is exhausted.
     *
     * @param j maximum time to block waiting for new messages when a fetch is needed
     * @param timeUnit unit of {@code j}
     * @return the next message, or {@code null} if a fetch was needed and produced no messages
     * @throws Exception if borrowing a connection or the Redis call fails
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public StreamMessage<K, V> poll(long j, TimeUnit timeUnit) throws Exception {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<K, V>> batch = readMessages(timeUnit.toMillis(j));
            if (batch == null || batch.isEmpty()) {
                return null;
            }
            this.iterator = batch.iterator();
        }
        return this.iterator.next();
    }

    /**
     * Fetches one batch of messages, blocking for the duration configured in the reader options.
     *
     * @return the messages read
     * @throws Exception if borrowing a connection or the Redis call fails
     */
    public List<StreamMessage<K, V>> readMessages() throws Exception {
        return readMessages(this.options.getBlock().toMillis());
    }

    /**
     * Fetches up to the configured count of not-yet-delivered messages for this consumer and, under
     * the {@code AUTO} ack policy, acknowledges them on the same connection.
     *
     * @param j block timeout in milliseconds
     */
    @SuppressWarnings("unchecked") // generic varargs array created for xreadgroup's StreamOffset<K>...
    private List<StreamMessage<K, V>> readMessages(long j) throws Exception {
        XReadArgs args = XReadArgs.Builder.count(this.options.getCount()).block(j);
        try (StatefulConnection<K, V> connection = this.pool.borrowObject()) {
            RedisStreamCommands<K, V> commands = streamCommands(connection);
            List<StreamMessage<K, V>> messages = commands.xreadgroup(this.consumer, args, XReadArgs.StreamOffset.lastConsumed(this.stream));
            // poll() tolerates a null result, so the auto-ack path must tolerate it as well.
            if (messages != null && this.options.getAckPolicy() == StreamReaderOptions.AckPolicy.AUTO) {
                ack(messages, commands);
            }
            return messages;
        }
    }

    /**
     * Acknowledges the given messages for this reader's consumer group.
     *
     * @param list messages to acknowledge; nothing is sent when {@code null} or empty
     * @throws Exception if borrowing a connection or the Redis call fails
     */
    public void ack(List<? extends StreamMessage<K, V>> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return;
        }
        try (StatefulConnection<K, V> connection = this.pool.borrowObject()) {
            ack(list, streamCommands(connection));
        }
    }

    /** Sends one {@code XACK} per stream key, carrying the ids of all messages from that stream. */
    private void ack(List<? extends StreamMessage<K, V>> list, RedisStreamCommands<K, V> redisStreamCommands) {
        Map<K, List<String>> idsByStream = list.stream().collect(
                Collectors.groupingBy(StreamMessage::getStream, Collectors.mapping(StreamMessage::getId, Collectors.toList())));
        for (Map.Entry<K, List<String>> entry : idsByStream.entrySet()) {
            redisStreamCommands.xack(entry.getKey(), this.consumer.getGroup(), entry.getValue().toArray(new String[0]));
        }
    }

    /** Marks the reader as closed; no connection or pool resources are released here. */
    public void close() throws ItemStreamException {
        this.open = false;
    }
}
