package com.redis.spring.batch.support.operation.executor;

import com.redis.spring.batch.support.DataStructure;
import com.redis.spring.batch.support.Utils;
import io.lettuce.core.LettuceFutures;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.RedisURI;
import io.lettuce.core.ScoredValue;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisHashAsyncCommands;
import io.lettuce.core.api.async.RedisKeyAsyncCommands;
import io.lettuce.core.api.async.RedisListAsyncCommands;
import io.lettuce.core.api.async.RedisSetAsyncCommands;
import io.lettuce.core.api.async.RedisSortedSetAsyncCommands;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import io.lettuce.core.api.async.RedisStringAsyncCommands;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.convert.converter.Converter;

/**
 * {@link OperationExecutor} that writes {@link DataStructure} items to Redis through Lettuce
 * asynchronous commands.
 *
 * <p>For each item the existing key is replaced by the item's value, using the command matching the
 * item's type (hash, string, list, set, sorted set or stream). Items whose value is {@code null}
 * result in a key deletion. Hash and string writes, as well as expirations, are queued and their
 * futures are handed back to the caller; list, set, sorted-set and stream writes are flushed and
 * awaited inside this class, bounded by the configured timeout.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 */
public class DataStructureOperationExecutor<K, V> implements OperationExecutor<K, V, DataStructure<K>> {
    private static final Logger log = LoggerFactory.getLogger(DataStructureOperationExecutor.class);
    private static final int DEFAULT_BATCH_SIZE = 50;

    /** Maximum time to wait for commands flushed by this executor to complete. */
    private Duration timeout = RedisURI.DEFAULT_TIMEOUT_DURATION;

    /** Builds the XADD arguments for a stream message; by default the original message id is reused. */
    private Converter<StreamMessage<K, V>, XAddArgs> xaddArgs = message -> new XAddArgs().id(message.getId());

    /** Maximum number of stream messages sent per flush. */
    private int batchSize = DEFAULT_BATCH_SIZE;

    /**
     * Sets how long to wait for flushed commands to complete.
     *
     * @param duration the timeout; validated by {@code Utils.assertPositive}
     */
    public void setTimeout(Duration duration) {
        Utils.assertPositive(duration, "Timeout duration");
        this.timeout = duration;
    }

    /**
     * Sets the converter producing the {@link XAddArgs} used when adding each stream message.
     *
     * @param converter converter applied to every stream message before it is added
     */
    public void setXaddArgs(Converter<StreamMessage<K, V>, XAddArgs> converter) {
        this.xaddArgs = converter;
    }

    /**
     * Sets the maximum number of stream messages written per flush.
     *
     * @param i the batch size; validated by {@code Utils.assertPositive}
     */
    public void setBatchSize(int i) {
        Utils.assertPositive(i, "Batch size");
        this.batchSize = i;
    }

    /**
     * Issues the write commands for every item in {@code list}.
     *
     * @param baseRedisAsyncCommands Lettuce async commands used to issue the writes
     * @param list items to write; {@code null} elements are skipped
     * @return the futures of the commands that were queued but not awaited by this method (deletes of
     *     null-valued items, hash and string writes, expirations)
     */
    @Override
    public List<Future<?>> execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, List<? extends DataStructure<K>> list) {
        List<Future<?>> futures = new ArrayList<>();
        for (DataStructure<K> dataStructure : list) {
            execute(baseRedisAsyncCommands, futures, dataStructure);
        }
        return futures;
    }

    /**
     * Writes a single data structure, appending any un-awaited command futures to {@code futures}.
     *
     * <p>The unchecked casts are required because {@link DataStructure} exposes its value without a
     * static type and the Lettuce command interfaces are reached by casting the base commands object.
     */
    @SuppressWarnings("unchecked")
    private void execute(BaseRedisAsyncCommands<K, V> commands, List<Future<?>> futures, DataStructure<K> dataStructure) {
        if (dataStructure == null) {
            return;
        }
        Object value = dataStructure.getValue();
        K key = dataStructure.getKey();
        if (value == null) {
            // No value: the key is simply removed.
            futures.add(delete(commands, key));
            return;
        }
        if (dataStructure.getType() == null) {
            return;
        }
        // Locale.ROOT keeps the comparison independent of the JVM default locale
        // (e.g. under a Turkish locale "LIST" would otherwise lower-case to "l\u0131st").
        String type = dataStructure.getType().toLowerCase(Locale.ROOT);
        if (type.equals(DataStructure.HASH)) {
            futures.add(delete(commands, key));
            futures.add(((RedisHashAsyncCommands<K, V>) commands).hset(key, (Map<K, V>) value));
        } else if (type.equals(DataStructure.STRING)) {
            futures.add(((RedisStringAsyncCommands<K, V>) commands).set(key, (V) value));
        } else if (type.equals(DataStructure.LIST)) {
            RedisFuture<?> deleted = delete(commands, key);
            V[] elements = (V[]) ((Collection<V>) value).toArray();
            flush(commands, deleted, ((RedisListAsyncCommands<K, V>) commands).rpush(key, elements));
        } else if (type.equals(DataStructure.SET)) {
            RedisFuture<?> deleted = delete(commands, key);
            V[] members = (V[]) ((Collection<V>) value).toArray();
            flush(commands, deleted, ((RedisSetAsyncCommands<K, V>) commands).sadd(key, members));
        } else if (type.equals(DataStructure.ZSET)) {
            RedisFuture<?> deleted = delete(commands, key);
            Collection<ScoredValue<V>> scoredValues = (Collection<ScoredValue<V>>) value;
            flush(commands, deleted,
                    ((RedisSortedSetAsyncCommands<K, V>) commands).zadd(key, scoredValues.toArray(new ScoredValue[0])));
        } else if (type.equals(DataStructure.STREAM)) {
            RedisStreamAsyncCommands<K, V> streamCommands = (RedisStreamAsyncCommands<K, V>) commands;
            // The delete is flushed on its own before any message is added.
            flush(commands, delete(commands, key));
            List<StreamMessage<K, V>> messages = (List<StreamMessage<K, V>>) value;
            batches(messages).forEach(batch -> {
                List<RedisFuture<?>> pending = new ArrayList<>(batch.size());
                for (StreamMessage<K, V> message : batch) {
                    pending.add(streamCommands.xadd(key, this.xaddArgs.convert(message), message.getBody()));
                }
                flush(commands, pending);
            });
        }
        if (dataStructure.hasTTL()) {
            futures.add(((RedisKeyAsyncCommands<K, V>) commands).pexpireat(key, dataStructure.getAbsoluteTTL().longValue()));
        }
    }

    /** Issues a DEL for {@code key} and returns its future. */
    @SuppressWarnings("unchecked")
    private RedisFuture<Long> delete(BaseRedisAsyncCommands<K, V> commands, K key) {
        return ((RedisKeyAsyncCommands<K, V>) commands).del(key);
    }

    /**
     * Splits {@code list} into consecutive sub-list views of at most the configured batch size.
     *
     * @param list the list to partition
     * @param <T> element type
     * @return a stream of sub-lists covering {@code list} in order; empty when {@code list} is empty
     */
    public <T> Stream<List<T>> batches(List<T> list) {
        int size = list.size();
        if (size <= 0) {
            return Stream.empty();
        }
        // Snapshot the batch size so every sub-list uses the same bounds even if the
        // setter is called before the returned stream is consumed.
        int step = this.batchSize;
        int lastIndex = (size - 1) / step;
        return IntStream.rangeClosed(0, lastIndex)
                .mapToObj(index -> list.subList(index * step, index == lastIndex ? size : (index + 1) * step));
    }

    /** Flushes and awaits the given futures; see {@link #flush(BaseRedisAsyncCommands, RedisFuture[])}. */
    private void flush(BaseRedisAsyncCommands<K, V> commands, List<RedisFuture<?>> futures) {
        flush(commands, futures.toArray(new RedisFuture<?>[0]));
    }

    /**
     * Flushes the queued commands and waits, up to the configured timeout, for the given futures.
     * When the wait reports that not all futures completed, a warning is logged and processing
     * continues.
     */
    private void flush(BaseRedisAsyncCommands<K, V> commands, RedisFuture<?>... futures) {
        commands.flushCommands();
        log.debug("Executing {} commands", futures.length);
        boolean completed = LettuceFutures.awaitAll(this.timeout.toMillis(), TimeUnit.MILLISECONDS, futures);
        if (completed) {
            log.debug("Successfully executed {} commands", futures.length);
        } else {
            log.warn("Could not execute {} commands", futures.length);
        }
    }
}
