package com.redis.spring.batch.writer.operation;

import com.redis.spring.batch.writer.BatchWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/writer/operation/XAddAll.class */
public class XAddAll<K, V, T> implements BatchWriteOperation<K, V, T> {
    private Function<T, Collection<StreamMessage<K, V>>> messagesFunction;
    private Function<StreamMessage<K, V>, XAddArgs> argsFunction = streamMessage -> {
        return new XAddArgs().id(streamMessage.getId());
    };

    public void setMessagesFunction(Function<T, Collection<StreamMessage<K, V>>> function) {
        this.messagesFunction = function;
    }

    public void setArgs(XAddArgs xAddArgs) {
        this.argsFunction = streamMessage -> {
            return xAddArgs;
        };
    }

    public void setArgsFunction(Function<StreamMessage<K, V>, XAddArgs> function) {
        this.argsFunction = function;
    }

    @Override // com.redis.spring.batch.common.BatchOperation
    public List<RedisFuture<Object>> execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, List<? extends T> list) {
        if (CollectionUtils.isEmpty(list)) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        RedisStreamAsyncCommands redisStreamAsyncCommands = (RedisStreamAsyncCommands) baseRedisAsyncCommands;
        Iterator<? extends T> it = list.iterator();
        while (it.hasNext()) {
            Collection<StreamMessage<K, V>> apply = this.messagesFunction.apply(it.next());
            if (!CollectionUtils.isEmpty(apply)) {
                for (StreamMessage<K, V> streamMessage : apply) {
                    arrayList.add(redisStreamAsyncCommands.xadd(streamMessage.getStream(), this.argsFunction.apply(streamMessage), streamMessage.getBody()));
                }
            }
        }
        return arrayList;
    }
}
