package com.redis.spring.batch.writer;

import com.redis.spring.batch.common.BatchOperation;
import com.redis.spring.batch.common.DelegatingItemStreamSupport;
import io.lettuce.core.RedisCommandExecutionException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.cluster.PipelinedRedisFuture;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/writer/ReplicaWaitWriteOperation.class */
/**
 * {@link BatchOperation} decorator that runs a delegate operation and then appends one extra
 * future which waits for the writes to be acknowledged by a number of replicas.
 *
 * <p>The extra future is obtained from {@code waitForReplication} (the Redis {@code WAIT}
 * command in Lettuce). It completes exceptionally when the number of replicas reported by the
 * server is lower than the configured {@link #setReplicas(int) replica count}.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <T> type of the items handed to the delegate
 * @param <U> result type of the returned futures
 */
public class ReplicaWaitWriteOperation<K, V, T, U> extends DelegatingItemStreamSupport implements BatchOperation<K, V, T, U> {
    /** Replica count used until {@link #setReplicas(int)} is called. */
    public static final int DEFAULT_REPLICAS = 0;
    /** Replication timeout used until {@link #setTimeout(Duration)} is called. */
    public static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(1);
    private final BatchOperation<K, V, T, ?> delegate;
    private int replicas = DEFAULT_REPLICAS;
    private Duration timeout = DEFAULT_TIMEOUT;

    /**
     * Creates an operation wrapping the given delegate, using {@link #DEFAULT_REPLICAS} and
     * {@link #DEFAULT_TIMEOUT}.
     *
     * @param batchOperation operation whose futures are returned ahead of the replication future
     */
    public ReplicaWaitWriteOperation(BatchOperation<K, V, T, ?> batchOperation) {
        super(batchOperation);
        this.delegate = batchOperation;
    }

    /**
     * Sets the number of replicas that must acknowledge the writes.
     *
     * @param i required replica count; the value is passed through unvalidated
     */
    public void setReplicas(int i) {
        this.replicas = i;
    }

    /**
     * Sets how long to wait for replication; the value is sent to Redis in milliseconds.
     *
     * @param duration replication timeout, must not be {@code null}
     * @throws IllegalArgumentException if {@code duration} is {@code null}
     */
    public void setTimeout(Duration duration) {
        Assert.notNull(duration, "Replication timeout should not be null");
        this.timeout = duration;
    }

    /**
     * Executes the delegate and appends the replication-wait future as the last element.
     *
     * @param baseRedisAsyncCommands async commands used by the delegate and for the wait call
     * @param list items forwarded unchanged to the delegate
     * @return a new mutable list holding the delegate's futures followed by the replication future
     */
    @Override
    @SuppressWarnings("unchecked")
    public List<Future<U>> execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, List<? extends T> list) {
        // The delegate's result type is a wildcard and the replication future carries no value,
        // so neither can be proven to be Future<U>; the unchecked casts are confined to this method.
        List<Future<U>> delegateFutures = (List<Future<U>>) (List<?>) this.delegate.execute(baseRedisAsyncCommands, list);
        Future<U> waitFuture = (Future<U>) (Future<?>) replicationFuture(baseRedisAsyncCommands);
        List<Future<U>> futures = new ArrayList<>();
        futures.addAll(delegateFutures);
        futures.add(waitFuture);
        return futures;
    }

    /** Issues the replication wait and wraps it so that an insufficient replica count fails the future. */
    private RedisFuture<?> replicationFuture(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands) {
        RedisFuture<Long> acknowledged = baseRedisAsyncCommands.waitForReplication(this.replicas, this.timeout.toMillis());
        return new PipelinedRedisFuture<>(acknowledged.thenAccept(this::checkReplicas));
    }

    /** Throws when fewer replicas than configured acknowledged the writes. */
    private void checkReplicas(Long actual) {
        if (actual.longValue() < this.replicas) {
            throw new RedisCommandExecutionException(String.format("Insufficient replication level - expected: %s, actual: %s", Integer.valueOf(this.replicas), actual));
        }
    }
}
