package com.redis.spring.batch.common;

import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/common/AbstractOperationItemStreamSupport.class */
/**
 * Base item stream that runs an {@link Operation} against Redis over connections borrowed
 * from a connection pool.
 *
 * <p>The pool and the operation are created by {@link #open(ExecutionContext)} and released by
 * {@link #close()}; both transitions are guarded by an internal lock so that repeated calls are
 * harmless. {@link #execute(List)} pipelines one batch of items over a single borrowed
 * connection.
 *
 * @param <K> key type handled by the Redis codec
 * @param <V> value type handled by the Redis codec
 * @param <I> type of the items handed to the operation
 * @param <O> type of the results produced by the operation's futures
 */
public abstract class AbstractOperationItemStreamSupport<K, V, I, O> extends ItemStreamSupport {
    private final AbstractRedisClient client;
    private final RedisCodec<K, V> codec;
    /** Guards the open/close transitions of {@link #pool} and {@link #operation}. */
    private final Object synchronizationLock = new Object();
    /** Non-null only while this stream is open. */
    private GenericObjectPool<StatefulConnection<K, V>> pool;
    private Operation<K, V, I, O> operation;
    private PoolOptions poolOptions = PoolOptions.builder().build();
    private Optional<ReadFrom> readFrom = Optional.empty();

    /**
     * Creates the stream and names it after the short name of the concrete class.
     *
     * <p>NOTE(review): the decompiler reports that this constructor was originally
     * {@code protected}; it is kept {@code public} so existing callers keep compiling.
     *
     * @param abstractRedisClient client used to build the connection pool on open
     * @param redisCodec codec used for the pooled connections
     */
    public AbstractOperationItemStreamSupport(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    /**
     * @return the pool options that will be applied the next time the stream is opened
     */
    public PoolOptions getPoolOptions() {
        return this.poolOptions;
    }

    /**
     * Sets the pool options; they are only read when {@link #open(ExecutionContext)} builds the
     * pool, so changing them on an already open stream has no effect until it is reopened.
     *
     * @param poolOptions options to hand to the connection pool factory
     */
    public void setPoolOptions(PoolOptions poolOptions) {
        this.poolOptions = poolOptions;
    }

    /**
     * @return the read-from setting that will be applied the next time the stream is opened
     */
    public Optional<ReadFrom> getReadFrom() {
        return this.readFrom;
    }

    /**
     * Sets the read-from setting; like the pool options it is only read when the pool is built.
     *
     * @param optional read-from setting to hand to the connection pool factory
     */
    public void setReadFrom(Optional<ReadFrom> optional) {
        this.readFrom = optional;
    }

    /**
     * Builds the connection pool and the operation if the stream is not open yet, then
     * delegates to the superclass.
     *
     * <p>If {@link #operation()} fails, the freshly built pool is closed and discarded before the
     * failure is rethrown, so the stream is left fully closed rather than holding a pool with no
     * operation.
     *
     * @param executionContext context passed through to the superclass
     */
    public void open(ExecutionContext executionContext) {
        synchronized (this.synchronizationLock) {
            if (!isOpen()) {
                ConnectionPoolFactory factory = ConnectionPoolFactory.client(this.client);
                factory.withOptions(this.poolOptions);
                factory.withReadFrom(this.readFrom);
                this.pool = factory.build(this.codec);
                try {
                    this.operation = operation();
                } catch (RuntimeException | Error failure) {
                    // Without this the pool would leak and isOpen() would keep reporting true,
                    // turning every later open() into a no-op with a null operation.
                    GenericObjectPool<StatefulConnection<K, V>> abandoned = this.pool;
                    this.pool = null;
                    try {
                        abandoned.close();
                    } catch (RuntimeException closeFailure) {
                        failure.addSuppressed(closeFailure);
                    }
                    throw failure;
                }
            }
        }
        super.open(executionContext);
    }

    /**
     * Supplies the operation to run for each item; called once each time the stream is opened.
     *
     * @return the operation used by {@link #execute(BaseRedisAsyncCommands, List, List)}
     */
    protected abstract Operation<K, V, I, O> operation();

    /**
     * @return {@code true} while a connection pool is held, i.e. between a successful
     *         {@link #open(ExecutionContext)} and the matching {@link #close()}
     */
    public boolean isOpen() {
        return this.pool != null;
    }

    /**
     * Delegates to the superclass, then drops the operation and closes the connection pool if
     * the stream is open.
     */
    public void close() {
        super.close();
        synchronized (this.synchronizationLock) {
            if (isOpen()) {
                this.operation = null;
                this.pool.close();
                this.pool = null;
            }
        }
    }

    /**
     * Runs the operation for every item of the batch over one borrowed connection and collects
     * the results.
     *
     * <p>Auto-flush is switched off while the commands are queued so the whole batch is sent
     * with a single flush; it is switched back on afterwards whether or not the batch succeeded.
     * Each future is then awaited for at most the connection's timeout. The connection is closed
     * on every path (for a pooled connection this is expected to hand it back to the pool —
     * confirm against {@code ConnectionPoolFactory}).
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param list items to process
     * @return one result per future registered by the operation, in registration order
     * @throws IllegalStateException if the stream has not been opened or was already closed
     * @throws Exception if a connection cannot be borrowed, or a future fails, is interrupted
     *         or does not complete within the connection timeout
     */
    public List<O> execute(List<? extends I> list) throws Exception {
        // Read the field once so the null check and the borrow see the same pool instance.
        GenericObjectPool<StatefulConnection<K, V>> connectionPool = this.pool;
        if (connectionPool == null) {
            throw new IllegalStateException(ClassUtils.getShortName(getClass())
                    + " is not open: call open(ExecutionContext) before execute");
        }
        try (StatefulConnection<K, V> connection = connectionPool.borrowObject()) {
            BaseRedisAsyncCommands<K, V> commands = Utils.async(connection);
            List<RedisFuture<O>> futures = new ArrayList<>();
            try {
                connection.setAutoFlushCommands(false);
                execute(commands, list, futures);
                connection.flushCommands();
                long timeoutMillis = connection.getTimeout().toMillis();
                List<O> results = new ArrayList<>(futures.size());
                for (RedisFuture<O> future : futures) {
                    results.add(future.get(timeoutMillis, TimeUnit.MILLISECONDS));
                }
                return results;
            } finally {
                // Restore the default before the connection is closed so the next borrower
                // does not inherit manual flushing.
                connection.setAutoFlushCommands(true);
            }
        }
    }

    /**
     * Invokes the operation once per item, letting it register its futures in {@code list2}.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code protected}.
     *
     * @param baseRedisAsyncCommands async commands of the connection the batch runs on
     * @param list items to process
     * @param list2 collector the operation adds its pending futures to
     */
    public void execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, List<? extends I> list, List<RedisFuture<O>> list2) {
        for (I item : list) {
            this.operation.execute(baseRedisAsyncCommands, item, list2);
        }
    }
}
