package com.redis.spring.batch.reader;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.lettucemod.util.RedisModulesUtils;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.Consumer;
import io.lettuce.core.RedisBusyException;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.api.sync.RedisStreamCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.StreamSupport;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader.class */
public class StreamItemReader<K, V> extends AbstractItemStreamItemReader<StreamMessage<K, V>> implements PollableItemReader<StreamMessage<K, V>> {
    public static final String DEFAULT_CONSUMER = "consumer1";
    public static final String DEFAULT_OFFSET = "0-0";
    public static final long DEFAULT_COUNT = 50;
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private final K stream;
    private Consumer<K> consumer;
    private StatefulRedisModulesConnection<K, V> connection;
    private MessageReader<K, V> messageReader;
    private String lastId;
    private RedisStreamCommands<K, V> commands;
    public static final String DEFAULT_CONSUMER_GROUP = ClassUtils.getShortName(StreamItemReader.class);
    public static final Duration DEFAULT_POLL_DURATION = Duration.ofSeconds(1);
    public static final Duration DEFAULT_BLOCK = Duration.ofMillis(100);
    public static final StreamAckPolicy DEFAULT_ACK_POLICY = StreamAckPolicy.AUTO;
    private String offset = DEFAULT_OFFSET;
    private Duration block = DEFAULT_BLOCK;
    private long count = 50;
    private StreamAckPolicy ackPolicy = DEFAULT_ACK_POLICY;
    private Iterator<StreamMessage<K, V>> iterator = Collections.emptyIterator();

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$AutoAckMessageReader.class */
    private class AutoAckMessageReader extends StreamItemReader<K, V>.ExplicitAckMessageReader {
        private AutoAckMessageReader() {
            super();
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.ExplicitAckMessageReader, com.redis.spring.batch.reader.StreamItemReader.MessageReader
        public List<StreamMessage<K, V>> read(long j) {
            List<StreamMessage<K, V>> read = super.read(j);
            StreamItemReader.this.ack(read);
            return read;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$AutoAckPendingMessageReader.class */
    public class AutoAckPendingMessageReader extends StreamItemReader<K, V>.ExplicitAckPendingMessageReader {
        private AutoAckPendingMessageReader() {
            super();
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.ExplicitAckPendingMessageReader
        protected MessageReader<K, V> messageReader() {
            return new AutoAckMessageReader();
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.ExplicitAckPendingMessageReader
        protected List<StreamMessage<K, V>> recover(List<StreamMessage<K, V>> list) {
            StreamItemReader.this.ack(list);
            return Collections.emptyList();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$ExplicitAckMessageReader.class */
    public class ExplicitAckMessageReader implements MessageReader<K, V> {
        private ExplicitAckMessageReader() {
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.MessageReader
        public List<StreamMessage<K, V>> read(long j) {
            return StreamItemReader.this.commands.xreadgroup(StreamItemReader.this.consumer, StreamItemReader.this.args(j), new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.lastConsumed(StreamItemReader.this.stream)});
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$ExplicitAckPendingMessageReader.class */
    public class ExplicitAckPendingMessageReader implements MessageReader<K, V> {
        private ExplicitAckPendingMessageReader() {
        }

        protected List<StreamMessage<K, V>> readMessages(XReadArgs xReadArgs) {
            return recover(StreamItemReader.this.commands.xreadgroup(StreamItemReader.this.consumer, xReadArgs, new XReadArgs.StreamOffset[]{XReadArgs.StreamOffset.from(StreamItemReader.this.stream, StreamItemReader.DEFAULT_OFFSET)}));
        }

        protected List<StreamMessage<K, V>> recover(List<StreamMessage<K, V>> list) {
            if (list.isEmpty()) {
                return list;
            }
            ArrayList arrayList = new ArrayList();
            ArrayList arrayList2 = new ArrayList();
            StreamId parse = StreamId.parse(StreamItemReader.this.lastId);
            for (StreamMessage<K, V> streamMessage : list) {
                if (StreamId.parse(streamMessage.getId()).compareTo(parse) > 0) {
                    arrayList.add(streamMessage);
                    StreamItemReader.this.lastId = streamMessage.getId();
                } else {
                    arrayList2.add(streamMessage);
                }
            }
            StreamItemReader.this.ack(arrayList2);
            return arrayList;
        }

        protected MessageReader<K, V> messageReader() {
            return new ExplicitAckMessageReader();
        }

        @Override // com.redis.spring.batch.reader.StreamItemReader.MessageReader
        public List<StreamMessage<K, V>> read(long j) {
            List<StreamMessage<K, V>> readMessages = readMessages(StreamItemReader.this.args(j));
            if (!readMessages.isEmpty()) {
                return readMessages;
            }
            StreamItemReader.this.messageReader = messageReader();
            return StreamItemReader.this.messageReader.read(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$MessageReader.class */
    public interface MessageReader<K, V> {
        List<StreamMessage<K, V>> read(long j);
    }

    /* loaded from: input_file:com/redis/spring/batch/reader/StreamItemReader$StreamId.class */
    public static class StreamId implements Comparable<StreamId> {
        public static final StreamId ZERO = of(0, 0);
        private final long millis;
        private final long sequence;

        public StreamId(long j, long j2) {
            this.millis = j;
            this.sequence = j2;
        }

        private static void checkPositive(String str, long j) {
            if (j < 0) {
                throw new IllegalArgumentException(String.format("not an id: %s", str));
            }
        }

        public static StreamId parse(String str) {
            int indexOf = str.indexOf("-");
            if (indexOf == -1) {
                long parseLong = Long.parseLong(str);
                checkPositive(str, parseLong);
                return of(parseLong, 0L);
            }
            long parseLong2 = Long.parseLong(str.substring(0, indexOf));
            checkPositive(str, parseLong2);
            long parseLong3 = Long.parseLong(str.substring(indexOf + 1));
            checkPositive(str, parseLong3);
            return of(parseLong2, parseLong3);
        }

        public static StreamId of(long j, long j2) {
            return new StreamId(j, j2);
        }

        public String toStreamId() {
            return this.millis + "-" + this.sequence;
        }

        public String toString() {
            return toStreamId();
        }

        @Override // java.lang.Comparable
        public int compareTo(StreamId streamId) {
            long j = this.millis - streamId.millis;
            return j != 0 ? Long.signum(j) : Long.signum(this.sequence - streamId.sequence);
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof StreamId)) {
                return false;
            }
            StreamId streamId = (StreamId) obj;
            return streamId.millis == this.millis && streamId.sequence == this.sequence;
        }

        public int hashCode() {
            long j = this.millis * 31 * this.sequence;
            return (int) (j ^ (j >> 32));
        }
    }

    public StreamItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, K k) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.stream = k;
        this.consumer = Consumer.from(toKey(redisCodec, DEFAULT_CONSUMER_GROUP), toKey(redisCodec, DEFAULT_CONSUMER));
    }

    private static <K, V> K toKey(RedisCodec<K, V> redisCodec, String str) {
        return (K) redisCodec.decodeKey(StringCodec.UTF8.encodeKey(str));
    }

    public void setConsumer(Consumer<K> consumer) {
        this.consumer = consumer;
    }

    public void setOffset(String str) {
        this.offset = str;
    }

    public void setBlock(Duration duration) {
        this.block = duration;
    }

    public void setCount(long j) {
        this.count = j;
    }

    public void setAckPolicy(StreamAckPolicy streamAckPolicy) {
        this.ackPolicy = streamAckPolicy;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public XReadArgs args(long j) {
        return XReadArgs.Builder.count(this.count).block(j);
    }

    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (isOpen()) {
            return;
        }
        doOpen();
    }

    private void doOpen() {
        this.connection = RedisModulesUtils.connection(this.client, this.codec);
        this.commands = (RedisStreamCommands) Utils.sync(this.connection);
        try {
            this.commands.xgroupCreate(XReadArgs.StreamOffset.from(this.stream, this.offset), this.consumer.getGroup(), XGroupCreateArgs.Builder.mkstream(true));
        } catch (RedisBusyException e) {
        }
        this.lastId = this.offset;
        this.messageReader = reader();
    }

    public boolean isOpen() {
        return this.messageReader != null;
    }

    public synchronized void close() {
        if (isOpen()) {
            doClose();
        }
        super.close();
    }

    private void doClose() {
        this.messageReader = null;
        this.lastId = null;
        this.connection.close();
        this.connection = null;
        this.commands = null;
    }

    private MessageReader<K, V> reader() {
        return this.ackPolicy == StreamAckPolicy.MANUAL ? new ExplicitAckPendingMessageReader() : new AutoAckPendingMessageReader();
    }

    public void update(ExecutionContext executionContext) throws ItemStreamException {
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public StreamMessage<K, V> m28read() throws Exception {
        return poll(DEFAULT_POLL_DURATION.toMillis(), TimeUnit.MILLISECONDS);
    }

    @Override // com.redis.spring.batch.reader.PollableItemReader
    public synchronized StreamMessage<K, V> poll(long j, TimeUnit timeUnit) throws PollingException {
        if (!this.iterator.hasNext()) {
            List<StreamMessage<K, V>> read = this.messageReader.read(timeUnit.toMillis(j));
            if (read == null || read.isEmpty()) {
                return null;
            }
            this.iterator = read.iterator();
        }
        return this.iterator.next();
    }

    public List<StreamMessage<K, V>> readMessages() {
        return this.messageReader.read(this.block.toMillis());
    }

    public Long ack(Iterable<? extends StreamMessage<K, V>> iterable) {
        if (iterable == null) {
            return 0L;
        }
        return doAck((String[]) StreamSupport.stream(iterable.spliterator(), false).map((v0) -> {
            return v0.getId();
        }).toArray(i -> {
            return new String[i];
        }));
    }

    public Long ack(String... strArr) {
        if (strArr.length == 0) {
            return 0L;
        }
        this.lastId = strArr[strArr.length - 1];
        return doAck(strArr);
    }

    private Long doAck(String... strArr) {
        if (strArr.length == 0) {
            return 0L;
        }
        return this.commands.xack(this.stream, this.consumer.getGroup(), strArr);
    }

    public long streamLength() {
        return this.commands.xlen(this.stream).longValue();
    }
}
