package com.redis.spring.batch.operation;

import com.redis.lettucemod.api.StatefulRedisModulesConnection;
import com.redis.spring.batch.util.BatchUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisCommandInterruptedException;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/operation/OperationExecutor.class */
public class OperationExecutor<K, V, I, O> implements InitializingBean, AutoCloseable {
    public static final int DEFAULT_POOL_SIZE = 8;
    private final Operation<K, V, I, O> operation;
    private final RedisCodec<K, V> codec;
    private AbstractRedisClient client;
    private ReadFrom readFrom;
    private int poolSize = 8;
    private GenericObjectPool<StatefulRedisModulesConnection<K, V>> pool;

    public OperationExecutor(RedisCodec<K, V> redisCodec, Operation<K, V, I, O> operation) {
        this.codec = redisCodec;
        this.operation = operation;
    }

    public void setClient(AbstractRedisClient abstractRedisClient) {
        this.client = abstractRedisClient;
    }

    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(this.client, "Redis client not set");
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxTotal(this.poolSize);
        this.pool = ConnectionPoolSupport.createGenericObjectPool(BatchUtils.supplier(this.client, this.codec, this.readFrom), genericObjectPoolConfig);
        if (this.operation instanceof InitializingOperation) {
            StatefulRedisModulesConnection<K, V> statefulRedisModulesConnection = (StatefulRedisModulesConnection) this.pool.borrowObject();
            try {
                ((InitializingOperation) this.operation).afterPropertiesSet(statefulRedisModulesConnection);
                if (statefulRedisModulesConnection != null) {
                    statefulRedisModulesConnection.close();
                }
            } catch (Throwable th) {
                if (statefulRedisModulesConnection != null) {
                    try {
                        statefulRedisModulesConnection.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
                throw th;
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.pool.close();
    }

    public List<O> apply(Iterable<? extends I> iterable) {
        try {
            StatefulRedisModulesConnection statefulRedisModulesConnection = (StatefulRedisModulesConnection) this.pool.borrowObject();
            try {
                statefulRedisModulesConnection.setAutoFlushCommands(false);
                BaseRedisAsyncCommands<K, V> async = statefulRedisModulesConnection.async();
                ArrayList arrayList = new ArrayList();
                this.operation.execute(async, iterable, arrayList);
                statefulRedisModulesConnection.flushCommands();
                List<O> all = getAll(statefulRedisModulesConnection.getTimeout(), arrayList);
                statefulRedisModulesConnection.setAutoFlushCommands(true);
                if (statefulRedisModulesConnection != null) {
                    statefulRedisModulesConnection.close();
                }
                return all;
            } finally {
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RedisCommandInterruptedException(e);
        } catch (Exception e2) {
            throw Exceptions.fromSynchronization(e2);
        }
    }

    public static <T> List<T> getAll(Duration duration, Iterable<RedisFuture<T>> iterable) throws TimeoutException, InterruptedException, ExecutionException {
        ArrayList arrayList = new ArrayList();
        long nanos = duration.toNanos();
        long nanoTime = System.nanoTime();
        for (RedisFuture<T> redisFuture : iterable) {
            if (duration.isNegative()) {
                arrayList.add(redisFuture.get());
            } else {
                if (nanos < 0) {
                    throw new TimeoutException(String.format("Timed out after %s", duration));
                }
                arrayList.add(redisFuture.get(nanos, TimeUnit.NANOSECONDS));
                long nanoTime2 = System.nanoTime();
                nanos -= nanoTime2 - nanoTime;
                nanoTime = nanoTime2;
            }
        }
        return arrayList;
    }
}
