package com.redis.spring.batch.reader;

import com.redis.lettucemod.RedisModulesClient;
import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.reader.PollableItemReader;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader.class */
public class StreamItemReader<K, V> extends AbstractItemCountingItemStreamItemReader<StreamMessage<K, V>> implements PollableItemReader<StreamMessage<K, V>> {
    /** Stream offset string {@code "0-0"}. */
    public static final String START_OFFSET = "0-0";
    /** Same value as the offset a freshly created {@link Builder} starts with. */
    public static final String DEFAULT_OFFSET = "0-0";
    /** Same value as the per-read message count a freshly created {@link Builder} starts with. */
    public static final long DEFAULT_COUNT = 50;
    // Client and codec used by doOpen() to obtain the connection.
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    // Key of the stream that is read and acknowledged against.
    private final K stream;
    // Consumer passed to xreadgroup; its group is used for xgroupCreate and xack.
    private final Consumer<K> consumer;
    // Offset used when creating the consumer group; also the initial value of lastId.
    private final String offset;
    // Blocking duration used by readMessages().
    private final Duration block;
    // COUNT argument applied to every read (see args(long)).
    private final long count;
    // Selects which MessageReader implementation reader() creates.
    private final AckPolicy ackPolicy;
    // Assigned in doOpen(), closed in doClose().
    private StatefulRedisModulesConnection<K, V> connection;
    // Messages from the most recent read that poll() has not handed out yet.
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();
    // Current read strategy; the pending-message readers replace it once they return nothing.
    private MessageReader<K, V> reader;
    // Updated by ack(String...) and by ExplicitAckPendingMessageReader.recover(...).
    private String lastId;
    /** Timeout used by the item-reader read path ({@code poll} is called with this duration). */
    public static final Duration DEFAULT_POLL_DURATION = Duration.ofSeconds(1);
    /** Blocking duration a freshly created {@link Builder} starts with. */
    public static final Duration DEFAULT_BLOCK = Duration.ofMillis(100);
    /** Ack policy a freshly created {@link Builder} starts with. */
    public static final AckPolicy DEFAULT_ACK_POLICY = AckPolicy.AUTO;

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$AckPolicy.class */
    /**
     * Controls who acknowledges messages read from the stream.
     */
    public enum AckPolicy {
        /** The reader acknowledges messages itself right after reading them. */
        AUTO,
        /** The reader does not acknowledge freshly read messages; callers use the {@code ack(...)} methods. */
        MANUAL
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$AutoAckMessageReader.class */
    /**
     * Reads new messages like {@link ExplicitAckMessageReader} and acknowledges
     * every returned message before handing the batch back.
     */
    private class AutoAckMessageReader extends StreamItemReader<K, V>.ExplicitAckMessageReader {
        private AutoAckMessageReader() {
        }

        /**
         * @param blockMillis blocking time passed through to the underlying read
         * @return the messages that were read (already acknowledged)
         */
        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) {
            final List<StreamMessage<K, V>> batch = super.read(blockMillis);
            // ack(Iterable) tolerates null and empty batches.
            StreamItemReader.this.ack(batch);
            return batch;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$AutoAckPendingMessageReader.class */
    /**
     * Auto-ack variant of {@link ExplicitAckPendingMessageReader}: everything
     * returned by the pending read is acknowledged and nothing is handed back,
     * and the follow-up reader is an {@link AutoAckMessageReader}.
     */
    public class AutoAckPendingMessageReader extends StreamItemReader<K, V>.ExplicitAckPendingMessageReader {
        private AutoAckPendingMessageReader() {
        }

        /** @return the reader that takes over once the pending read comes back empty */
        @Override
        protected MessageReader<K, V> messageReader() {
            return new AutoAckMessageReader();
        }

        /**
         * Acknowledges all given messages and reports none of them as recovered.
         *
         * @param streamCommands commands used to send the acknowledgement
         * @param pendingMessages messages returned by the pending read
         * @return always an empty list
         */
        @Override
        protected List<StreamMessage<K, V>> recover(RedisStreamCommands<K, V> streamCommands, List<StreamMessage<K, V>> pendingMessages) {
            StreamItemReader.this.ack(streamCommands, pendingMessages);
            return Collections.emptyList();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$Builder.class */
    /**
     * Fluent builder for {@link StreamItemReader}. Starts with a consumer built
     * from {@link #DEFAULT_CONSUMER_GROUP} and {@link #DEFAULT_CONSUMER}, offset
     * {@code "0-0"}, the default block duration, a count of 50 and the default
     * ack policy; each of these can be overridden before calling {@link #build()}.
     */
    public static class Builder<K, V> {
        public static final String DEFAULT_CONSUMER_GROUP = ClassUtils.getShortName(StreamItemReader.class);
        public static final String DEFAULT_CONSUMER = "consumer1";
        private final AbstractRedisClient client;
        private final RedisCodec<K, V> codec;
        private final K stream;
        private Consumer<K> consumer;
        private String offset = StreamItemReader.DEFAULT_OFFSET;
        private Duration block = StreamItemReader.DEFAULT_BLOCK;
        private long count = StreamItemReader.DEFAULT_COUNT;
        private AckPolicy ackPolicy = StreamItemReader.DEFAULT_ACK_POLICY;

        /**
         * @param client Redis client the reader will connect with
         * @param codec  codec for keys and values; also used to derive the default consumer
         * @param stream key of the stream to read
         */
        protected Builder(AbstractRedisClient client, RedisCodec<K, V> codec, K stream) {
            this.client = client;
            this.codec = codec;
            this.stream = stream;
            final K group = key(DEFAULT_CONSUMER_GROUP, codec);
            final K name = key(DEFAULT_CONSUMER, codec);
            this.consumer = Consumer.from(group, name);
        }

        /** Converts a string into the codec's key type by encoding it as UTF-8 and decoding it with the codec. */
        private static <K, V> K key(String text, RedisCodec<K, V> codec) {
            return codec.decodeKey(StringCodec.UTF8.encodeKey(text));
        }

        /** Sets the consumer (group and name) used for reading. */
        public Builder<K, V> consumer(Consumer<K> consumer) {
            this.consumer = consumer;
            return this;
        }

        /** Sets the offset used for the consumer group. */
        public Builder<K, V> offset(String value) {
            this.offset = value;
            return this;
        }

        /** Sets the blocking duration. */
        public Builder<K, V> block(Duration value) {
            this.block = value;
            return this;
        }

        /** Sets the number of messages requested per read. */
        public Builder<K, V> count(long value) {
            this.count = value;
            return this;
        }

        /** Sets the acknowledgement policy. */
        public Builder<K, V> ackPolicy(AckPolicy ackPolicy) {
            this.ackPolicy = ackPolicy;
            return this;
        }

        /** @return a new reader configured from this builder's current state */
        public StreamItemReader<K, V> build() {
            return new StreamItemReader<>(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$ExplicitAckMessageReader.class */
    /**
     * Reads messages for the configured consumer starting at the stream's
     * last-consumed offset, without acknowledging them.
     */
    public class ExplicitAckMessageReader implements MessageReader<K, V> {
        private ExplicitAckMessageReader() {
        }

        /**
         * @param blockMillis blocking time for the read
         * @return whatever {@code xreadgroup} returns for the configured stream
         */
        @Override
        public List<StreamMessage<K, V>> read(long blockMillis) {
            final StreamItemReader<K, V> owner = StreamItemReader.this;
            final RedisStreamCommands<K, V> streamCommands = owner.commands(owner.connection);
            final XReadArgs readArgs = owner.args(blockMillis);
            return streamCommands.xreadgroup(owner.consumer, readArgs, XReadArgs.StreamOffset.lastConsumed(owner.stream));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$ExplicitAckPendingMessageReader.class */
    public class ExplicitAckPendingMessageReader implements MessageReader<K, V> {
        private ExplicitAckPendingMessageReader() {
        }

        protected List<StreamMessage<K, V>> readMessages(RedisStreamCommands<K, V> redisStreamCommands, XReadArgs xReadArgs) {
            return recover(redisStreamCommands, redisStreamCommands.xreadgroup(StreamItemReader.this.consumer, xReadArgs, new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.from(StreamItemReader.this.stream, "0-0")}));
        }

        protected List<StreamMessage<K, V>> recover(RedisStreamCommands<K, V> redisStreamCommands, List<StreamMessage<K, V>> list) {
            if (list.isEmpty()) {
                return list;
            }
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            StreamId parse = StreamId.parse(StreamItemReader.this.lastId);
            for (StreamMessage<K, V> streamMessage : list) {
                if (StreamId.parse(streamMessage.getId()).compareTo(parse) > 0) {
                    arrayList.add(streamMessage);
                    StreamItemReader.this.lastId = streamMessage.getId();
                } else {
                    arrayList2.add(streamMessage);
                }
            }
            StreamItemReader.this.ack(redisStreamCommands, arrayList2);
            return arrayList;
        }

        protected MessageReader<K, V> messageReader() {
            return new ExplicitAckMessageReader();
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.MessageReader
        public List<StreamMessage<K, V>> read(long j) {
            List<StreamMessage<K, V>> readMessages = readMessages(StreamItemReader.this.commands(StreamItemReader.this.connection), StreamItemReader.this.args(j));
            if (!readMessages.isEmpty()) {
                return readMessages;
            }
            StreamItemReader.this.reader = messageReader();
            return StreamItemReader.this.reader.read(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$MessageReader.class */
    /** Strategy for fetching one batch of stream messages. */
    public interface MessageReader<K, V> {
        /**
         * @param blockMillis blocking time for the read, in milliseconds
         * @return the messages that were read
         */
        List<StreamMessage<K, V>> read(long blockMillis);
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$StreamBuilder.class */
    /**
     * First step of the fluent API: holds the client and codec until the
     * stream key is supplied.
     */
    public static class StreamBuilder<K, V> {
        private final AbstractRedisClient client;
        private final RedisCodec<K, V> codec;

        protected StreamBuilder(AbstractRedisClient client, RedisCodec<K, V> codec) {
            this.client = client;
            this.codec = codec;
        }

        /**
         * @param name key of the stream to read
         * @return a {@link Builder} for that stream using this step's client and codec
         */
        public Builder<K, V> stream(K name) {
            return new Builder<>(this.client, this.codec, name);
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$StreamId.class */
    /**
     * Value object for a stream entry id of the form {@code <millis>-<sequence>}.
     * Ordering is by millis first, then by sequence.
     */
    public static class StreamId implements Comparable<StreamId> {
        public static final StreamId ZERO = of(0, 0);
        private final long millis;
        private final long sequence;

        /**
         * @param millis   first component of the id
         * @param sequence second component of the id
         */
        public StreamId(long millis, long sequence) {
            this.millis = millis;
            this.sequence = sequence;
        }

        /** Rejects negative id components, quoting the offending id text. */
        private static void checkPositive(String id, long number) {
            if (number < 0) {
                throw new IllegalArgumentException(String.format("not an id: %s", id));
            }
        }

        /**
         * Parses {@code "<millis>-<sequence>"}; a string without {@code '-'} is
         * read as millis with sequence 0.
         *
         * @param str textual id
         * @return the parsed id
         * @throws NumberFormatException    if a component is not a valid long
         * @throws IllegalArgumentException if a component is negative
         */
        public static StreamId parse(String str) {
            int separator = str.indexOf('-');
            if (separator == -1) {
                long millisOnly = Long.parseLong(str);
                checkPositive(str, millisOnly);
                return of(millisOnly, 0L);
            }
            long millisPart = Long.parseLong(str.substring(0, separator));
            checkPositive(str, millisPart);
            long sequencePart = Long.parseLong(str.substring(separator + 1));
            checkPositive(str, sequencePart);
            return of(millisPart, sequencePart);
        }

        /** Static factory equivalent to the constructor. */
        public static StreamId of(long millis, long sequence) {
            return new StreamId(millis, sequence);
        }

        /** @return the id rendered as {@code <millis>-<sequence>} */
        public String toStreamId() {
            return this.millis + "-" + this.sequence;
        }

        @Override
        public String toString() {
            return toStreamId();
        }

        /**
         * Orders by millis, then sequence. Uses {@link Long#compare} rather than
         * subtraction: the constructor accepts any long, and a difference that
         * overflows would flip the sign of the result.
         */
        @Override
        public int compareTo(StreamId streamId) {
            int byMillis = Long.compare(this.millis, streamId.millis);
            return byMillis != 0 ? byMillis : Long.compare(this.sequence, streamId.sequence);
        }

        @Override
        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StreamId)) {
                return false;
            }
            StreamId other = (StreamId) obj;
            return other.millis == this.millis && other.sequence == this.sequence;
        }

        /**
         * Combines both components additively so that ids with a zero component
         * (for example every {@code <millis>-0}) do not all hash to the same value.
         */
        @Override
        public int hashCode() {
            return 31 * Long.hashCode(this.millis) + Long.hashCode(this.sequence);
        }
    }

    /**
     * Creates a reader from the builder's current settings and names it after
     * the short name of the runtime class.
     *
     * @param builder source of client, codec, stream, consumer, offset, block, count and ack policy
     */
    public StreamItemReader(Builder<K, V> builder) {
        setName(ClassUtils.getShortName(getClass()));
        // Connection settings
        this.client = builder.client;
        this.codec = builder.codec;
        // What to read and as whom
        this.stream = builder.stream;
        this.consumer = builder.consumer;
        // How to read
        this.offset = builder.offset;
        this.block = builder.block;
        this.count = builder.count;
        this.ackPolicy = builder.ackPolicy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the read arguments: the configured count plus the given blocking time.
     *
     * @param blockMillis value passed to {@code XReadArgs.block(long)}
     */
    public XReadArgs args(long blockMillis) {
        final XReadArgs withCount = XReadArgs.Builder.count(this.count);
        return withCount.block(blockMillis);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the synchronous stream commands for the given connection, as
     * obtained from {@code Utils.sync}.
     *
     * @param statefulConnection connection to get the commands for
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public RedisStreamCommands<K, V> commands(StatefulConnection<K, V> statefulConnection) {
        final RedisStreamCommands streamCommands = (RedisStreamCommands) Utils.sync(statefulConnection);
        return streamCommands;
    }

    /**
     * Opens the connection, creates the consumer group at the configured
     * offset (creating the stream if needed via {@code mkstream(true)}), and
     * installs the initial reader. Does nothing when a connection is already held.
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    protected void doOpen() {
        if (this.connection == null) {
            this.connection = RedisModulesUtils.connection(this.client, this.codec);
            try {
                final RedisStreamCommands<K, V> streamCommands = (RedisStreamCommands) Utils.sync(this.connection);
                streamCommands.xgroupCreate(XReadArgs.StreamOffset.from(this.stream, this.offset), this.consumer.getGroup(), XGroupCreateArgs.Builder.mkstream(true));
            } catch (RedisBusyException ignored) {
                // Deliberately ignored — presumably raised when the consumer group
                // already exists (TODO confirm against the Redis XGROUP CREATE reply).
            }
            this.lastId = this.offset;
            this.reader = reader();
        }
    }

    /**
     * Releases the reader state and closes the connection. Does nothing when
     * no connection is held.
     *
     * <p>The connection field is cleared afterwards (even if {@code close()}
     * throws) because {@code doOpen()} only initialises the reader when the
     * field is {@code null}; leaving the closed connection in place would make
     * a later re-open a no-op and leave {@code reader} unset.
     */
    protected void doClose() {
        if (this.connection == null) {
            return;
        }
        this.reader = null;
        this.lastId = null;
        try {
            this.connection.close();
        } finally {
            this.connection = null;
        }
    }

    /**
     * Picks the initial reader for the configured ack policy: the explicit-ack
     * pending reader for {@link AckPolicy#MANUAL}, the auto-ack one otherwise.
     */
    private MessageReader<K, V> reader() {
        if (this.ackPolicy == AckPolicy.MANUAL) {
            return new ExplicitAckPendingMessageReader();
        }
        return new AutoAckPendingMessageReader();
    }

    /**
     * Intentionally empty: this reader writes nothing to the execution context.
     * NOTE(review): presumably overrides the superclass implementation to
     * disable its state saving — confirm against the Spring Batch base class.
     *
     * @param executionContext ignored
     */
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // no-op
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: doRead, reason: merged with bridge method [inline-methods] */
    /**
     * Reads the next message, waiting up to {@link #DEFAULT_POLL_DURATION}.
     *
     * <p>The decompiler renamed this method to {@code m24doRead} (see the
     * marker comment above), which leaves the superclass read hook without an
     * implementation; it is restored here under its original name.
     *
     * @return the next message, or {@code null} if none arrived in time
     */
    protected StreamMessage<K, V> doRead() throws PollableItemReader.PollingException {
        return poll(DEFAULT_POLL_DURATION.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Decompiler-generated alias of {@link #doRead()}, kept so existing
     * references to this name continue to work.
     */
    public StreamMessage<K, V> m24doRead() throws PollableItemReader.PollingException {
        return doRead();
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    /**
     * Returns the next buffered message, fetching a new batch from the current
     * reader when the buffer is exhausted.
     *
     * @param j        maximum blocking time for the fetch
     * @param timeUnit unit of {@code j}
     * @return the next message, or {@code null} when the fetch returned nothing
     */
    public StreamMessage<K, V> poll(long j, TimeUnit timeUnit) throws PollableItemReader.PollingException {
        if (this.iterator.hasNext()) {
            return this.iterator.next();
        }
        final List<StreamMessage<K, V>> batch = this.reader.read(timeUnit.toMillis(j));
        final boolean nothingRead = batch == null || batch.isEmpty();
        if (nothingRead) {
            return null;
        }
        this.iterator = batch.iterator();
        return this.iterator.next();
    }

    /**
     * Reads one batch directly from the current reader, blocking for the
     * configured block duration. Bypasses the buffer used by {@code poll}.
     *
     * @return the messages returned by the current reader
     */
    public List<StreamMessage<K, V>> readMessages() {
        final long blockMillis = this.block.toMillis();
        return this.reader.read(blockMillis);
    }

    /**
     * Acknowledges the given messages by id.
     *
     * @param iterable messages to acknowledge; {@code null} is treated as nothing to do
     * @return the result of acknowledging the collected ids, or 0 for {@code null}
     */
    public long ack(Iterable<? extends StreamMessage<K, V>> iterable) {
        if (iterable == null) {
            return 0L;
        }
        final List<String> ids = new ArrayList<>();
        for (StreamMessage<K, V> message : iterable) {
            ids.add(message.getId());
        }
        return ack(ids.toArray(new String[0]));
    }

    /**
     * Acknowledges the given ids on the reader's own connection and records
     * the last one as the reader's last id.
     *
     * @param strArr ids to acknowledge
     * @return the value reported by the acknowledge call, or 0 when no ids are given
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public long ack(String... strArr) {
        final int idCount = strArr.length;
        if (idCount == 0) {
            return 0L;
        }
        this.lastId = strArr[idCount - 1];
        final Long acknowledged = ack((RedisStreamCommands) Utils.sync(this.connection), strArr);
        return acknowledged.longValue();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Acknowledges the given messages through the supplied commands object.
     * Unlike {@code ack(String...)}, this does not touch the reader's last id.
     *
     * @param redisStreamCommands commands used to send the acknowledgement
     * @param iterable            messages to acknowledge
     */
    public void ack(RedisStreamCommands<K, V> redisStreamCommands, Iterable<StreamMessage<K, V>> iterable) {
        final List<String> ids = new ArrayList<>();
        for (StreamMessage<K, V> message : iterable) {
            ids.add(message.getId());
        }
        ack(redisStreamCommands, ids.toArray(new String[0]));
    }

    /**
     * Sends {@code xack} for the given ids against the configured stream and
     * the consumer's group; skips the call entirely when there are no ids.
     *
     * @return the {@code xack} result, or 0 when no ids are given
     */
    private Long ack(RedisStreamCommands<K, V> redisStreamCommands, String... strArr) {
        if (strArr.length > 0) {
            final K group = this.consumer.getGroup();
            return redisStreamCommands.xack(this.stream, group, strArr);
        }
        return 0L;
    }

    /**
     * Starts building a reader on a standalone client with the UTF-8 string codec.
     *
     * @param client client the reader will connect with
     */
    public static StreamBuilder<String, String> client(RedisModulesClient client) {
        return new StreamBuilder<String, String>(client, StringCodec.UTF8);
    }

    /**
     * Starts building a reader on a cluster client with the UTF-8 string codec.
     *
     * @param client cluster client the reader will connect with
     */
    public static StreamBuilder<String, String> client(RedisModulesClusterClient client) {
        return new StreamBuilder<String, String>(client, StringCodec.UTF8);
    }

    /**
     * Starts building a reader on a standalone client with a caller-supplied codec.
     *
     * @param client client the reader will connect with
     * @param codec  codec for stream keys and values
     */
    public static <K, V> StreamBuilder<K, V> client(RedisModulesClient client, RedisCodec<K, V> codec) {
        return new StreamBuilder<K, V>(client, codec);
    }

    /**
     * Starts building a reader on a cluster client with a caller-supplied codec.
     *
     * @param client cluster client the reader will connect with
     * @param codec  codec for stream keys and values
     */
    public static <K, V> StreamBuilder<K, V> client(RedisModulesClusterClient client, RedisCodec<K, V> codec) {
        return new StreamBuilder<K, V>(client, codec);
    }
}
