package com.redis.spring.batch.writer;

import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.async.BaseRedisAsyncCommands;
import io.lettuce.core.api.async.RedisStreamAsyncCommands;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/writer/XaddAll.class */
/**
 * Write operation that turns each input item into a collection of stream
 * messages and issues one {@code xadd} call per message.
 *
 * @param <K> Redis key type
 * @param <V> Redis value type
 * @param <T> type of the items handed to {@link #execute}
 */
public class XaddAll<K, V, T> implements WriteOperation<K, V, T> {
    /** Maps an input item to the stream messages to add for it. */
    private final Function<T, Collection<StreamMessage<K, V>>> messagesFunction;

    /**
     * Produces the {@link XAddArgs} for a given message. By default the
     * message's own id is passed through to {@code XAddArgs.id(...)}.
     */
    private Function<StreamMessage<K, V>, XAddArgs> argsFunction =
            streamMessage -> new XAddArgs().id(streamMessage.getId());

    /**
     * Creates an operation using the default per-message args (the message id).
     *
     * @param function maps an input item to the messages to add
     */
    public XaddAll(Function<T, Collection<StreamMessage<K, V>>> function) {
        this.messagesFunction = function;
    }

    /**
     * Uses the same {@link XAddArgs} instance for every message.
     *
     * @param xAddArgs args passed to every {@code xadd} call
     * @return this operation, for chaining
     */
    public XaddAll<K, V, T> args(XAddArgs xAddArgs) {
        return args(streamMessage -> xAddArgs);
    }

    /**
     * Replaces the function that computes the {@link XAddArgs} of each message.
     *
     * @param function maps a message to the args of its {@code xadd} call
     * @return this operation, for chaining
     */
    public XaddAll<K, V, T> args(Function<StreamMessage<K, V>, XAddArgs> function) {
        this.argsFunction = function;
        return this;
    }

    /**
     * Issues one {@code xadd} per message produced for each item and appends
     * the resulting futures to {@code list}. Items whose message collection is
     * null or empty contribute nothing.
     *
     * @param baseRedisAsyncCommands commands object; cast to
     *        {@link RedisStreamAsyncCommands}, so it must implement it
     * @param iterable items to write
     * @param list receives the future of every issued command
     */
    @Override // com.redis.spring.batch.common.Operation
    public void execute(BaseRedisAsyncCommands<K, V> baseRedisAsyncCommands, Iterable<? extends T> iterable, List<RedisFuture<Object>> list) {
        // The two command interfaces are unrelated in the type hierarchy, so the
        // compiler cannot verify <K, V>; both sides use this class's own K and V.
        @SuppressWarnings("unchecked")
        RedisStreamAsyncCommands<K, V> streamCommands = (RedisStreamAsyncCommands<K, V>) baseRedisAsyncCommands;
        for (T item : iterable) {
            Collection<StreamMessage<K, V>> messages = this.messagesFunction.apply(item);
            if (CollectionUtils.isEmpty(messages)) {
                continue;
            }
            for (StreamMessage<K, V> message : messages) {
                // The sink is typed RedisFuture<Object>; widening the future's
                // type argument is safe because the list only reads results as Object.
                @SuppressWarnings("unchecked")
                RedisFuture<Object> future = (RedisFuture<Object>) (RedisFuture<?>) streamCommands.xadd(
                        message.getStream(), this.argsFunction.apply(message), message.getBody());
                list.add(future);
            }
        }
    }

    /**
     * Static factory combining the constructor and {@link #args(Function)}.
     *
     * @param function maps an input item to the messages to add
     * @param function2 maps a message to the args of its {@code xadd} call
     * @return a configured operation
     */
    public static <K, V, T> XaddAll<K, V, T> of(Function<T, Collection<StreamMessage<K, V>>> function, Function<StreamMessage<K, V>, XAddArgs> function2) {
        return new XaddAll<>(function).args(function2);
    }
}
