package com.redis.spring.batch.common;

import com.redis.spring.batch.util.ConnectionUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/common/AbstractOperationExecutor.class */
/**
 * Base {@link ItemProcessor} that runs a {@link BatchOperation} against Redis for a whole list
 * of input items at once, using connections borrowed from a pool.
 *
 * <p>Lifecycle: {@link #open(ExecutionContext)} creates the connection pool and obtains the
 * batch operation from {@link #batchOperation()}; {@link #process(List)} may then be called;
 * {@link #close()} closes the pool. {@code open}, {@code close} and {@code isOpen} are
 * synchronized on this instance; {@code process} is not.
 *
 * @param <K> Redis key type handled by the codec
 * @param <V> Redis value type handled by the codec
 * @param <I> type of the input items
 * @param <O> type of the results produced by the batch operation
 */
public abstract class AbstractOperationExecutor<K, V, I, O> extends ItemStreamSupport implements ItemProcessor<List<I>, List<O>> {
    /** Maximum number of pooled connections used when {@link #setPoolSize(int)} is never called. */
    public static final int DEFAULT_POOL_SIZE = 8;
    protected final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private ReadFrom readFrom;
    private GenericObjectPool<StatefulConnection<K, V>> pool;
    private BatchOperation<K, V, I, O> batchOperation;
    private String name;
    private final Log log = LogFactory.getLog(getClass());
    private int poolSize = DEFAULT_POOL_SIZE;

    /**
     * Creates an executor named after the short name of the concrete class.
     *
     * <p>Intended to be invoked by subclasses only (this class is abstract).
     *
     * @param abstractRedisClient client used to create the pooled connections
     * @param redisCodec          codec used for the pooled connections
     */
    public AbstractOperationExecutor(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    /**
     * Sets the maximum total number of connections in the pool. The value is read when the pool
     * is created in {@link #open(ExecutionContext)}.
     *
     * @param i maximum total number of pooled connections
     */
    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    /**
     * Sets the {@link ReadFrom} setting handed to the connection supplier when the pool is
     * created in {@link #open(ExecutionContext)}.
     *
     * @param readFrom read-from setting, may be {@code null}
     */
    public void setReadFrom(ReadFrom readFrom) {
        this.readFrom = readFrom;
    }

    /**
     * Sets the component name on the superclass and keeps a local copy that is used in log and
     * error messages.
     *
     * @param str the new name
     */
    @Override
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * Opens this executor: obtains the batch operation and creates the connection pool.
     * Does nothing beyond delegating to the superclass if the pool already exists.
     *
     * @param executionContext execution context passed on to the superclass
     */
    @Override
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (isOpen()) {
            return;
        }
        this.log.debug(String.format("Opening %s", this.name));
        Supplier<StatefulConnection<K, V>> connectionSupplier = ConnectionUtils.supplier(this.client, this.codec, this.readFrom);
        GenericObjectPoolConfig<StatefulConnection<K, V>> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(this.poolSize);
        this.batchOperation = batchOperation();
        this.pool = ConnectionPoolSupport.createGenericObjectPool(connectionSupplier, poolConfig);
        this.log.debug(String.format("Opened %s", this.name));
    }

    /**
     * Tells whether the connection pool currently exists.
     *
     * @return {@code true} between a successful {@link #open(ExecutionContext)} and the next
     *         {@link #close()}
     */
    public synchronized boolean isOpen() {
        return this.pool != null;
    }

    /**
     * Supplies the operation to run for each batch. Called once per effective
     * {@link #open(ExecutionContext)}.
     *
     * @return the batch operation to execute in {@link #process(List)}
     */
    protected abstract BatchOperation<K, V, I, O> batchOperation();

    /**
     * Closes the connection pool if it exists, then delegates to the superclass.
     */
    @Override
    public synchronized void close() {
        if (isOpen()) {
            this.log.debug(String.format("Closing %s", this.name));
            this.pool.close();
            this.pool = null;
            this.log.debug(String.format("Closed %s", this.name));
        }
        super.close();
    }

    /**
     * Executes the batch operation for the given items on a pooled connection and waits for all
     * results, bounded by the connection's timeout.
     *
     * <p>Auto-flush is disabled while the commands are queued, the commands are flushed in one
     * go, and auto-flush is re-enabled before the connection is closed, whether or not the
     * operation succeeded.
     *
     * @param list items to process
     * @return the results of the futures returned by the batch operation, in the same order
     * @throws IllegalStateException if this executor has not been opened or was already closed
     * @throws Exception             if borrowing a connection or executing the operation fails
     */
    @Override
    public List<O> process(List<I> list) throws Exception {
        // Snapshot the field so a clear error is reported instead of a bare NullPointerException.
        GenericObjectPool<StatefulConnection<K, V>> connectionPool = this.pool;
        if (connectionPool == null) {
            throw new IllegalStateException(String.format("%s is not open", this.name));
        }
        try (StatefulConnection<K, V> connection = connectionPool.borrowObject()) {
            // The cast mirrors the original call site; it is unchecked if async() is generic in its return type.
            @SuppressWarnings("unchecked")
            BaseRedisAsyncCommands<K, V> commands = (BaseRedisAsyncCommands<K, V>) ConnectionUtils.async(connection);
            try {
                connection.setAutoFlushCommands(false);
                List<RedisFuture<O>> futures = this.batchOperation.execute(commands, list);
                connection.flushCommands();
                return getAll(connection.getTimeout(), futures);
            } finally {
                // Always restore auto-flush before the connection is closed.
                connection.setAutoFlushCommands(true);
            }
        }
    }

    /**
     * Waits for every future in order and collects the results.
     *
     * <p>A negative {@code duration} waits without a time limit. Otherwise {@code duration} is a
     * single time budget shared by all futures: the time spent waiting on each future is
     * subtracted from the budget, and a {@link TimeoutException} is raised once it drops below
     * zero.
     *
     * <p>Any exception raised while waiting is converted with
     * {@link Exceptions#fromSynchronization(Throwable)}. If the waiting thread is interrupted,
     * its interrupt status is restored before the converted exception is thrown.
     *
     * @param duration overall time budget; negative means no limit
     * @param list     futures to wait for
     * @param <T>      result type of the futures
     * @return the results, in the order of {@code list}
     */
    public static <T> List<T> getAll(Duration duration, List<RedisFuture<T>> list) {
        List<T> results = new ArrayList<>(list.size());
        try {
            long remainingNanos = duration.toNanos();
            boolean unbounded = duration.isNegative();
            long lastTick = System.nanoTime();
            for (RedisFuture<T> future : list) {
                if (unbounded) {
                    results.add(future.get());
                    continue;
                }
                if (remainingNanos < 0) {
                    throw new TimeoutException(String.format("Timed out after %s", duration));
                }
                results.add(future.get(remainingNanos, TimeUnit.NANOSECONDS));
                long now = System.nanoTime();
                remainingNanos -= now - lastTick;
                lastTick = now;
            }
            return results;
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack; the thrown type is unchanged.
            Thread.currentThread().interrupt();
            throw Exceptions.fromSynchronization(e);
        } catch (Exception e) {
            throw Exceptions.fromSynchronization(e);
        }
    }
}
