package com.redis.spring.batch.common;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;

/* loaded from: input_file:com/redis/spring/batch/common/OperationItemStreamSupport.class */
public class OperationItemStreamSupport<K, V, I, O> extends DelegatingItemStreamSupport implements ItemProcessor<List<? extends I>, List<O>> {
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    private PoolOptions poolOptions;
    private final BatchOperation<K, V, I, O> operation;
    private GenericObjectPool<StatefulConnection<K, V>> pool;

    public OperationItemStreamSupport(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, BatchOperation<K, V, I, O> batchOperation) {
        super(batchOperation);
        this.poolOptions = PoolOptions.builder().build();
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.operation = batchOperation;
    }

    public OperationItemStreamSupport<K, V, I, O> withPoolOptions(PoolOptions poolOptions) {
        this.poolOptions = poolOptions;
        return this;
    }

    @Override // com.redis.spring.batch.common.DelegatingItemStreamSupport
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.pool == null) {
            this.pool = ConnectionPoolBuilder.client(this.client).options(this.poolOptions).codec(this.codec);
        }
    }

    @Override // com.redis.spring.batch.common.DelegatingItemStreamSupport
    public synchronized void close() {
        if (this.pool != null) {
            this.pool.close();
            this.pool = null;
        }
        super.close();
    }

    public synchronized List<O> process(List<? extends I> list) throws Exception {
        StatefulConnection statefulConnection = (StatefulConnection) this.pool.borrowObject();
        try {
            long millis = statefulConnection.getTimeout().toMillis();
            statefulConnection.setAutoFlushCommands(false);
            try {
                List<RedisFuture<O>> execute = this.operation.execute((BaseRedisAsyncCommands) Utils.async(statefulConnection), list);
                statefulConnection.flushCommands();
                ArrayList arrayList = new ArrayList(execute.size());
                Iterator<RedisFuture<O>> it = execute.iterator();
                while (it.hasNext()) {
                    arrayList.add(it.next().get(millis, TimeUnit.MILLISECONDS));
                }
                if (statefulConnection != null) {
                    statefulConnection.close();
                }
                return arrayList;
            } finally {
                statefulConnection.setAutoFlushCommands(true);
            }
        } catch (Throwable th) {
            if (statefulConnection != null) {
                try {
                    statefulConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }
}
