package com.redis.spring.batch;

import com.redis.spring.batch.support.ConnectionPoolItemStream;
import com.redis.spring.batch.support.RedisConnectionBuilder;
import com.redis.spring.batch.writer.DataStructureOperation;
import com.redis.spring.batch.writer.MultiExecOperation;
import com.redis.spring.batch.writer.Operation;
import com.redis.spring.batch.writer.PipelinedOperation;
import com.redis.spring.batch.writer.SimplePipelinedOperation;
import com.redis.spring.batch.writer.WaitForReplicationOperation;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ItemWriter;
import org.springframework.core.convert.converter.Converter;

/* loaded from: input_file:com/redis/spring/batch/RedisItemWriter.class */
/**
 * Spring Batch {@link ItemWriter} that sends each chunk of items to Redis through a
 * {@link PipelinedOperation}.
 *
 * <p>For every call to {@link #write(List)} a connection is obtained with
 * {@code borrowConnection()}, auto-flush is switched off, the operation queues its commands,
 * the commands are flushed once, and the writer waits for the returned futures for at most the
 * connection timeout. Auto-flush is switched back on and the connection is closed afterwards.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <T> type of the items being written
 */
public class RedisItemWriter<K, V, T> extends ConnectionPoolItemStream<K, V> implements ItemWriter<T> {
    /** Derives the async command API from a borrowed connection. */
    private final Function<StatefulConnection<K, V>, BaseRedisAsyncCommands<K, V>> async;
    /** Operation that turns a chunk of items into Redis command futures. */
    private final PipelinedOperation<K, V, T> operation;

    /**
     * Fluent builder for {@link RedisItemWriter}. The configured operation can optionally be
     * wrapped in a {@link MultiExecOperation} and/or a {@link WaitForReplicationOperation}.
     *
     * @param <K> Redis key type
     * @param <V> Redis value type
     * @param <T> type of the items being written
     */
    public static class Builder<K, V, T> extends RedisConnectionBuilder<K, V, Builder<K, V, T>> {
        private final PipelinedOperation<K, V, T> operation;
        private boolean multiExec;
        private Optional<WaitForReplication> waitForReplication;

        /**
         * Creates a builder for a per-item operation, wrapping it in a
         * {@link SimplePipelinedOperation}.
         *
         * @param abstractRedisClient client used to open connections
         * @param redisCodec codec for keys and values
         * @param operation operation applied to the items
         */
        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, Operation<K, V, T> operation) {
            this(abstractRedisClient, redisCodec, new SimplePipelinedOperation<>(operation));
        }

        /**
         * Creates a builder for an operation that already handles a whole chunk.
         *
         * @param abstractRedisClient client used to open connections
         * @param redisCodec codec for keys and values
         * @param pipelinedOperation operation applied to each chunk of items
         */
        public Builder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, PipelinedOperation<K, V, T> pipelinedOperation) {
            super(abstractRedisClient, redisCodec);
            this.waitForReplication = Optional.empty();
            this.operation = pipelinedOperation;
        }

        /**
         * Requests that the operation be wrapped in a {@link MultiExecOperation}.
         *
         * @return this builder
         */
        public Builder<K, V, T> multiExec() {
            this.multiExec = true;
            return this;
        }

        /**
         * Requests that the operation be wrapped in a {@link WaitForReplicationOperation}.
         *
         * @param waitForReplication replication settings; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code waitForReplication} is {@code null}
         */
        public Builder<K, V, T> waitForReplication(WaitForReplication waitForReplication) {
            this.waitForReplication = Optional.of(waitForReplication);
            return this;
        }

        /** Applies the optional multi/exec and wait-for-replication wrappers, in that order. */
        private PipelinedOperation<K, V, T> operation() {
            PipelinedOperation<K, V, T> effective = this.operation;
            if (this.multiExec) {
                effective = new MultiExecOperation<>(effective);
            }
            if (this.waitForReplication.isPresent()) {
                effective = new WaitForReplicationOperation<>(effective, this.waitForReplication.get());
            }
            return effective;
        }

        /**
         * Builds the writer from the current configuration.
         *
         * @return a new {@link RedisItemWriter}
         */
        public RedisItemWriter<K, V, T> build() {
            return new RedisItemWriter<>(connectionSupplier(), this.poolConfig, super.async(), operation());
        }
    }

    /**
     * Immutable settings handed to {@link WaitForReplicationOperation}: a replica count and a
     * timeout.
     */
    public static class WaitForReplication {
        /** Replica count used when only a timeout is supplied. */
        public static final int DEFAULT_REPLICAS = 1;
        /** Timeout used when only a replica count is supplied. */
        public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(1);
        private final int replicas;
        private final Duration timeout;

        private WaitForReplication(int replicas, Duration timeout) {
            this.replicas = replicas;
            this.timeout = timeout;
        }

        /** @return the configured replica count */
        public int getReplicas() {
            return this.replicas;
        }

        /** @return the configured timeout */
        public Duration getTimeout() {
            return this.timeout;
        }

        /**
         * @param i replica count
         * @param duration timeout
         * @return settings with the given replica count and timeout
         */
        public static WaitForReplication of(int i, Duration duration) {
            return new WaitForReplication(i, duration);
        }

        /**
         * @param i replica count
         * @return settings with the given replica count and {@link #DEFAULT_TIMEOUT}
         */
        public static WaitForReplication of(int i) {
            return new WaitForReplication(i, DEFAULT_TIMEOUT);
        }

        /**
         * @param duration timeout
         * @return settings with {@link #DEFAULT_REPLICAS} and the given timeout
         */
        public static WaitForReplication of(Duration duration) {
            return new WaitForReplication(DEFAULT_REPLICAS, duration);
        }
    }

    /**
     * Creates a writer.
     *
     * @param supplier supplies connections; passed to the superclass
     * @param genericObjectPoolConfig pool configuration; passed to the superclass
     * @param function derives the async command API from a connection
     * @param pipelinedOperation operation applied to each chunk of items
     */
    public RedisItemWriter(Supplier<StatefulConnection<K, V>> supplier, GenericObjectPoolConfig<StatefulConnection<K, V>> genericObjectPoolConfig, Function<StatefulConnection<K, V>, BaseRedisAsyncCommands<K, V>> function, PipelinedOperation<K, V, T> pipelinedOperation) {
        super(supplier, genericObjectPoolConfig);
        this.async = function;
        this.operation = pipelinedOperation;
    }

    /**
     * Writes one chunk of items as a single pipelined batch.
     *
     * <p>Auto-flush is restored in a {@code finally} block so that a failure while queuing or
     * flushing commands cannot leave the connection with auto-flush disabled, and the
     * connection is closed exactly once by try-with-resources.
     *
     * @param list items to write
     * @throws Exception if borrowing the connection, executing the operation or awaiting the
     *     command futures fails
     */
    public void write(List<? extends T> list) throws Exception {
        try (StatefulConnection<K, V> connection = borrowConnection()) {
            BaseRedisAsyncCommands<K, V> commands = this.async.apply(connection);
            commands.setAutoFlushCommands(false);
            try {
                Collection<RedisFuture<?>> futures = this.operation.execute(commands, list);
                commands.flushCommands();
                long timeoutMillis = connection.getTimeout().toMillis();
                // NOTE(review): awaitAll's boolean result is ignored here, as in the original
                // code - confirm whether an elapsed timeout should fail the chunk.
                LettuceFutures.awaitAll(timeoutMillis, TimeUnit.MILLISECONDS, futures.toArray(new Future<?>[0]));
            } finally {
                commands.setAutoFlushCommands(true);
            }
        }
    }

    /**
     * Starts a builder for a writer applying the given operation.
     *
     * @param abstractRedisClient client used to open connections
     * @param redisCodec codec for keys and values
     * @param operation operation applied to the items
     * @param <K> Redis key type
     * @param <V> Redis value type
     * @param <T> item type
     * @return a new builder
     */
    public static <K, V, T> Builder<K, V, T> operation(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, Operation<K, V, T> operation) {
        return new Builder<>(abstractRedisClient, redisCodec, operation);
    }

    /**
     * Same as {@link #operation(AbstractRedisClient, RedisCodec, Operation)} using
     * {@link StringCodec#UTF8}.
     *
     * @param abstractRedisClient client used to open connections
     * @param operation operation applied to the items
     * @param <T> item type
     * @return a new builder
     */
    public static <T> Builder<String, String, T> operation(AbstractRedisClient abstractRedisClient, Operation<String, String, T> operation) {
        return operation(abstractRedisClient, StringCodec.UTF8, operation);
    }

    /**
     * Starts a builder for a writer of {@link DataStructure} items, backed by a
     * {@link DataStructureOperation}.
     *
     * @param abstractRedisClient client used to open connections
     * @param redisCodec codec for keys and values
     * @param <K> Redis key type
     * @param <V> Redis value type
     * @return a new builder
     */
    public static <K, V> Builder<K, V, DataStructure<K>> dataStructure(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new Builder<>(abstractRedisClient, redisCodec, new DataStructureOperation<>(redisCodec));
    }

    /**
     * Same as {@link #dataStructure(AbstractRedisClient, RedisCodec)}, additionally setting the
     * given converter as the XADD args of the underlying {@link DataStructureOperation}.
     *
     * @param abstractRedisClient client used to open connections
     * @param redisCodec codec for keys and values
     * @param converter produces the {@link XAddArgs} for a stream message
     * @param <K> Redis key type
     * @param <V> Redis value type
     * @return a new builder
     */
    public static <K, V> Builder<K, V, DataStructure<K>> dataStructure(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, Converter<StreamMessage<K, V>, XAddArgs> converter) {
        DataStructureOperation<K, V> dataStructureOperation = new DataStructureOperation<>(redisCodec);
        dataStructureOperation.getXadd().setArgs(converter);
        return new Builder<>(abstractRedisClient, redisCodec, dataStructureOperation);
    }

    /**
     * Same as {@link #dataStructure(AbstractRedisClient, RedisCodec)} using
     * {@link StringCodec#UTF8}.
     *
     * @param abstractRedisClient client used to open connections
     * @return a new builder
     */
    public static Builder<String, String, DataStructure<String>> dataStructure(AbstractRedisClient abstractRedisClient) {
        return dataStructure(abstractRedisClient, StringCodec.UTF8);
    }

    /**
     * Same as {@link #dataStructure(AbstractRedisClient, RedisCodec, Converter)} using
     * {@link StringCodec#UTF8}.
     *
     * @param abstractRedisClient client used to open connections
     * @param converter produces the {@link XAddArgs} for a stream message
     * @return a new builder
     */
    public static Builder<String, String, DataStructure<String>> dataStructure(AbstractRedisClient abstractRedisClient, Converter<StreamMessage<String, String>, XAddArgs> converter) {
        return dataStructure(abstractRedisClient, StringCodec.UTF8, converter);
    }

    /**
     * Starts a builder for a writer of key dumps, backed by
     * {@code SimplePipelinedOperation.keyDump()}.
     *
     * @param abstractRedisClient client used to open connections
     * @param redisCodec codec for keys and values
     * @param <K> Redis key type
     * @param <V> Redis value type
     * @return a new builder
     */
    public static <K, V> Builder<K, V, KeyValue<K, byte[]>> keyDump(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        return new Builder<>(abstractRedisClient, redisCodec, SimplePipelinedOperation.keyDump());
    }

    /**
     * Same as {@link #keyDump(AbstractRedisClient, RedisCodec)} using {@link StringCodec#UTF8}.
     *
     * @param abstractRedisClient client used to open connections
     * @return a new builder
     */
    public static Builder<String, String, KeyValue<String, byte[]>> keyDump(AbstractRedisClient abstractRedisClient) {
        return keyDump(abstractRedisClient, StringCodec.UTF8);
    }
}
