package com.redis.spring.batch.common;

import com.redis.spring.batch.util.ConnectionUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.Chunk;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.policy.MaxAttemptsRetryPolicy;

/* loaded from: input_file:com/redis/spring/batch/common/AbstractOperationExecutor.class */
/**
 * Base class for executors that run an {@link Operation} against Redis using pooled,
 * manually-flushed (pipelined) Lettuce connections.
 *
 * <p>Lifecycle: call {@link #open()} to create the connection pool, use
 * {@link #execute(Object)} / {@link #execute(Chunk)} to run the operation, then
 * {@link #close()} to release the pool. {@code open()} and {@code close()} are
 * {@code synchronized}; the {@code execute} methods are not.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <I> type of the items fed to the operation
 * @param <O> type of the results produced by the operation
 */
public abstract class AbstractOperationExecutor<K, V, I, O> implements AutoCloseable {

    /** Pool size used when {@link #setPoolSize(int)} is never called. */
    public static final int DEFAULT_POOL_SIZE = 8;

    /** Default attempt count exposed to callers; not referenced inside this class. */
    public static final int DEFAULT_MAX_ATTEMPTS = 3;

    /**
     * Default retry policy exposed to callers; not referenced inside this class.
     * NOTE(review): built with the no-arg constructor, so its attempt limit is whatever
     * {@link MaxAttemptsRetryPolicy} defaults to - presumably equal to
     * {@link #DEFAULT_MAX_ATTEMPTS}; confirm.
     */
    public static final RetryPolicy DEFAULT_RETRY_POLICY = new MaxAttemptsRetryPolicy();

    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private ReadFrom readFrom;
    private int poolSize = DEFAULT_POOL_SIZE;
    private GenericObjectPool<StatefulConnection<K, V>> pool;
    private Operation<K, V, I, O> operation;

    /**
     * Creates an executor bound to the given client and codec. No connection is made
     * until {@link #open()} is called.
     *
     * @param abstractRedisClient client used to create pooled connections
     * @param redisCodec codec used for keys and values on those connections
     */
    public AbstractOperationExecutor(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    /**
     * Sets the maximum number of pooled connections. Only takes effect if called
     * before {@link #open()} creates the pool.
     *
     * @param i maximum total connections in the pool
     */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    /**
     * Sets the {@link ReadFrom} setting handed to the connection supplier. Only takes
     * effect if called before {@link #open()} creates the pool.
     *
     * @param readFrom read preference, may be {@code null}
     */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /**
     * Creates the operation and the connection pool if the pool does not exist yet.
     * Calling this on an already-open executor is a no-op.
     */
    public synchronized void open() {
        if (this.pool != null) {
            return;
        }
        Supplier<StatefulConnection<K, V>> connectionSupplier =
                ConnectionUtils.supplier(this.client, this.codec, this.readFrom);
        GenericObjectPoolConfig<StatefulConnection<K, V>> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(this.poolSize);
        this.operation = operation();
        this.pool = ConnectionPoolSupport.createGenericObjectPool(connectionSupplier, poolConfig);
    }

    /**
     * Supplies the operation this executor runs; invoked by {@link #open()} when the
     * pool is created.
     *
     * @return the operation to execute
     */
    protected abstract Operation<K, V, I, O> operation();

    /**
     * Closes the connection pool, if any, and forgets it so that {@link #open()} can
     * create a fresh one. Calling this on a closed executor is a no-op.
     */
    @Override
    public synchronized void close() {
        if (this.pool != null) {
            this.pool.close();
            this.pool = null;
        }
    }

    /**
     * Executes the operation for a single item.
     *
     * @param i the item to process
     * @return the first result produced for the item, or {@code null} when the
     *         operation produced no result (for example because the reply was
     *         {@code null} and was filtered out by {@link #getAll(Duration, Chunk)})
     */
    public O execute(I i) {
        Chunk<I> single = new Chunk<>(i);
        Chunk<O> results = execute(single);
        // getAll may drop null replies, so an empty chunk is a legitimate outcome
        // and must not be turned into an IndexOutOfBoundsException.
        return results.isEmpty() ? null : results.getItems().get(0);
    }

    /**
     * Executes the operation for every item of the chunk on one pooled connection.
     * Auto-flush is disabled while commands are queued, the commands are flushed in a
     * single batch, and the results are awaited using the connection timeout.
     *
     * @param chunk the items to process
     * @return the results collected from the command futures
     * @throws RedisCommandInterruptedException if the calling thread is interrupted;
     *         the thread's interrupt flag is restored first
     * @throws RuntimeException any other failure, as translated by
     *         {@link Exceptions#fromSynchronization(Throwable)}
     */
    public Chunk<O> execute(Chunk<? extends I> chunk) {
        try {
            GenericObjectPool<StatefulConnection<K, V>> connectionPool = this.pool;
            if (connectionPool == null) {
                // Raised inside the try so it follows the same translation path as
                // the NullPointerException it replaces.
                throw new IllegalStateException("Executor is not open: call open() before execute()");
            }
            try (StatefulConnection<K, V> connection = connectionPool.borrowObject()) {
                connection.setAutoFlushCommands(false);
                try {
                    @SuppressWarnings("unchecked")
                    BaseRedisAsyncCommands<K, V> commands =
                            (BaseRedisAsyncCommands<K, V>) ConnectionUtils.async(connection);
                    Chunk<RedisFuture<O>> futures = new Chunk<>();
                    this.operation.execute(commands, chunk, futures);
                    connection.flushCommands();
                    return getAll(connection.getTimeout(), futures);
                } finally {
                    // Always restore auto-flush, including on failure, so the connection
                    // is not handed back with auto-flush left disabled.
                    connection.setAutoFlushCommands(true);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        } catch (Exception e) {
            throw Exceptions.fromSynchronization(e);
        }
    }

    /**
     * Waits for every future in the chunk and collects the results in order.
     *
     * <p>A negative {@code duration} means "wait indefinitely" for each future.
     * Otherwise {@code duration} is a total budget shared by all futures: the time
     * spent waiting on each one is subtracted from what remains for the next.
     *
     * <p>NOTE(review): with a negative duration {@code null} results are added to the
     * returned chunk, whereas with a non-negative duration they are skipped. This
     * asymmetry is preserved from the existing behavior - confirm which is intended.
     *
     * @param duration total time to wait, or a negative duration to wait without limit
     * @param chunk the futures to await
     * @param <T> result type of the futures
     * @return the collected results
     * @throws TimeoutException if the shared budget is exhausted before all futures
     *         complete
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if a future completed exceptionally
     */
    public static <T> Chunk<T> getAll(Duration duration, Chunk<RedisFuture<T>> chunk)
            throws TimeoutException, InterruptedException, ExecutionException {
        Chunk<T> results = new Chunk<>();
        long remainingNanos = duration.toNanos();
        long lastCheck = System.nanoTime();
        for (RedisFuture<T> future : chunk) {
            if (duration.isNegative()) {
                results.add(future.get());
                continue;
            }
            if (remainingNanos < 0) {
                throw new TimeoutException(String.format("Timed out after %s", duration));
            }
            T value = future.get(remainingNanos, TimeUnit.NANOSECONDS);
            if (value != null) {
                results.add(value);
            }
            // Charge the time spent on this future against the shared budget.
            long now = System.nanoTime();
            remainingNanos -= now - lastCheck;
            lastCheck = now;
        }
        return results;
    }
}
