package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.DataType;
import com.redis.spring.batch.common.KeyComparison;
import com.redis.spring.batch.common.KeyValue;
import io.lettuce.core.StreamMessage;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/KeyComparisonValueReader.class */
/**
 * Compares the values stored under a batch of keys in a source against the
 * values stored under the same keys in a target.
 *
 * <p>For each batch of keys the {@code source} delegate is queried first. The
 * resulting values are optionally transformed by {@link #setProcessor the value
 * processor}, and the keys of the surviving values are then used to query the
 * {@code target} delegate. Source and target values are paired by position and
 * each pair is assigned a {@link KeyComparison.Status}.
 *
 * <p>{@link ItemStream} lifecycle calls are forwarded to the source, the value
 * processor and the target whenever those delegates implement
 * {@link ItemStream}.
 */
public class KeyComparisonValueReader implements ItemProcessor<List<String>, List<KeyComparison>>, ItemStream {

    /** Default allowed TTL difference between source and target: 100 milliseconds. */
    public static final Duration DEFAULT_TTL_TOLERANCE = Duration.ofMillis(100);

    private final ItemProcessor<List<String>, List<KeyValue<String>>> source;
    private final ItemProcessor<List<String>, List<KeyValue<String>>> target;
    private ItemProcessor<String, String> keyProcessor;
    private ItemProcessor<KeyValue<String>, KeyValue<String>> processor;
    private Duration ttlTolerance = DEFAULT_TTL_TOLERANCE;
    private boolean compareStreamMessageIds;

    /**
     * Creates a comparison reader.
     *
     * @param itemProcessor  delegate that reads the source values for a list of keys
     * @param itemProcessor2 delegate that reads the target values for a list of keys
     */
    public KeyComparisonValueReader(ItemProcessor<List<String>, List<KeyValue<String>>> itemProcessor, ItemProcessor<List<String>, List<KeyValue<String>>> itemProcessor2) {
        this.source = itemProcessor;
        this.target = itemProcessor2;
    }

    /**
     * Sets an optional processor applied to every incoming key before the source
     * is queried. Keys for which it returns {@code null} are dropped.
     *
     * @param itemProcessor key processor, or {@code null} to use keys unchanged
     */
    public void setKeyProcessor(ItemProcessor<String, String> itemProcessor) {
        this.keyProcessor = itemProcessor;
    }

    /**
     * Sets an optional processor applied to every source value before the target
     * is queried. Values for which it returns {@code null} are dropped.
     *
     * @param itemProcessor value processor, or {@code null} to use values unchanged
     */
    public void setProcessor(ItemProcessor<KeyValue<String>, KeyValue<String>> itemProcessor) {
        this.processor = itemProcessor;
    }

    /**
     * Sets the maximum TTL difference (compared in milliseconds) that is still
     * reported as {@link KeyComparison.Status#OK}.
     *
     * @param duration allowed TTL difference
     */
    public void setTtlTolerance(Duration duration) {
        this.ttlTolerance = duration;
    }

    /**
     * Controls whether stream message IDs take part in stream value comparison.
     * Defaults to {@code false}.
     *
     * @param z {@code true} to require equal message IDs
     */
    public void setCompareStreamMessageIds(boolean z) {
        this.compareStreamMessageIds = z;
    }

    /** Opens the source, value processor and target, in that order, when they are {@link ItemStream}s. */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        openIfStream(this.source, executionContext);
        openIfStream(this.processor, executionContext);
        openIfStream(this.target, executionContext);
    }

    /** Forwards the update to the source, value processor and target, in that order, when they are {@link ItemStream}s. */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        updateIfStream(this.source, executionContext);
        updateIfStream(this.processor, executionContext);
        updateIfStream(this.target, executionContext);
    }

    /** Closes the source, value processor and target, in that order, when they are {@link ItemStream}s. */
    @Override
    public void close() throws ItemStreamException {
        closeIfStream(this.source);
        closeIfStream(this.processor);
        closeIfStream(this.target);
    }

    /**
     * Reads source and target values for the given keys and compares them.
     *
     * @param list keys to compare
     * @return one comparison per processed source value, or an empty list when the
     *         source returned nothing for these keys
     * @throws IllegalStateException if the value processor dropped every source
     *                               value, or the target returned no values
     * @throws Exception             if any delegate fails
     */
    @Override
    public List<KeyComparison> process(List<String> list) throws Exception {
        List<KeyValue<String>> sourceItems = this.source.process(processKeys(list));
        if (CollectionUtils.isEmpty(sourceItems)) {
            return Collections.emptyList();
        }
        List<KeyValue<String>> processedItems = processValues(sourceItems);
        // Checked before touching the target: there is no point asking the target
        // for an empty key list only to fail with this same exception afterwards.
        if (CollectionUtils.isEmpty(processedItems)) {
            throw new IllegalStateException("No source items found");
        }
        List<String> targetKeys = processedItems.stream().map(KeyValue::getKey).collect(Collectors.toList());
        List<KeyValue<String>> targetItems = this.target.process(targetKeys);
        if (CollectionUtils.isEmpty(targetItems)) {
            throw new IllegalStateException("No target items found");
        }
        List<KeyComparison> comparisons = new ArrayList<>(processedItems.size());
        for (int index = 0; index < processedItems.size(); index++) {
            KeyComparison comparison = new KeyComparison();
            comparison.setSource(processedItems.get(index));
            // Source and target are paired by position; a shorter target list
            // leaves the target unset, which status() reports as MISSING.
            if (index < targetItems.size()) {
                comparison.setTarget(targetItems.get(index));
            }
            comparison.setStatus(status(comparison));
            comparisons.add(comparison);
        }
        return comparisons;
    }

    /** Calls {@code open} on the delegate if it is an {@link ItemStream}; {@code null} is ignored. */
    private static void openIfStream(Object delegate, ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).open(executionContext);
        }
    }

    /** Calls {@code update} on the delegate if it is an {@link ItemStream}; {@code null} is ignored. */
    private static void updateIfStream(Object delegate, ExecutionContext executionContext) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).update(executionContext);
        }
    }

    /** Calls {@code close} on the delegate if it is an {@link ItemStream}; {@code null} is ignored. */
    private static void closeIfStream(Object delegate) {
        if (delegate instanceof ItemStream) {
            ((ItemStream) delegate).close();
        }
    }

    /** Applies the value processor, if any, dropping values it maps to {@code null}. */
    private List<KeyValue<String>> processValues(List<KeyValue<String>> items) throws Exception {
        if (this.processor == null) {
            return items;
        }
        List<KeyValue<String>> processed = new ArrayList<>(items.size());
        for (KeyValue<String> item : items) {
            KeyValue<String> result = this.processor.process(item);
            if (result != null) {
                processed.add(result);
            }
        }
        return processed;
    }

    /** Applies the key processor, if any, dropping keys it maps to {@code null}. */
    private List<String> processKeys(List<String> keys) throws Exception {
        if (this.keyProcessor == null) {
            return keys;
        }
        List<String> processed = new ArrayList<>(keys.size());
        for (String key : keys) {
            String result = this.keyProcessor.process(key);
            if (result != null) {
                processed.add(result);
            }
        }
        return processed;
    }

    /**
     * Determines the comparison status, checking in order: presence, type, value,
     * then TTL.
     *
     * <p>When the target is set, the source is expected to be non-null;
     * {@link #process(List)} always sets the source.
     */
    private KeyComparison.Status status(KeyComparison keyComparison) {
        KeyValue<String> source = keyComparison.getSource();
        KeyValue<String> target = keyComparison.getTarget();
        if (target == null) {
            return source == null ? KeyComparison.Status.OK : KeyComparison.Status.MISSING;
        }
        // The key is only "missing" when it exists in the source but not in the target.
        if (!target.exists() && source.exists()) {
            return KeyComparison.Status.MISSING;
        }
        if (target.getType() != source.getType()) {
            return KeyComparison.Status.TYPE;
        }
        if (!valueEquals(source, target)) {
            return KeyComparison.Status.VALUE;
        }
        return ttlEquals(source, target) ? KeyComparison.Status.OK : KeyComparison.Status.TTL;
    }

    /** Returns whether both TTLs are identical or differ by at most the configured tolerance. */
    private boolean ttlEquals(KeyValue<String> source, KeyValue<String> target) {
        long sourceTtl = source.getTtl();
        long targetTtl = target.getTtl();
        if (sourceTtl == targetTtl) {
            return true;
        }
        return Math.abs(sourceTtl - targetTtl) <= this.ttlTolerance.toMillis();
    }

    /** Compares two values of the same type; stream values are compared message by message. */
    private boolean valueEquals(KeyValue<String> source, KeyValue<String> target) {
        if (source.getType() == DataType.STREAM) {
            return streamEquals((Collection<?>) source.getValue(), (Collection<?>) target.getValue());
        }
        return Objects.deepEquals(source.getValue(), target.getValue());
    }

    /**
     * Compares two collections of stream messages in iteration order.
     * A {@code null} collection is treated like an empty one.
     */
    private boolean streamEquals(Collection<?> sourceMessages, Collection<?> targetMessages) {
        if (CollectionUtils.isEmpty(sourceMessages)) {
            return CollectionUtils.isEmpty(targetMessages);
        }
        // Source has messages: a null or empty target cannot match. The null case
        // previously fell through to targetMessages.size() and threw an NPE.
        if (CollectionUtils.isEmpty(targetMessages) || sourceMessages.size() != targetMessages.size()) {
            return false;
        }
        Iterator<?> sourceIterator = sourceMessages.iterator();
        Iterator<?> targetIterator = targetMessages.iterator();
        while (sourceIterator.hasNext()) {
            if (!targetIterator.hasNext()) {
                return false;
            }
            StreamMessage<?, ?> sourceMessage = (StreamMessage<?, ?>) sourceIterator.next();
            StreamMessage<?, ?> targetMessage = (StreamMessage<?, ?>) targetIterator.next();
            if (!streamMessageEquals(sourceMessage, targetMessage)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Compares stream name and body of two messages, and their IDs as well when
     * ID comparison is enabled. A {@code null} body is treated like an empty one.
     */
    private boolean streamMessageEquals(StreamMessage<?, ?> sourceMessage, StreamMessage<?, ?> targetMessage) {
        if (!Objects.equals(sourceMessage.getStream(), targetMessage.getStream())) {
            return false;
        }
        if (this.compareStreamMessageIds && !Objects.equals(sourceMessage.getId(), targetMessage.getId())) {
            return false;
        }
        Map<?, ?> sourceBody = sourceMessage.getBody();
        Map<?, ?> targetBody = targetMessage.getBody();
        if (CollectionUtils.isEmpty(sourceBody)) {
            return CollectionUtils.isEmpty(targetBody);
        }
        return sourceBody.equals(targetBody);
    }
}
