package com.redis.spring.batch.item.redis.writer.operation;

import com.redis.spring.batch.item.redis.writer.AbstractValueWriteOperation;
import io.lettuce.core.RedisFuture;
import io.lettuce.core.StreamMessage;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.async.RedisAsyncCommands;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/item/redis/writer/operation/Xadd.class */
public class Xadd<K, V, T> extends AbstractValueWriteOperation<K, V, Collection<StreamMessage<K, V>>, T> {
    private Function<StreamMessage<K, V>, XAddArgs> argsFunction;
    private boolean ignoreEmptyStreams;

    public Xadd(Function<T, K> function, Function<T, Collection<StreamMessage<K, V>>> function2) {
        super(function, function2);
        this.argsFunction = this::defaultArgs;
    }

    private XAddArgs defaultArgs(StreamMessage<K, V> streamMessage) {
        if (streamMessage == null || streamMessage.getId() == null) {
            return null;
        }
        return new XAddArgs().id(streamMessage.getId());
    }

    public boolean isIgnoreEmptyStreams() {
        return this.ignoreEmptyStreams;
    }

    public void setIgnoreEmptyStreams(boolean z) {
        this.ignoreEmptyStreams = z;
    }

    public void setArgs(XAddArgs xAddArgs) {
        setArgsFunction(streamMessage -> {
            return xAddArgs;
        });
    }

    public void setArgsFunction(Function<StreamMessage<K, V>, XAddArgs> function) {
        this.argsFunction = function;
    }

    @Override // com.redis.spring.batch.item.redis.common.Operation
    public List<RedisFuture<Object>> execute(RedisAsyncCommands<K, V> redisAsyncCommands, Iterable<? extends T> iterable) {
        ArrayList arrayList = new ArrayList();
        Iterator<? extends T> it = iterable.iterator();
        while (it.hasNext()) {
            arrayList.addAll(execute(redisAsyncCommands, (RedisAsyncCommands<K, V>) it.next()));
        }
        return arrayList;
    }

    private List<RedisFuture<Object>> execute(RedisAsyncCommands<K, V> redisAsyncCommands, T t) {
        K key = key(t);
        Collection<StreamMessage<K, V>> value = value(t);
        if (!CollectionUtils.isEmpty(value)) {
            return (List) value.stream().filter(this::hasBody).map(streamMessage -> {
                return execute(redisAsyncCommands, key, streamMessage);
            }).collect(Collectors.toList());
        }
        if (this.ignoreEmptyStreams) {
            return Collections.emptyList();
        }
        HashMap hashMap = new HashMap();
        hashMap.put(key, key);
        return Arrays.asList(redisAsyncCommands.xadd(key, hashMap), redisAsyncCommands.xtrim(key, 0L));
    }

    private boolean hasBody(StreamMessage<K, V> streamMessage) {
        return !CollectionUtils.isEmpty(streamMessage.getBody());
    }

    private RedisFuture<Object> execute(RedisAsyncCommands<K, V> redisAsyncCommands, K k, StreamMessage<K, V> streamMessage) {
        return redisAsyncCommands.xadd(k, this.argsFunction.apply(streamMessage), streamMessage.getBody());
    }
}
