package com.redis.spring.batch;

import com.redis.spring.batch.util.ConnectionUtils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.support.ConnectionPoolSupport;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/AbstractRedisItemStreamSupport.class */
public abstract class AbstractRedisItemStreamSupport<K, V, I> extends ItemStreamSupport {
    public static final int DEFAULT_POOL_SIZE = 8;
    protected final AbstractRedisClient client;
    protected final RedisCodec<K, V> codec;
    private int poolSize = 8;
    private GenericObjectPool<StatefulConnection<K, V>> pool;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractRedisItemStreamSupport(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
    }

    public int getPoolSize() {
        return this.poolSize;
    }

    public void setPoolSize(int i) {
        this.poolSize = i;
    }

    public synchronized void open(ExecutionContext executionContext) {
        if (!isOpen()) {
            doOpen();
        }
        super.open(executionContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void doOpen() {
        GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
        genericObjectPoolConfig.setMaxTotal(this.poolSize);
        this.pool = ConnectionPoolSupport.createGenericObjectPool(connectionSupplier(), genericObjectPoolConfig);
    }

    protected Supplier<StatefulConnection<K, V>> connectionSupplier() {
        return ConnectionUtils.supplier(this.client, this.codec);
    }

    public boolean isOpen() {
        return this.pool != null;
    }

    public synchronized void close() {
        super.close();
        if (isOpen()) {
            doClose();
        }
    }

    protected void doClose() {
        this.pool.close();
        this.pool = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Object> execute(Collection<? extends I> collection) throws Exception {
        StatefulConnection statefulConnection = (StatefulConnection) this.pool.borrowObject();
        try {
            long millis = statefulConnection.getTimeout().toMillis();
            BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands = (BaseRedisAsyncCommands) ConnectionUtils.async(statefulConnection);
            ArrayList arrayList = new ArrayList();
            try {
                statefulConnection.setAutoFlushCommands(false);
                execute((BaseRedisAsyncCommands) baseRedisAsyncCommands, (Collection) collection, (List<RedisFuture<?>>) arrayList);
                statefulConnection.flushCommands();
                ArrayList arrayList2 = new ArrayList(arrayList.size());
                Iterator<RedisFuture<?>> it = arrayList.iterator();
                while (it.hasNext()) {
                    arrayList2.add(it.next().get(millis, TimeUnit.MILLISECONDS));
                }
                if (statefulConnection != null) {
                    statefulConnection.close();
                }
                return arrayList2;
            } finally {
                statefulConnection.setAutoFlushCommands(true);
            }
        } catch (Throwable th) {
            if (statefulConnection != null) {
                try {
                    statefulConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, Collection<? extends I> collection, List<RedisFuture<?>> list) {
        Iterator<? extends I> it = collection.iterator();
        while (it.hasNext()) {
            execute(baseRedisAsyncCommands, (BaseRedisAsyncCommands<K, V>) it.next(), list);
        }
    }

    protected abstract void execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, I i, List<RedisFuture<?>> list);
}
