package com.streamr.client.utils;

import com.streamr.client.exceptions.GapFillFailedException;
import com.streamr.client.protocol.message_layer.MessageRef;
import com.streamr.client.protocol.message_layer.StreamMessage;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.function.Consumer;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/streamr/client/utils/OrderedMsgChain.class */
/**
 * Delivers the messages of one message chain (identified by publisher id and
 * message chain id) to a handler in order.
 *
 * <p>A message is handed to the in-order handler immediately when it is the
 * direct successor of the last delivered message. Otherwise it is parked in a
 * priority queue ordered by message reference and a periodic gap timer is
 * started. Each timer tick asks the gap handler to resend the missing range;
 * after {@link #MAX_GAP_REQUESTS} requests the gap is reported through the
 * gap-fill-failed handler and skipped.
 *
 * <p>Mutable state ({@code queue}, {@code lastReceived}, {@code gap},
 * {@code gapRequestCount}) is accessed while holding this instance's monitor:
 * by the {@code synchronized} methods and by the gap timer task. Both handlers
 * are invoked while that monitor is held.
 */
public class OrderedMsgChain {
    private static final Logger log = LoggerFactory.getLogger(OrderedMsgChain.class);

    /** Number of gap requests issued for one gap before giving up on it. */
    private static final int MAX_GAP_REQUESTS = 10;

    /** Maximum number of out-of-order messages parked in the queue. */
    static final int MAX_QUEUE_SIZE = 10000;

    private final Address publisherId;
    private final String msgChainId;
    private final Consumer<StreamMessage> inOrderHandler;
    private final GapHandlerFunction gapHandler;
    private final Function<GapFillFailedException, Void> gapFillFailedHandler;
    /** Delay in milliseconds before the first gap request (the timer's initial delay). */
    private final long propagationTimeout;
    /** Interval in milliseconds between consecutive gap requests (the timer's period). */
    private final long resendTimeout;
    private final PriorityQueue<StreamMessage> queue;
    private final boolean skipGapsOnFullQueue;

    /** Reference of the last delivered message; {@code null} until one is delivered or set. */
    private MessageRef lastReceived = null;
    /** Timer driving the gap requests; non-null exactly while a gap is being filled. */
    private Timer gap = null;
    private int gapRequestCount = 0;

    /** Callback used to request the messages missing between two references. */
    @FunctionalInterface
    public interface GapHandlerFunction {
        /**
         * Requests a resend of a missing range.
         *
         * @param from first missing message reference
         * @param to previous-message reference of the first queued message
         * @param publisherId publisher of the chain
         * @param msgChainId id of the chain
         */
        void apply(MessageRef from, MessageRef to, Address publisherId, String msgChainId);
    }

    /**
     * Creates a chain with an explicit gap-fill-failed handler.
     *
     * @param publisherId publisher of the chain
     * @param msgChainId id of the chain
     * @param inOrderHandler receives every message that is delivered
     * @param gapHandler asked to fill detected gaps; when {@code null} an error is logged instead
     * @param gapFillFailedHandler notified when a gap could not be filled; when {@code null} an
     *     error is logged instead
     * @param propagationTimeout milliseconds to wait before the first gap request
     * @param resendTimeout milliseconds between consecutive gap requests
     * @param skipGapsOnFullQueue when {@code true}, a full queue is emptied and the incoming
     *     message delivered; when {@code false}, a full queue makes {@link #add} throw
     */
    public OrderedMsgChain(Address publisherId, String msgChainId, Consumer<StreamMessage> inOrderHandler, GapHandlerFunction gapHandler, Function<GapFillFailedException, Void> gapFillFailedHandler, long propagationTimeout, long resendTimeout, boolean skipGapsOnFullQueue) {
        this.publisherId = publisherId;
        this.msgChainId = msgChainId;
        this.inOrderHandler = inOrderHandler;
        this.gapHandler = gapHandler;
        this.gapFillFailedHandler = gapFillFailedHandler;
        this.propagationTimeout = propagationTimeout;
        this.resendTimeout = resendTimeout;
        this.skipGapsOnFullQueue = skipGapsOnFullQueue;
        Comparator<StreamMessage> byMessageRef =
                (left, right) -> left.getMessageRef().compareTo(right.getMessageRef());
        this.queue = new PriorityQueue<>(byMessageRef);
    }

    /**
     * Creates a chain whose gap-fill-failed handler rethrows the exception it is given.
     *
     * @param publisherId publisher of the chain
     * @param msgChainId id of the chain
     * @param inOrderHandler receives every message that is delivered
     * @param gapHandler asked to fill detected gaps
     * @param propagationTimeout milliseconds to wait before the first gap request
     * @param resendTimeout milliseconds between consecutive gap requests
     * @param skipGapsOnFullQueue see the eight-argument constructor
     */
    public OrderedMsgChain(Address publisherId, String msgChainId, Consumer<StreamMessage> inOrderHandler, GapHandlerFunction gapHandler, long propagationTimeout, long resendTimeout, boolean skipGapsOnFullQueue) {
        this(publisherId, msgChainId, inOrderHandler, gapHandler, failure -> {
            throw failure;
        }, propagationTimeout, resendTimeout, skipGapsOnFullQueue);
    }

    /**
     * Feeds a message into the chain.
     *
     * <p>Messages at or before the last delivered reference are ignored. The direct successor is
     * delivered immediately, followed by any queued messages that became deliverable. Any other
     * message starts the gap timer (if not already running) and is queued.
     *
     * @param streamMessage the received message
     * @throws IllegalStateException if the queue is full and {@code skipGapsOnFullQueue} is false
     */
    public synchronized void add(StreamMessage streamMessage) {
        MessageRef incomingRef = streamMessage.getMessageRef();
        if (lastReceived != null && incomingRef.compareTo(lastReceived) <= 0) {
            log.debug("Already received message: {}, lastReceivedMsgRef: {}. Ignoring message.", incomingRef, lastReceived);
            return;
        }
        if (isNextMessage(streamMessage)) {
            process(streamMessage);
            checkQueue();
            return;
        }
        if (gap == null) {
            scheduleGap();
        }
        if (queue.size() < MAX_QUEUE_SIZE) {
            queue.offer(streamMessage);
            return;
        }
        String details = describeFullQueue(streamMessage);
        if (!skipGapsOnFullQueue) {
            throw new IllegalStateException("Queue is full! Message." + details);
        }
        log.warn("Queue is full. Emptying and processing new message. {}", details);
        clearGap();
        queue.clear();
        process(streamMessage);
    }

    /** Cancels the gap timer, if one is running, and marks the chain as having no gap. */
    public synchronized void clearGap() {
        if (gap == null) {
            return;
        }
        gap.cancel();
        gap.purge();
        gap = null;
    }

    /**
     * @return {@code true} while a gap timer is active
     */
    public synchronized boolean hasGap() {
        return gap != null;
    }

    public Address getPublisherId() {
        return publisherId;
    }

    public String getMsgChainId() {
        return msgChainId;
    }

    /**
     * Overrides the reference considered to be the last delivered message.
     *
     * @param messageRef the new last-received reference
     */
    public synchronized void setLastReceived(MessageRef messageRef) {
        this.lastReceived = messageRef;
    }

    /**
     * @return the reference of the last delivered message, or {@code null} if there is none yet
     */
    public synchronized MessageRef getLastReceived() {
        return lastReceived;
    }

    /**
     * Tells whether a message can be delivered right now: nothing was delivered yet, or the
     * message names the last delivered one as its predecessor, or it has no predecessor
     * reference and is newer than the last delivered one.
     */
    private boolean isNextMessage(StreamMessage streamMessage) {
        if (lastReceived == null) {
            return true;
        }
        MessageRef previous = streamMessage.getPreviousMessageRef();
        if (previous != null) {
            return previous.compareTo(lastReceived) == 0;
        }
        return streamMessage.getMessageRef().compareTo(lastReceived) > 0;
    }

    /**
     * Drains the queue as far as possible: delivers the head while it is the next message
     * (clearing the gap each time) and drops heads that are not newer than the last delivered
     * message. Stops at the first head that is newer but not yet deliverable.
     *
     * <p>Within this class it is called with the instance monitor held (from {@link #add} and
     * from the gap timer task).
     */
    public void checkQueue() {
        StreamMessage head;
        while ((head = queue.peek()) != null) {
            if (isNextMessage(head)) {
                queue.poll();
                clearGap();
                process(head);
                continue;
            }
            boolean stale = lastReceived != null && head.getMessageRef().compareTo(lastReceived) <= 0;
            if (!stale) {
                return;
            }
            queue.poll();
        }
    }

    /** Records the message as last received and hands it to the in-order handler. */
    private void process(StreamMessage streamMessage) {
        lastReceived = streamMessage.getMessageRef();
        inOrderHandler.accept(streamMessage);
    }

    /**
     * Starts a daemon timer that first fires after {@code propagationTimeout} milliseconds and
     * then every {@code resendTimeout} milliseconds, and resets the gap request counter.
     */
    private void scheduleGap() {
        gapRequestCount = 0;
        gap = new Timer(true);
        gap.schedule(new TimerTask() {
            @Override
            public void run() {
                onGapTimer();
            }
        }, propagationTimeout, resendTimeout);
    }

    /**
     * One tick of the gap timer: re-checks the queue, then either requests the missing range or,
     * once {@link #MAX_GAP_REQUESTS} requests were made, gives up on the gap.
     *
     * <p>Note: per the {@link Timer} contract, an exception escaping a timer task terminates that
     * timer's thread.
     */
    private synchronized void onGapTimer() {
        checkQueue();
        if (gap == null || queue.isEmpty()) {
            return;
        }
        MessageRef from = new MessageRef(Long.valueOf(lastReceived.getTimestamp()), lastReceived.getSequenceNumber() + 1);
        MessageRef to = queue.peek().getPreviousMessageRef();
        if (from.compareTo(to) > 0) {
            throw new IllegalStateException(String.format("From (%s) is after to (%s)!", from, to));
        }
        if (gapRequestCount < MAX_GAP_REQUESTS) {
            gapRequestCount++;
            requestGapFill(from, to);
        } else {
            abandonGap(from, to);
        }
    }

    /** Asks the gap handler for the missing range, or logs an error when no handler is set. */
    private void requestGapFill(MessageRef from, MessageRef to) {
        if (gapHandler != null) {
            gapHandler.apply(from, to, publisherId, msgChainId);
            return;
        }
        StreamMessage head = queue.peek();
        log.error(String.format("Failed to request gapfill because the gapHandler is null. streamId %s, streamPartition %d, publisherId %s, msgChainId %s.", head.getStreamId(), head.getStreamPartition(), publisherId, msgChainId));
    }

    /**
     * Reports an unfillable gap and skips it by continuing from the first queued message.
     *
     * <p>The skip runs in a {@code finally} block so that it happens exactly once, also when the
     * gap-fill-failed handler throws (as the default handler does); the handler's exception then
     * propagates after the skip.
     */
    private void abandonGap(MessageRef from, MessageRef to) {
        try {
            if (gapFillFailedHandler != null) {
                gapFillFailedHandler.apply(new GapFillFailedException(from, to, publisherId, msgChainId, MAX_GAP_REQUESTS));
            } else {
                StreamMessage head = queue.peek();
                log.error(String.format("Failed to report failed gapfill because gapFillFailedHandler is null. streamId %s, streamPartition %d, publisherId %s, msgChainId %s.", head.getStreamId(), head.getStreamPartition(), publisherId, msgChainId));
            }
        } finally {
            clearGap();
            // Guard against an empty queue so a failure here cannot mask the handler's exception.
            StreamMessage head = queue.peek();
            if (head != null) {
                log.warn("Unable to fill gap: Max retries reached! Ignoring the error and continuing from the first processable message: {}", head.getMessageRef());
                // Pretend the head's predecessor was delivered so the head becomes the next message.
                lastReceived = head.getPreviousMessageRef();
                checkQueue();
            }
        }
    }

    /**
     * Builds the diagnostic text used when the queue is full. The snapshot is taken in the
     * queue's iteration order, which {@link PriorityQueue} does not guarantee to be sorted, so
     * the second reference shown is not necessarily the greatest one.
     */
    private String describeFullQueue(StreamMessage incoming) {
        ArrayList<StreamMessage> snapshot = new ArrayList<>(queue);
        MessageRef first = snapshot.get(0).getMessageRef();
        MessageRef last = snapshot.get(snapshot.size() - 1).getMessageRef();
        return String.format("Queue for %s::%d was (%s, ..., %s) and new message is %s", incoming.getStreamId(), incoming.getStreamPartition(), first, last, incoming.getMessageRef());
    }

    /**
     * @return {@code true} when the queue holds exactly {@link #MAX_QUEUE_SIZE} messages
     */
    boolean isQueueFull() {
        return queue.size() == MAX_QUEUE_SIZE;
    }
}
