package org.apache.hedwig.server.delivery;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.UnexpectedError;
import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
import org.apache.hedwig.server.netty.ServerStats;
import org.apache.hedwig.server.persistence.CancelScanRequest;
import org.apache.hedwig.server.persistence.Factory;
import org.apache.hedwig.server.persistence.MapMethods;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.ReadAheadCache;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.server.persistence.ScanRequest;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.VarArgs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager.class */
public class FIFODeliveryManager implements Runnable, DeliveryManager, SubscriptionChannelManager.SubChannelDisconnectedListener {
    protected static final Logger logger;
    private static Callback<Void> NOP_CALLBACK;
    Map<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs;
    Map<TopicSubscriber, ActiveSubscriberState> subscriberStates;
    private final ReadAheadCache cache;
    private final PersistenceManager persistenceMgr;
    private ServerConfiguration cfg;
    private final Thread workerThread;
    static final /* synthetic */ boolean $assertionsDisabled;
    BlockingQueue<DeliveryManagerRequest> requestQueue = new LinkedBlockingQueue();
    Queue<ActiveSubscriberState> retryQueue = new PriorityBlockingQueue(32, new Comparator<ActiveSubscriberState>() { // from class: org.apache.hedwig.server.delivery.FIFODeliveryManager.2
        @Override // java.util.Comparator
        public int compare(ActiveSubscriberState activeSubscriberState, ActiveSubscriberState activeSubscriberState2) {
            long j = activeSubscriberState.lastScanErrorTime - activeSubscriberState2.lastScanErrorTime;
            if (j > 0) {
                return 1;
            }
            return j < 0 ? -1 : 0;
        }
    });
    protected boolean keepRunning = true;
    private Object suspensionLock = new Object();
    private boolean suspended = false;

    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$ActiveSubscriberState.class */
    public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest, CancelScanRequest {
        static final int UNLIMITED = 0;
        ByteString topic;
        ByteString subscriberId;
        long lastLocalSeqIdDelivered;
        DeliveryEndPoint deliveryEndPoint;
        long localSeqIdDeliveringNow;
        long lastSeqIdCommunicatedExternally;
        long lastSeqIdConsumedUtil;
        final int messageWindowSize;
        ServerMessageFilter filter;
        Callback<Void> cb;
        Object ctx;
        ScanRequest outstandingScanRequest;
        static final int SEQ_ID_SLACK = 10;
        boolean connected = true;
        ReentrantReadWriteLock connectedLock = new ReentrantReadWriteLock();
        long lastScanErrorTime = -1;
        boolean isThrottled = false;

        public ActiveSubscriberState(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionPreferences subscriptionPreferences, long j, DeliveryEndPoint deliveryEndPoint, ServerMessageFilter serverMessageFilter, Callback<Void> callback, Object obj) {
            this.topic = byteString;
            this.subscriberId = byteString2;
            this.lastLocalSeqIdDelivered = j;
            this.lastSeqIdConsumedUtil = j;
            this.deliveryEndPoint = deliveryEndPoint;
            this.filter = serverMessageFilter;
            if (subscriptionPreferences.hasMessageWindowSize()) {
                this.messageWindowSize = subscriptionPreferences.getMessageWindowSize();
            } else if (FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize() > 0) {
                this.messageWindowSize = FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize();
            } else {
                this.messageWindowSize = UNLIMITED;
            }
            this.cb = callback;
            this.ctx = obj;
        }

        public void setNotConnected(PubSubProtocol.SubscriptionEvent subscriptionEvent) {
            this.connectedLock.writeLock().lock();
            try {
                if (this.connected) {
                    this.connected = false;
                    if (UNLIMITED != FIFODeliveryManager.this.cache && UNLIMITED != this.outstandingScanRequest) {
                        FIFODeliveryManager.this.cache.cancelScanRequest(this.topic, this);
                    }
                    this.connectedLock.writeLock().unlock();
                    if (UNLIMITED != subscriptionEvent && (PubSubProtocol.SubscriptionEvent.TOPIC_MOVED == subscriptionEvent || PubSubProtocol.SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == subscriptionEvent)) {
                        this.deliveryEndPoint.send(PubSubResponseUtils.getResponseForSubscriptionEvent(this.topic, this.subscriberId, subscriptionEvent), new DeliveryCallback() { // from class: org.apache.hedwig.server.delivery.FIFODeliveryManager.ActiveSubscriberState.1
                            @Override // org.apache.hedwig.server.delivery.DeliveryCallback
                            public void sendingFinished() {
                            }

                            @Override // org.apache.hedwig.server.delivery.DeliveryCallback
                            public void transientErrorOnSend() {
                            }

                            @Override // org.apache.hedwig.server.delivery.DeliveryCallback
                            public void permanentErrorOnSend() {
                                ActiveSubscriberState.this.deliveryEndPoint.close();
                            }
                        });
                    }
                    this.filter.uninitialize();
                }
            } finally {
                this.connectedLock.writeLock().unlock();
            }
        }

        public ByteString getTopic() {
            return this.topic;
        }

        public synchronized long getLastScanErrorTime() {
            return this.lastScanErrorTime;
        }

        public synchronized void setLastScanErrorTime(long j) {
            this.lastScanErrorTime = j;
        }

        protected synchronized void clearLastScanErrorTime() {
            this.lastScanErrorTime = -1L;
        }

        protected boolean isConnected() {
            this.connectedLock.readLock().lock();
            try {
                boolean z = this.connected;
                this.connectedLock.readLock().unlock();
                return z;
            } catch (Throwable th) {
                this.connectedLock.readLock().unlock();
                throw th;
            }
        }

        protected synchronized void messageConsumed(long j) {
            if (j <= this.lastSeqIdConsumedUtil) {
                return;
            }
            if (FIFODeliveryManager.logger.isDebugEnabled()) {
                FIFODeliveryManager.logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.", VarArgs.va(new Object[]{this, Long.valueOf(this.lastSeqIdConsumedUtil), Long.valueOf(j)}));
            }
            this.lastSeqIdConsumedUtil = j;
            if (!msgLimitExceeded() && this.isThrottled) {
                this.isThrottled = false;
                FIFODeliveryManager.logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.", VarArgs.va(new Object[]{this, Long.valueOf(this.lastLocalSeqIdDelivered), Long.valueOf(this.lastSeqIdConsumedUtil)}));
                FIFODeliveryManager.this.enqueueWithoutFailure(new DeliveryManagerRequest() { // from class: org.apache.hedwig.server.delivery.FIFODeliveryManager.ActiveSubscriberState.2
                    @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
                    public void performRequest() {
                        FIFODeliveryManager.this.clearRetryDelayForSubscriber(ActiveSubscriberState.this);
                    }
                });
            }
        }

        protected boolean msgLimitExceeded() {
            return this.messageWindowSize != 0 && this.lastLocalSeqIdDelivered - this.lastSeqIdConsumedUtil >= ((long) this.messageWindowSize);
        }

        public void deliverNextMessage() {
            this.connectedLock.readLock().lock();
            try {
                doDeliverNextMessage();
                this.connectedLock.readLock().unlock();
            } catch (Throwable th) {
                this.connectedLock.readLock().unlock();
                throw th;
            }
        }

        private void doDeliverNextMessage() {
            if (this.connected) {
                synchronized (this) {
                    if (msgLimitExceeded()) {
                        FIFODeliveryManager.logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.", VarArgs.va(new Object[]{this, Long.valueOf(this.lastLocalSeqIdDelivered), Long.valueOf(this.lastSeqIdConsumedUtil)}));
                        this.isThrottled = true;
                    } else {
                        this.localSeqIdDeliveringNow = FIFODeliveryManager.this.persistenceMgr.getSeqIdAfterSkipping(this.topic, this.lastLocalSeqIdDelivered, 1);
                        this.outstandingScanRequest = new ScanRequest(this.topic, this.localSeqIdDeliveringNow, this, null);
                        FIFODeliveryManager.this.persistenceMgr.scanSingleMessage(this.outstandingScanRequest);
                    }
                }
            }
        }

        @Override // org.apache.hedwig.server.persistence.CancelScanRequest
        public ScanRequest getScanRequest() {
            return this.outstandingScanRequest;
        }

        private boolean checkConnected() {
            this.connectedLock.readLock().lock();
            try {
                this.outstandingScanRequest = null;
                boolean z = this.connected;
                this.connectedLock.readLock().unlock();
                return z;
            } catch (Throwable th) {
                this.connectedLock.readLock().unlock();
                throw th;
            }
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void messageScanned(Object obj, PubSubProtocol.Message message) {
            if (checkConnected()) {
                if (this.filter.testMessage(message)) {
                    this.deliveryEndPoint.send(PubSubProtocol.PubSubResponse.newBuilder().setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE).setStatusCode(PubSubProtocol.StatusCode.SUCCESS).setTxnId(0L).setMessage(message).setTopic(this.topic).setSubscriberId(this.subscriberId).build(), this);
                } else {
                    messageConsumed(message.getMsgId().getLocalComponent());
                    sendingFinished();
                }
            }
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void scanFailed(Object obj, Exception exc) {
            if (checkConnected()) {
                FIFODeliveryManager.this.retryErroredSubscriberAfterDelay(this);
            }
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void scanFinished(Object obj, ScanCallback.ReasonForFinish reasonForFinish) {
            checkConnected();
        }

        @Override // org.apache.hedwig.server.delivery.DeliveryCallback
        public void sendingFinished() {
            if (isConnected()) {
                synchronized (this) {
                    this.lastLocalSeqIdDelivered = this.localSeqIdDeliveringNow;
                    if (this.lastLocalSeqIdDelivered > this.lastSeqIdCommunicatedExternally + 10) {
                        long j = this.lastSeqIdCommunicatedExternally;
                        this.lastSeqIdCommunicatedExternally = this.lastLocalSeqIdDelivered;
                        FIFODeliveryManager.this.moveDeliveryPtrForward(this, j, this.lastLocalSeqIdDelivered);
                    }
                }
                ServerStats.getInstance().incrementMessagesDelivered();
                deliverNextMessage();
            }
        }

        public synchronized long getLastSeqIdCommunicatedExternally() {
            return this.lastSeqIdCommunicatedExternally;
        }

        @Override // org.apache.hedwig.server.delivery.DeliveryCallback
        public void permanentErrorOnSend() {
            FIFODeliveryManager.this.stopServingSubscriber(this.topic, this.subscriberId, null, FIFODeliveryManager.NOP_CALLBACK, null);
        }

        @Override // org.apache.hedwig.server.delivery.DeliveryCallback
        public void transientErrorOnSend() {
            FIFODeliveryManager.this.retryErroredSubscriberAfterDelay(this);
        }

        @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            PubSubProtocol.SubscriptionEvent subscriptionEvent;
            ActiveSubscriberState put = FIFODeliveryManager.this.subscriberStates.put(new TopicSubscriber(this.topic, this.subscriberId), this);
            this.cb.operationFinished(this.ctx, (Void) null);
            if (put != null) {
                if (this.deliveryEndPoint.equals(put.deliveryEndPoint)) {
                    FIFODeliveryManager.logger.debug("Subscriber {} replaced a duplicated subscriber {} at same delivery point {}.", VarArgs.va(new Object[]{this, put, this.deliveryEndPoint}));
                    subscriptionEvent = UNLIMITED;
                } else {
                    FIFODeliveryManager.logger.debug("Subscriber {} from delivery point {} forcelly closed delivery point {}.", VarArgs.va(new Object[]{this, this.deliveryEndPoint, put.deliveryEndPoint}));
                    subscriptionEvent = PubSubProtocol.SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED;
                }
                FIFODeliveryManager.this.doStopServingSubscriber(put, subscriptionEvent);
            }
            synchronized (this) {
                this.lastSeqIdCommunicatedExternally = this.lastLocalSeqIdDelivered;
                FIFODeliveryManager.this.addDeliveryPtr(this, Long.valueOf(this.lastLocalSeqIdDelivered));
            }
            deliverNextMessage();
        }

        public String toString() {
            return "Topic: " + this.topic.toStringUtf8() + "Subscriber: " + this.subscriberId.toStringUtf8() + ", DeliveryPtr: " + this.lastLocalSeqIdDelivered;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$DeliveryManagerRequest.class */
    public interface DeliveryManagerRequest {
        void performRequest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$DeliveryPtrMove.class */
    public class DeliveryPtrMove implements DeliveryManagerRequest {
        ActiveSubscriberState subscriber;
        Long oldSeqId;
        Long newSeqId;

        public DeliveryPtrMove(ActiveSubscriberState activeSubscriberState, Long l, Long l2) {
            this.subscriber = activeSubscriberState;
            this.oldSeqId = l;
            this.newSeqId = l2;
        }

        @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            ByteString topic = this.subscriber.getTopic();
            long minimumSeqId = FIFODeliveryManager.this.getMinimumSeqId(topic);
            if (this.subscriber.isConnected()) {
                FIFODeliveryManager.this.removeDeliveryPtr(this.subscriber, this.oldSeqId, false, false);
                FIFODeliveryManager.this.addDeliveryPtr(this.subscriber, this.newSeqId);
            } else {
                FIFODeliveryManager.this.removeDeliveryPtr(this.subscriber, this.oldSeqId, true, true);
            }
            long minimumSeqId2 = FIFODeliveryManager.this.getMinimumSeqId(topic);
            if (minimumSeqId2 > minimumSeqId) {
                FIFODeliveryManager.this.persistenceMgr.deliveredUntil(topic, Long.valueOf(minimumSeqId2));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$HashMapSubscriberFactory.class */
    public static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
        static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();

        protected HashMapSubscriberFactory() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hedwig.server.persistence.Factory
        public Set<ActiveSubscriberState> newInstance() {
            return new HashSet();
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$ShutdownDeliveryManagerRequest.class */
    protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
        protected ShutdownDeliveryManagerRequest() {
        }

        @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            FIFODeliveryManager.this.keepRunning = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$StopServingSubscriber.class */
    public class StopServingSubscriber implements DeliveryManagerRequest {
        TopicSubscriber ts;
        PubSubProtocol.SubscriptionEvent event;
        final Callback<Void> cb;
        final Object ctx;

        public StopServingSubscriber(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionEvent subscriptionEvent, Callback<Void> callback, Object obj) {
            this.ts = new TopicSubscriber(byteString, byteString2);
            this.event = subscriptionEvent;
            this.cb = callback;
            this.ctx = obj;
        }

        @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            ActiveSubscriberState remove = FIFODeliveryManager.this.subscriberStates.remove(this.ts);
            if (null != remove) {
                FIFODeliveryManager.this.doStopServingSubscriber(remove, this.event);
            }
            this.cb.operationFinished(this.ctx, (Object) null);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/delivery/FIFODeliveryManager$TreeMapLongToSetSubscriberFactory.class */
    public static class TreeMapLongToSetSubscriberFactory implements Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
        static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();

        protected TreeMapLongToSetSubscriberFactory() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hedwig.server.persistence.Factory
        public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
            return new TreeMap();
        }
    }

    public FIFODeliveryManager(PersistenceManager persistenceManager, ServerConfiguration serverConfiguration) {
        this.persistenceMgr = persistenceManager;
        if (persistenceManager instanceof ReadAheadCache) {
            this.cache = (ReadAheadCache) persistenceManager;
        } else {
            this.cache = null;
        }
        this.perTopicDeliveryPtrs = new HashMap();
        this.subscriberStates = new HashMap();
        this.workerThread = new Thread(this, "DeliveryManagerThread");
        this.cfg = serverConfiguration;
    }

    @Override // org.apache.hedwig.server.delivery.DeliveryManager
    public void start() {
        this.workerThread.start();
    }

    @VisibleForTesting
    public void suspendProcessing() {
        synchronized (this.suspensionLock) {
            this.suspended = true;
        }
    }

    @VisibleForTesting
    public void resumeProcessing() {
        synchronized (this.suspensionLock) {
            this.suspended = false;
            this.suspensionLock.notify();
        }
    }

    protected void enqueueWithoutFailure(DeliveryManagerRequest deliveryManagerRequest) {
        if (!this.requestQueue.offer(deliveryManagerRequest)) {
            throw new UnexpectedError("Could not enqueue object: " + deliveryManagerRequest + " to delivery manager request queue.");
        }
    }

    @Override // org.apache.hedwig.server.delivery.DeliveryManager
    public void startServingSubscription(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionPreferences subscriptionPreferences, PubSubProtocol.MessageSeqId messageSeqId, DeliveryEndPoint deliveryEndPoint, ServerMessageFilter serverMessageFilter, Callback<Void> callback, Object obj) {
        enqueueWithoutFailure(new ActiveSubscriberState(byteString, byteString2, subscriptionPreferences, messageSeqId.getLocalComponent() - 1, deliveryEndPoint, serverMessageFilter, callback, obj));
    }

    @Override // org.apache.hedwig.server.delivery.DeliveryManager
    public void stopServingSubscriber(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionEvent subscriptionEvent, Callback<Void> callback, Object obj) {
        enqueueWithoutFailure(new StopServingSubscriber(byteString, byteString2, subscriptionEvent, callback, obj));
    }

    public void retryErroredSubscriberAfterDelay(ActiveSubscriberState activeSubscriberState) {
        activeSubscriberState.setLastScanErrorTime(MathUtils.now());
        if (!this.retryQueue.offer(activeSubscriberState)) {
            throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
        }
    }

    public void clearRetryDelayForSubscriber(ActiveSubscriberState activeSubscriberState) {
        activeSubscriberState.clearLastScanErrorTime();
        if (!this.retryQueue.offer(activeSubscriberState)) {
            throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
        }
        if (this.requestQueue.isEmpty()) {
            enqueueWithoutFailure(new DeliveryManagerRequest() { // from class: org.apache.hedwig.server.delivery.FIFODeliveryManager.3
                @Override // org.apache.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
                public void performRequest() {
                }
            });
        }
    }

    @Override // org.apache.hedwig.server.delivery.DeliveryManager
    public void messageConsumed(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId) {
        ActiveSubscriberState activeSubscriberState = this.subscriberStates.get(new TopicSubscriber(byteString, byteString2));
        if (null == activeSubscriberState) {
            return;
        }
        activeSubscriberState.messageConsumed(messageSeqId.getLocalComponent());
    }

    public void moveDeliveryPtrForward(ActiveSubscriberState activeSubscriberState, long j, long j2) {
        enqueueWithoutFailure(new DeliveryPtrMove(activeSubscriberState, Long.valueOf(j), Long.valueOf(j2)));
    }

    @Override // java.lang.Runnable
    public void run() {
        while (this.keepRunning) {
            DeliveryManagerRequest deliveryManagerRequest = null;
            try {
                deliveryManagerRequest = this.requestQueue.poll(1L, TimeUnit.SECONDS);
                synchronized (this.suspensionLock) {
                    while (this.suspended) {
                        this.suspensionLock.wait();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            retryErroredSubscribers();
            if (deliveryManagerRequest != null) {
                deliveryManagerRequest.performRequest();
            }
        }
    }

    @Override // org.apache.hedwig.server.delivery.DeliveryManager
    public void stop() {
        enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
    }

    protected void retryErroredSubscribers() {
        long now = MathUtils.now() - this.cfg.getScanBackoffPeriodMs();
        while (true) {
            ActiveSubscriberState peek = this.retryQueue.peek();
            if (peek == null || peek.getLastScanErrorTime() > now) {
                return;
            }
            peek.deliverNextMessage();
            this.retryQueue.poll();
        }
    }

    protected void removeDeliveryPtr(ActiveSubscriberState activeSubscriberState, Long l, boolean z, boolean z2) {
        if (!$assertionsDisabled && l == null) {
            throw new AssertionError();
        }
        ByteString topic = activeSubscriberState.getTopic();
        SortedMap<Long, Set<ActiveSubscriberState>> sortedMap = this.perTopicDeliveryPtrs.get(topic);
        if (sortedMap == null && !z) {
            throw new UnexpectedError("No delivery pointers found while disconnecting channel for topic:" + topic);
        }
        if (null == sortedMap) {
            return;
        }
        if (!MapMethods.removeFromMultiMap(sortedMap, l, activeSubscriberState) && !z) {
            throw new UnexpectedError("Could not find subscriber:" + activeSubscriberState + " at the expected delivery pointer");
        }
        if (z2 && sortedMap.isEmpty()) {
            this.perTopicDeliveryPtrs.remove(topic);
        }
    }

    protected long getMinimumSeqId(ByteString byteString) {
        SortedMap<Long, Set<ActiveSubscriberState>> sortedMap = this.perTopicDeliveryPtrs.get(byteString);
        if (sortedMap == null || sortedMap.isEmpty()) {
            return 9223372036854775806L;
        }
        return sortedMap.firstKey().longValue();
    }

    protected void addDeliveryPtr(ActiveSubscriberState activeSubscriberState, Long l) {
        MapMethods.addToMultiMap((SortedMap) MapMethods.getAfterInsertingIfAbsent(this.perTopicDeliveryPtrs, activeSubscriberState.getTopic(), TreeMapLongToSetSubscriberFactory.instance), l, activeSubscriberState, HashMapSubscriberFactory.instance);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void doStopServingSubscriber(ActiveSubscriberState activeSubscriberState, PubSubProtocol.SubscriptionEvent subscriptionEvent) {
        activeSubscriberState.setNotConnected(subscriptionEvent);
        removeDeliveryPtr(activeSubscriberState, Long.valueOf(activeSubscriberState.getLastSeqIdCommunicatedExternally()), true, true);
    }

    @Override // org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener
    public void onSubChannelDisconnected(TopicSubscriber topicSubscriber) {
        stopServingSubscriber(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(), null, NOP_CALLBACK, null);
    }

    static {
        $assertionsDisabled = !FIFODeliveryManager.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
        NOP_CALLBACK = new Callback<Void>() { // from class: org.apache.hedwig.server.delivery.FIFODeliveryManager.1
            public void operationFinished(Object obj, Void r3) {
            }

            public void operationFailed(Object obj, PubSubException pubSubException) {
            }
        };
    }
}
