package org.apache.hedwig.server.subscriptions;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.versioning.Version;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TopicOpQueuer;
import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.CallbackUtils;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.class */
public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
    // Shared SLF4J logger; the nested operation classes log through it as well.
    static Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
    // Server configuration; read for the consume interval (ConsumeOp) and the
    // MessagesConsumedTask period (constructor).
    protected final ServerConfiguration cfg;
    // Queuer onto which per-topic operations are pushed (see acquiredTopic).
    protected final TopicOpQueuer queuer;
    // Delivery manager; may be null — the uses visible in ConsumeOp and ReleaseOp are null-guarded.
    private final DeliveryManager dm;
    // Persistence manager; when null the constructor does not schedule MessagesConsumedTask.
    private final PersistenceManager pm;
    // topic -> (subscriber id -> in-memory subscription state). An entry is put by AcquireOp
    // and removed by ReleaseOp.
    protected final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq = new ConcurrentHashMap<>();
    // Listeners invoked by notifyFirstLocalSubscribe.
    private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<>();
    // Timer created with isDaemon = true; the constructor schedules MessagesConsumedTask on it.
    private final Timer timer = new Timer(true);
    // topic -> the value MessagesConsumedTask last passed to pm.consumedUntil for that topic.
    private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<>();
    // Callback that only logs failures; used where the outcome needs no further handling
    // (e.g. stopServingSubscriber in ReleaseOp).
    protected final Callback<Void> noopCallback = new NoopCallback();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$AcquireOp.class */
    public class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public AcquireOp(com.google.protobuf.ByteString r8, org.apache.hedwig.util.Callback<java.lang.Void> r9, java.lang.Object r10) {
            /*
                r6 = this;
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.this = r1
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.common.TopicOpQueuer r1 = r1.queuer
                r2 = r1
                java.lang.Class r2 = r2.getClass()
                r2 = r8
                r3 = r9
                r4 = r10
                r0.<init>(r2, r3, r4)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.AcquireOp.<init>(org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager, com.google.protobuf.ByteString, org.apache.hedwig.util.Callback, java.lang.Object):void");
        }

        @Override // java.lang.Runnable
        public void run() {
            if (AbstractSubscriptionManager.this.top2sub2seq.containsKey(this.topic)) {
                this.cb.operationFinished(this.ctx, (Object) null);
            } else {
                AbstractSubscriptionManager.this.readSubscriptions(this.topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.AcquireOp.1
                    public void operationFailed(Object obj, PubSubException pubSubException) {
                        AcquireOp.this.cb.operationFailed(obj, pubSubException);
                    }

                    public void operationFinished(Object obj, final Map<ByteString, InMemorySubscriptionState> map) {
                        Callback<Void> callback = new Callback<Void>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.AcquireOp.1.1
                            public void operationFailed(Object obj2, PubSubException pubSubException) {
                                AbstractSubscriptionManager.logger.error("Subscription manager failed to acquired topic " + AcquireOp.this.topic.toStringUtf8(), pubSubException);
                                AcquireOp.this.cb.operationFailed(obj2, (PubSubException) null);
                            }

                            public void operationFinished(Object obj2, Void r6) {
                                AbstractSubscriptionManager.this.top2sub2seq.put(AcquireOp.this.topic, map);
                                AbstractSubscriptionManager.logger.info("Subscription manager successfully acquired topic: " + AcquireOp.this.topic.toStringUtf8());
                                AcquireOp.this.cb.operationFinished(obj2, (Object) null);
                            }
                        };
                        if (AbstractSubscriptionManager.hasLocalSubscriptions(map)) {
                            AbstractSubscriptionManager.this.notifyFirstLocalSubscribe(AcquireOp.this.topic, false, callback, obj);
                        } else {
                            callback.operationFinished(obj, (Object) null);
                        }
                        AbstractSubscriptionManager.this.updateMessageBound(AcquireOp.this.topic);
                    }
                }, this.ctx);
            }
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$CloseSubscriptionOp.class */
    /**
     * Queued operation for closing a subscription. In this class it performs no work of its
     * own: it reports success as soon as it runs.
     */
    private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /**
         * Creates the operation.
         *
         * @param topic        topic the subscription belongs to
         * @param subscriberId subscriber whose subscription is closed (not used by this op)
         * @param callback     notified when the op has run
         * @param ctx          opaque context handed back to {@code callback}
         */
        public CloseSubscriptionOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
            // AsynchronousOp is an inner class of TopicOpQueuer, so the queuer is supplied as
            // the qualifying instance of the superclass constructor invocation.
            AbstractSubscriptionManager.this.queuer.super(topic, callback, ctx);
        }

        /** Reports success to the callback without doing anything else. */
        @Override // java.lang.Runnable
        public void run() {
            this.cb.operationFinished(this.ctx, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$ConsumeOp.class */
    /**
     * Queued operation that records how far a subscriber has consumed a topic, persisting
     * the new position when the in-memory state asks for it.
     */
    public class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
        ByteString subscriberId;
        PubSubProtocol.MessageSeqId consumeSeqId;

        /**
         * Creates the operation.
         *
         * @param topic        topic that was consumed from
         * @param subscriberId subscriber that consumed
         * @param consumeSeqId sequence id the subscriber has consumed up to
         * @param callback     notified when the consume position has been handled
         * @param ctx          opaque context handed back to {@code callback}
         */
        public ConsumeOp(ByteString topic, ByteString subscriberId, PubSubProtocol.MessageSeqId consumeSeqId, Callback<Void> callback, Object ctx) {
            // AsynchronousOp is an inner class of TopicOpQueuer, so the queuer is supplied as
            // the qualifying instance of the superclass constructor invocation.
            AbstractSubscriptionManager.this.queuer.super(topic, callback, ctx);
            this.subscriberId = subscriberId;
            this.consumeSeqId = consumeSeqId;
        }

        /**
         * Advances the subscriber's consume pointer. An unknown topic or subscriber is
         * reported as success. When {@code setLastConsumeSeqId} returns true the state is
         * persisted through {@code updateSubscriptionState}; otherwise only the in-memory
         * pointer has moved. The delivery manager, if any, is told about the consume.
         */
        @Override // java.lang.Runnable
        public void run() {
            Map<ByteString, InMemorySubscriptionState> topicSubscriptions = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (topicSubscriptions == null) {
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            final InMemorySubscriptionState subscriptionState = topicSubscriptions.get(this.subscriberId);
            if (subscriptionState == null) {
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            boolean mustPersist = subscriptionState.setLastConsumeSeqId(this.consumeSeqId, AbstractSubscriptionManager.this.cfg.getConsumeInterval());
            if (mustPersist) {
                AbstractSubscriptionManager.this.updateSubscriptionState(this.topic, this.subscriberId, subscriptionState, new Callback<Void>() {
                    public void operationFinished(Object persistCtx, Void result) {
                        // Record the persisted position only once the update has succeeded.
                        subscriptionState.setLastPersistedSeqId(ConsumeOp.this.consumeSeqId.getLocalComponent());
                        ConsumeOp.this.cb.operationFinished(persistCtx, result);
                    }

                    public void operationFailed(Object persistCtx, PubSubException exception) {
                        ConsumeOp.this.cb.operationFailed(persistCtx, exception);
                    }
                }, this.ctx);
            } else {
                if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                    AbstractSubscriptionManager.logger.debug("Only advanced consume pointer in memory, will persist later, topic: " + this.topic.toStringUtf8() + " subscriberId: " + this.subscriberId.toStringUtf8() + " persistentState: " + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState()) + " in-memory consume-id: " + MessageIdUtils.msgIdToReadableString(subscriptionState.getLastConsumeSeqId()));
                }
                this.cb.operationFinished(this.ctx, null);
            }
            // Tell the delivery manager about the consume event, whether or not it was persisted.
            if (AbstractSubscriptionManager.this.dm != null) {
                AbstractSubscriptionManager.this.dm.messageConsumed(this.topic, this.subscriberId, this.consumeSeqId);
            }
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$MessagesConsumedTask.class */
    /**
     * Periodic task that, for every topic in {@code top2sub2seq}, computes the smallest
     * last-persisted sequence id over the topic's subscribers and reports it to the
     * persistence manager.
     */
    class MessagesConsumedTask extends TimerTask {
        MessagesConsumedTask() {
        }

        /**
         * For each topic: calls {@code pm.consumedUntil} when the topic has no subscribers or
         * the minimum has moved past the previously recorded value; otherwise, when every
         * subscriber has a message bound, calls {@code pm.consumeToBound}.
         */
        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            for (ByteString topic : AbstractSubscriptionManager.this.top2sub2seq.keySet()) {
                Map<ByteString, InMemorySubscriptionState> subscriptions = AbstractSubscriptionManager.this.top2sub2seq.get(topic);
                if (subscriptions == null) {
                    // The topic disappeared between listing the keys and the lookup.
                    continue;
                }

                long minPersistedSeqId = Long.MAX_VALUE;
                boolean allHaveMessageBound = true;
                for (InMemorySubscriptionState state : subscriptions.values()) {
                    minPersistedSeqId = Math.min(minPersistedSeqId, state.getLastPersistedSeqId());
                    allHaveMessageBound = allHaveMessageBound && state.getSubscriptionPreferences().hasMessageBound();
                }

                Long previousMin = AbstractSubscriptionManager.this.topic2MinConsumedMessagesMap.get(topic);
                // With a recorded value the minimum must have grown; without one it must be non-zero.
                boolean minimumMoved = (previousMin != null)
                        ? previousMin.longValue() < minPersistedSeqId
                        : minPersistedSeqId != 0;

                if (subscriptions.isEmpty() || minimumMoved) {
                    AbstractSubscriptionManager.this.topic2MinConsumedMessagesMap.put(topic, Long.valueOf(minPersistedSeqId));
                    AbstractSubscriptionManager.this.pm.consumedUntil(topic, Long.valueOf(minPersistedSeqId));
                } else if (allHaveMessageBound) {
                    AbstractSubscriptionManager.this.pm.consumeToBound(topic);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$NoopCallback.class */
    /**
     * Callback for callers that do not need the outcome: success is ignored and a failure is
     * only logged.
     *
     * @param <T> result type of the operation
     */
    static class NoopCallback<T> implements Callback<T> {
        /** Logs the failure at WARN level; the context is not used. */
        public void operationFailed(Object ctx, PubSubException exception) {
            AbstractSubscriptionManager.logger.warn("Exception found in AbstractSubscriptionManager : ", exception);
        }

        /** Does nothing. */
        public void operationFinished(Object ctx, T result) {
            // Intentionally empty: nobody is waiting for the result.
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$ReleaseOp.class */
    public class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public ReleaseOp(com.google.protobuf.ByteString r8, org.apache.hedwig.util.Callback<java.lang.Void> r9, java.lang.Object r10) {
            /*
                r6 = this;
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.this = r1
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.common.TopicOpQueuer r1 = r1.queuer
                r2 = r1
                java.lang.Class r2 = r2.getClass()
                r2 = r8
                r3 = r9
                r4 = r10
                r0.<init>(r2, r3, r4)
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.ReleaseOp.<init>(org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager, com.google.protobuf.ByteString, org.apache.hedwig.util.Callback, java.lang.Object):void");
        }

        @Override // java.lang.Runnable
        public void run() {
            Callback<Void> callback = new Callback<Void>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.ReleaseOp.1
                public void operationFinished(Object obj, Void r6) {
                    AbstractSubscriptionManager.logger.info("Finished update subscription states when losting topic " + ReleaseOp.this.topic.toStringUtf8());
                    finish();
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    AbstractSubscriptionManager.logger.warn("Error when releasing topic : " + ReleaseOp.this.topic.toStringUtf8(), pubSubException);
                    finish();
                }

                private void finish() {
                    Map<ByteString, InMemorySubscriptionState> remove = AbstractSubscriptionManager.this.top2sub2seq.remove(ReleaseOp.this.topic);
                    if (null != remove) {
                        for (ByteString byteString : remove.keySet()) {
                            if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                                AbstractSubscriptionManager.logger.debug("Stop serving subscriber (" + ReleaseOp.this.topic.toStringUtf8() + ", " + byteString.toStringUtf8() + ") when losing topic");
                            }
                            if (null != AbstractSubscriptionManager.this.dm) {
                                AbstractSubscriptionManager.this.dm.stopServingSubscriber(ReleaseOp.this.topic, byteString, PubSubProtocol.SubscriptionEvent.TOPIC_MOVED, AbstractSubscriptionManager.this.noopCallback, null);
                            }
                        }
                    }
                    if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                        AbstractSubscriptionManager.logger.debug("Stop serving topic " + ReleaseOp.this.topic.toStringUtf8());
                    }
                    AbstractSubscriptionManager.this.notifyLastLocalUnsubscribe(ReleaseOp.this.topic);
                    ReleaseOp.this.cb.operationFinished(ReleaseOp.this.ctx, (Object) null);
                }
            };
            if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                AbstractSubscriptionManager.logger.debug("Try to update subscription states when losing topic " + this.topic.toStringUtf8());
            }
            AbstractSubscriptionManager.this.updateSubscriptionStates(this.topic, callback, this.ctx);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$SubscribeOp.class */
    public class SubscribeOp extends TopicOpQueuer.AsynchronousOp<PubSubProtocol.SubscriptionData> {
        PubSubProtocol.SubscribeRequest subRequest;
        PubSubProtocol.MessageSeqId consumeSeqId;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager$SubscribeOp$2, reason: invalid class name */
        /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$SubscribeOp$2.class */
        public class AnonymousClass2 implements Callback<Version> {
            final /* synthetic */ ByteString val$subscriberId;
            final /* synthetic */ Map val$topicSubscriptions;
            final /* synthetic */ PubSubProtocol.SubscriptionData val$subData;

            AnonymousClass2(ByteString byteString, Map map, PubSubProtocol.SubscriptionData subscriptionData) {
                this.val$subscriberId = byteString;
                this.val$topicSubscriptions = map;
                this.val$subData = subscriptionData;
            }

            public void operationFailed(Object obj, PubSubException pubSubException) {
                SubscribeOp.this.cb.operationFailed(obj, pubSubException);
            }

            public void operationFinished(Object obj, final Version version) {
                Callback<Void> callback = new Callback<Void>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.2.1
                    public void operationFailed(final Object obj2, final PubSubException pubSubException) {
                        AbstractSubscriptionManager.logger.error("subscription for subscriber " + AnonymousClass2.this.val$subscriberId.toStringUtf8() + " to topic " + SubscribeOp.this.topic.toStringUtf8() + " failed due to failed listener callback", pubSubException);
                        AbstractSubscriptionManager.this.deleteSubscriptionData(SubscribeOp.this.topic, AnonymousClass2.this.val$subscriberId, version, new Callback<Void>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.2.1.1
                            public void operationFinished(Object obj3, Void r4) {
                                finish();
                            }

                            public void operationFailed(Object obj3, PubSubException pubSubException2) {
                                AbstractSubscriptionManager.logger.error("Remove subscription for subscriber " + AnonymousClass2.this.val$subscriberId.toStringUtf8() + " to topic " + SubscribeOp.this.topic.toStringUtf8() + " failed : ", pubSubException2);
                                finish();
                            }

                            private void finish() {
                                SubscribeOp.this.cb.operationFailed(obj2, pubSubException);
                            }
                        }, obj2);
                    }

                    public void operationFinished(Object obj2, Void r9) {
                        AnonymousClass2.this.val$topicSubscriptions.put(AnonymousClass2.this.val$subscriberId, new InMemorySubscriptionState(AnonymousClass2.this.val$subData, version));
                        AbstractSubscriptionManager.this.updateMessageBound(SubscribeOp.this.topic);
                        SubscribeOp.this.cb.operationFinished(obj2, AnonymousClass2.this.val$subData);
                    }
                };
                if (SubscriptionStateUtils.isHubSubscriber(SubscribeOp.this.subRequest.getSubscriberId()) || AbstractSubscriptionManager.hasLocalSubscriptions(this.val$topicSubscriptions)) {
                    callback.operationFinished(obj, (Object) null);
                } else {
                    AbstractSubscriptionManager.this.notifyFirstLocalSubscribe(SubscribeOp.this.topic, SubscribeOp.this.subRequest.getSynchronous(), callback, obj);
                }
            }
        }

        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public SubscribeOp(com.google.protobuf.ByteString r8, org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest r9, org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId r10, org.apache.hedwig.util.Callback<org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData> r11, java.lang.Object r12) {
            /*
                r6 = this;
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.this = r1
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.common.TopicOpQueuer r1 = r1.queuer
                r2 = r1
                java.lang.Class r2 = r2.getClass()
                r2 = r8
                r3 = r11
                r4 = r12
                r0.<init>(r2, r3, r4)
                r0 = r6
                r1 = r9
                r0.subRequest = r1
                r0 = r6
                r1 = r10
                r0.consumeSeqId = r1
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.<init>(org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager, com.google.protobuf.ByteString, org.apache.hedwig.protocol.PubSubProtocol$SubscribeRequest, org.apache.hedwig.protocol.PubSubProtocol$MessageSeqId, org.apache.hedwig.util.Callback, java.lang.Object):void");
        }

        @Override // java.lang.Runnable
        public void run() {
            Map<ByteString, InMemorySubscriptionState> map = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (map == null) {
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
                return;
            }
            final ByteString subscriberId = this.subRequest.getSubscriberId();
            final InMemorySubscriptionState inMemorySubscriptionState = map.get(subscriberId);
            PubSubProtocol.SubscribeRequest.CreateOrAttach createOrAttach = this.subRequest.getCreateOrAttach();
            if (inMemorySubscriptionState == null) {
                if (createOrAttach.equals(PubSubProtocol.SubscribeRequest.CreateOrAttach.ATTACH)) {
                    String str = "Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " requested attaching to an existing subscription but it is not subscribed";
                    AbstractSubscriptionManager.logger.error(str);
                    this.cb.operationFailed(this.ctx, new PubSubException.ClientNotSubscribedException(str));
                    return;
                } else {
                    PubSubProtocol.SubscriptionState.Builder msgId = PubSubProtocol.SubscriptionState.newBuilder().setMsgId(this.consumeSeqId);
                    PubSubProtocol.SubscriptionPreferences.Builder newBuilder = this.subRequest.hasPreferences() ? PubSubProtocol.SubscriptionPreferences.newBuilder(this.subRequest.getPreferences()) : PubSubProtocol.SubscriptionPreferences.newBuilder();
                    if (this.subRequest.hasMessageBound()) {
                        newBuilder = newBuilder.setMessageBound(this.subRequest.getMessageBound());
                    }
                    PubSubProtocol.SubscriptionData build = PubSubProtocol.SubscriptionData.newBuilder().setState(msgId).setPreferences(newBuilder).build();
                    AbstractSubscriptionManager.this.createSubscriptionData(this.topic, subscriberId, build, new AnonymousClass2(subscriberId, map, build), this.ctx);
                    return;
                }
            }
            if (createOrAttach.equals(PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE)) {
                String str2 = "Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " requested creating a subscription but it is already subscribed with state: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionState());
                AbstractSubscriptionManager.logger.error(str2);
                this.cb.operationFailed(this.ctx, new PubSubException.ClientAlreadySubscribedException(str2));
            } else {
                if (this.subRequest.hasPreferences() && inMemorySubscriptionState.updatePreferences(this.subRequest.getPreferences())) {
                    AbstractSubscriptionManager.this.updateSubscriptionPreferences(this.topic, subscriberId, inMemorySubscriptionState, new Callback<Void>() { // from class: org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager.SubscribeOp.1
                        public void operationFailed(Object obj, PubSubException pubSubException) {
                            SubscribeOp.this.cb.operationFailed(obj, pubSubException);
                        }

                        public void operationFinished(Object obj, Void r6) {
                            if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                                AbstractSubscriptionManager.logger.debug("Topic: " + SubscribeOp.this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " attaching to subscription with state: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionState()) + ", with preferences: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionPreferences()));
                            }
                            AbstractSubscriptionManager.this.updateMessageBound(SubscribeOp.this.topic);
                            SubscribeOp.this.cb.operationFinished(obj, inMemorySubscriptionState.toSubscriptionData());
                        }
                    }, this.ctx);
                    return;
                }
                if (AbstractSubscriptionManager.logger.isDebugEnabled()) {
                    AbstractSubscriptionManager.logger.debug("Topic: " + this.topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8() + " attaching to subscription with state: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionState()) + ", with preferences: " + SubscriptionStateUtils.toString(inMemorySubscriptionState.getSubscriptionPreferences()));
                }
                this.cb.operationFinished(this.ctx, inMemorySubscriptionState.toSubscriptionData());
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager$UnsubscribeOp.class */
    /**
     * Queued operation that removes a subscriber's subscription from a topic, both from the
     * stored subscription data and from the in-memory map.
     */
    public class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
        ByteString subscriberId;

        /**
         * Creates the operation.
         *
         * @param topic        topic to unsubscribe from
         * @param subscriberId subscriber being removed
         * @param callback     notified when the unsubscribe has finished or failed
         * @param ctx          opaque context handed back to {@code callback}
         */
        public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
            // AsynchronousOp is an inner class of TopicOpQueuer, so the queuer is supplied as
            // the qualifying instance of the superclass constructor invocation.
            AbstractSubscriptionManager.this.queuer.super(topic, callback, ctx);
            this.subscriberId = subscriberId;
        }

        /**
         * Fails with {@code ServerNotResponsibleForTopicException} when the topic is not in
         * {@code top2sub2seq} and with {@code ClientNotSubscribedException} when the subscriber
         * has no subscription; otherwise deletes the subscription data and, on success, removes
         * the in-memory entry.
         */
        @Override // java.lang.Runnable
        public void run() {
            final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = AbstractSubscriptionManager.this.top2sub2seq.get(this.topic);
            if (topicSubscriptions == null) {
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
                return;
            }
            // A single lookup replaces containsKey + get, so the state cannot be null when dereferenced.
            InMemorySubscriptionState subscriptionState = topicSubscriptions.get(this.subscriberId);
            if (subscriptionState == null) {
                this.cb.operationFailed(this.ctx, new PubSubException.ClientNotSubscribedException(""));
                return;
            }
            AbstractSubscriptionManager.this.deleteSubscriptionData(this.topic, this.subscriberId, subscriptionState.getVersion(), new Callback<Void>() {
                public void operationFailed(Object deleteCtx, PubSubException exception) {
                    UnsubscribeOp.this.cb.operationFailed(deleteCtx, exception);
                }

                public void operationFinished(Object deleteCtx, Void result) {
                    topicSubscriptions.remove(UnsubscribeOp.this.subscriberId);
                    // Listeners are told only when a non-hub subscriber leaves and no local
                    // subscriptions remain on the topic.
                    if (!SubscriptionStateUtils.isHubSubscriber(UnsubscribeOp.this.subscriberId) && !AbstractSubscriptionManager.hasLocalSubscriptions(topicSubscriptions)) {
                        AbstractSubscriptionManager.this.notifyLastLocalUnsubscribe(UnsubscribeOp.this.topic);
                    }
                    AbstractSubscriptionManager.this.updateMessageBound(UnsubscribeOp.this.topic);
                    UnsubscribeOp.this.cb.operationFinished(deleteCtx, null);
                }
            }, this.ctx);
        }
    }

    /**
     * Creates the manager, registers it as a topic-ownership listener and, when a persistence
     * manager is supplied, schedules the recurring {@code MessagesConsumedTask}.
     *
     * @param serverConfiguration      configuration; supplies the interval of the consumed-messages task
     * @param topicManager             topic manager this instance registers itself with
     * @param persistenceManager       persistence manager; may be {@code null}, in which case the
     *                                 consumed-messages task is not scheduled
     * @param deliveryManager          delivery manager stored for later use
     * @param scheduledExecutorService executor handed to the {@link TopicOpQueuer}
     */
    public AbstractSubscriptionManager(ServerConfiguration serverConfiguration, TopicManager topicManager, PersistenceManager persistenceManager, DeliveryManager deliveryManager, ScheduledExecutorService scheduledExecutorService) {
        this.cfg = serverConfiguration;
        this.queuer = new TopicOpQueuer(scheduledExecutorService);
        this.pm = persistenceManager;
        this.dm = deliveryManager;
        // Publish "this" to the topic manager only after every final field above is assigned,
        // so an ownership callback fired during or right after registration cannot observe
        // a manager whose pm/dm are still unset.
        topicManager.addTopicOwnershipChangeListener(this);
        if (persistenceManager != null) {
            this.timer.schedule(new MessagesConsumedTask(), 0L, serverConfiguration.getMessagesConsumedThreadRunInterval());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells every registered listener about a first local subscription on a topic.
     *
     * <p>The caller's callback is wrapped with {@code CallbackUtils.multiCallback} sized to the
     * number of listeners, and each listener receives that wrapper.
     *
     * @param byteString topic passed to the listeners
     * @param z          flag forwarded unchanged to {@code onFirstLocalSubscribe}
     *                   (NOTE(review): meaning not visible here — confirm against the listener interface)
     * @param callback   callback wrapped by the multi-callback
     * @param obj        context object handed to the multi-callback
     */
    public void notifyFirstLocalSubscribe(ByteString byteString, boolean z, Callback<Void> callback, Object obj) {
        Callback<Void> aggregate = CallbackUtils.multiCallback(this.listeners.size(), callback, obj);
        for (SubscriptionEventListener listener : this.listeners) {
            listener.onFirstLocalSubscribe(byteString, z, aggregate);
        }
    }

    /**
     * Ownership callback: wraps the request in an {@code AcquireOp} and hands it to the
     * queuer under the topic's key.
     *
     * @param byteString topic that was acquired
     * @param callback   callback passed on to the op
     * @param obj        context object passed on to the op
     */
    @Override // org.apache.hedwig.server.topics.TopicOwnershipChangeListener
    public void acquiredTopic(ByteString byteString, Callback<Void> callback, Object obj) {
        AcquireOp acquireOp = new AcquireOp(this, byteString, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, acquireOp);
    }

    /**
     * Writes out the state of every subscription held in memory for a topic.
     *
     * <p>When no subscription map exists for the topic the callback is completed immediately.
     * Otherwise a multi-callback sized to the map is created; each subscription whose
     * {@code setLastConsumeSeqIdImmediately()} returns {@code true} is written through
     * {@link #updateSubscriptionState}, and every other one completes its share of the
     * multi-callback straight away.
     *
     * @param byteString topic whose subscriptions are processed
     * @param callback   callback completed through the multi-callback
     * @param obj        context object passed along with every callback invocation
     */
    void updateSubscriptionStates(ByteString byteString, Callback<Void> callback, Object obj) {
        Map<ByteString, InMemorySubscriptionState> subscriptions = this.top2sub2seq.get(byteString);
        if (subscriptions == null) {
            callback.operationFinished(obj, null);
            return;
        }
        Callback<Void> aggregate = CallbackUtils.multiCallback(subscriptions.size(), callback, obj);
        for (Map.Entry<ByteString, InMemorySubscriptionState> subscription : subscriptions.entrySet()) {
            InMemorySubscriptionState state = subscription.getValue();
            if (!state.setLastConsumeSeqIdImmediately()) {
                // No write is issued for this subscriber; just account for it.
                aggregate.operationFinished(obj, null);
                continue;
            }
            updateSubscriptionState(byteString, subscription.getKey(), state, aggregate, obj);
        }
    }

    /**
     * Ownership callback: queues a {@code ReleaseOp} for the topic, using the manager's
     * no-op callback and a {@code null} context.
     *
     * @param byteString topic that was lost
     */
    @Override // org.apache.hedwig.server.topics.TopicOwnershipChangeListener
    public void lostTopic(ByteString byteString) {
        ReleaseOp releaseOp = new ReleaseOp(this, byteString, this.noopCallback, null);
        this.queuer.pushAndMaybeRun(byteString, releaseOp);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Invokes {@code onLastLocalUnsubscribe} on every registered listener, in registration order.
     *
     * @param byteString topic passed to the listeners
     */
    public void notifyLastLocalUnsubscribe(ByteString byteString) {
        for (SubscriptionEventListener listener : this.listeners) {
            listener.onLastLocalUnsubscribe(byteString);
        }
    }

    /**
     * Reads a set of subscriptions and delivers them to the callback as a map from
     * {@code ByteString} to {@link InMemorySubscriptionState}.
     *
     * <p>NOTE(review): presumably {@code byteString} is the topic and the map is keyed by
     * subscriber id, mirroring the inner maps of {@code top2sub2seq} — confirm against the
     * implementations.
     *
     * @param byteString key identifying what to read (see note above)
     * @param callback   receives the resulting map or the failure
     * @param obj        context object passed through to the callback
     */
    protected abstract void readSubscriptions(ByteString byteString, Callback<Map<ByteString, InMemorySubscriptionState>> callback, Object obj);

    /**
     * Reads the data of a single subscription and delivers it as an
     * {@link InMemorySubscriptionState}.
     *
     * <p>Within this class it is called after a write fails with
     * {@code BadVersionException}; the version of the returned state is copied into the
     * in-memory state before the write is retried.
     *
     * @param byteString  topic of the subscription
     * @param byteString2 subscriber id
     * @param callback    receives the state that was read or the failure
     * @param obj         context object passed through to the callback
     */
    protected abstract void readSubscriptionData(ByteString byteString, ByteString byteString2, Callback<InMemorySubscriptionState> callback, Object obj);

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reports whether the map contains at least one key that
     * {@code SubscriptionStateUtils.isHubSubscriber} does not classify as a hub subscriber.
     *
     * @param map subscriptions keyed by subscriber id
     * @return {@code true} as soon as a non-hub key is found, {@code false} when there is none
     *         (including for an empty map)
     */
    public static boolean hasLocalSubscriptions(Map<ByteString, InMemorySubscriptionState> map) {
        for (ByteString subscriberId : map.keySet()) {
            if (SubscriptionStateUtils.isHubSubscriber(subscriberId)) {
                continue;
            }
            return true;
        }
        return false;
    }

    /**
     * Recomputes the message bound of a topic from the preferences of its subscriptions and
     * pushes the result to the persistence manager.
     *
     * <p>The bound is the maximum of all subscribers' message bounds. If any subscriber has no
     * bound, or the topic has no subscribers, the bound is cleared instead. Nothing happens
     * when the manager holds no subscription map for the topic.
     *
     * @param byteString topic whose bound is recomputed
     */
    public void updateMessageBound(ByteString byteString) {
        Map<ByteString, InMemorySubscriptionState> subscriptions = this.top2sub2seq.get(byteString);
        if (subscriptions == null) {
            return;
        }
        // Integer.MIN_VALUE doubles as the "no bound applies" sentinel.
        int maxBound = Integer.MIN_VALUE;
        for (InMemorySubscriptionState state : subscriptions.values()) {
            if (state.getSubscriptionPreferences().hasMessageBound()) {
                maxBound = Math.max(maxBound, state.getSubscriptionPreferences().getMessageBound());
            } else {
                // One unbounded subscriber makes the whole topic unbounded.
                maxBound = Integer.MIN_VALUE;
                break;
            }
        }
        if (maxBound != Integer.MIN_VALUE) {
            this.pm.setMessageBound(byteString, Integer.valueOf(maxBound));
        } else {
            this.pm.clearMessageBound(byteString);
        }
    }

    /**
     * Wraps a subscribe request in a {@code SubscribeOp} and hands it to the queuer under the
     * topic's key.
     *
     * @param byteString       topic of the request
     * @param subscribeRequest request forwarded to the op
     * @param messageSeqId     sequence id forwarded to the op
     * @param callback         callback forwarded to the op
     * @param obj              context object forwarded to the op
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void serveSubscribeRequest(ByteString byteString, PubSubProtocol.SubscribeRequest subscribeRequest, PubSubProtocol.MessageSeqId messageSeqId, Callback<PubSubProtocol.SubscriptionData> callback, Object obj) {
        SubscribeOp subscribeOp = new SubscribeOp(this, byteString, subscribeRequest, messageSeqId, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, subscribeOp);
    }

    /**
     * Wraps a consume update in a {@code ConsumeOp} and hands it to the queuer under the
     * topic's key.
     *
     * @param byteString   topic
     * @param byteString2  second identifier forwarded to the op
     *                     (NOTE(review): presumably the subscriber id — confirm in {@code ConsumeOp})
     * @param messageSeqId sequence id forwarded to the op
     * @param callback     callback forwarded to the op
     * @param obj          context object forwarded to the op
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void setConsumeSeqIdForSubscriber(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId, Callback<Void> callback, Object obj) {
        ConsumeOp consumeOp = new ConsumeOp(this, byteString, byteString2, messageSeqId, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, consumeOp);
    }

    /**
     * Wraps a close request in a {@code CloseSubscriptionOp} and hands it to the queuer under
     * the topic's key.
     *
     * @param byteString  topic
     * @param byteString2 second identifier forwarded to the op
     *                    (NOTE(review): presumably the subscriber id — confirm in {@code CloseSubscriptionOp})
     * @param callback    callback forwarded to the op
     * @param obj         context object forwarded to the op
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void closeSubscription(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        CloseSubscriptionOp closeOp = new CloseSubscriptionOp(this, byteString, byteString2, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, closeOp);
    }

    /**
     * Wraps an unsubscribe request in an {@code UnsubscribeOp} and hands it to the queuer
     * under the topic's key.
     *
     * @param byteString  topic
     * @param byteString2 id of the subscriber to remove
     * @param callback    callback forwarded to the op
     * @param obj         context object forwarded to the op
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void unsubscribe(ByteString byteString, ByteString byteString2, Callback<Void> callback, Object obj) {
        UnsubscribeOp unsubscribeOp = new UnsubscribeOp(this, byteString, byteString2, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, unsubscribeOp);
    }

    /**
     * Appends a listener to the list notified by {@code notifyFirstLocalSubscribe} and
     * {@code notifyLastLocalUnsubscribe}.
     *
     * @param subscriptionEventListener listener to register
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void addListener(SubscriptionEventListener subscriptionEventListener) {
        listeners.add(subscriptionEventListener);
    }

    /**
     * Stops the manager: cancels the timer, then writes out subscription states one topic at
     * a time, blocking until each topic's update has reported success or failure.
     *
     * <p>A failed update is logged and the remaining topics are still processed. If the
     * calling thread is interrupted while waiting, the remaining topics are skipped and the
     * thread's interrupt status is restored so callers can observe it.
     */
    @Override // org.apache.hedwig.server.subscriptions.SubscriptionManager
    public void stop() {
        this.timer.cancel();
        try {
            final LinkedBlockingQueue<Boolean> results = new LinkedBlockingQueue<>();
            for (ByteString topic : this.top2sub2seq.keySet()) {
                updateSubscriptionStates(topic, new Callback<Void>() {
                    public void operationFinished(Object ctx, Void ignored) {
                        ConcurrencyUtils.put(results, true);
                    }

                    public void operationFailed(Object ctx, PubSubException cause) {
                        ConcurrencyUtils.put(results, false);
                    }
                }, null);
                // Wait for this topic before moving on to the next one.
                if (!results.take()) {
                    logger.warn("Failed to update subscription states for topic {}", topic.toStringUtf8());
                }
            }
        } catch (InterruptedException e) {
            logger.warn("Error during updating subscription states : ", e);
            // Do not swallow the interruption: re-assert it for whoever called stop().
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the state portion of a subscription to storage and records the new version on
     * the in-memory state.
     *
     * <p>If the write fails with {@code BadVersionException}, the stored data is re-read, its
     * version is copied onto the in-memory state and the write is attempted again. Any other
     * failure, or a failure of the re-read, is reported to the callback.
     *
     * <p>When {@link #isPartialUpdateSupported()} is true only the state is sent via
     * {@code updateSubscriptionData}; otherwise the whole record is sent via
     * {@code replaceSubscriptionData}.
     *
     * @param byteString                topic
     * @param byteString2               subscriber id
     * @param inMemorySubscriptionState in-memory state to write; its version is updated in place
     * @param callback                  completed once the write succeeds or fails for good
     * @param obj                       context object passed to the storage call
     */
    public void updateSubscriptionState(final ByteString byteString, final ByteString byteString2, final InMemorySubscriptionState inMemorySubscriptionState, final Callback<Void> callback, Object obj) {
        final Callback<Version> onWritten = new Callback<Version>() {
            public void operationFinished(Object writeCtx, Version newVersion) {
                inMemorySubscriptionState.setVersion(newVersion);
                callback.operationFinished(writeCtx, null);
            }

            public void operationFailed(Object writeCtx, PubSubException cause) {
                if (!(cause instanceof PubSubException.BadVersionException)) {
                    callback.operationFailed(writeCtx, cause);
                    return;
                }
                // Stale version: fetch the stored one and retry the write with it.
                Callback<InMemorySubscriptionState> onReread = new Callback<InMemorySubscriptionState>() {
                    public void operationFinished(Object readCtx, InMemorySubscriptionState stored) {
                        inMemorySubscriptionState.setVersion(stored.getVersion());
                        AbstractSubscriptionManager.this.updateSubscriptionState(byteString, byteString2, inMemorySubscriptionState, callback, readCtx);
                    }

                    public void operationFailed(Object readCtx, PubSubException readFailure) {
                        callback.operationFailed(readCtx, readFailure);
                    }
                };
                AbstractSubscriptionManager.this.readSubscriptionData(byteString, byteString2, onReread, writeCtx);
            }
        };
        if (!isPartialUpdateSupported()) {
            replaceSubscriptionData(byteString, byteString2, inMemorySubscriptionState.toSubscriptionData(), inMemorySubscriptionState.getVersion(), onWritten, obj);
            return;
        }
        PubSubProtocol.SubscriptionData stateOnly = PubSubProtocol.SubscriptionData.newBuilder()
                .setState(inMemorySubscriptionState.getSubscriptionState())
                .build();
        updateSubscriptionData(byteString, byteString2, stateOnly, inMemorySubscriptionState.getVersion(), onWritten, obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writes the preferences portion of a subscription to storage and records the new version
     * on the in-memory state.
     *
     * <p>If the write fails with {@code BadVersionException}, the stored data is re-read, its
     * version is copied onto the in-memory state and the write is attempted again. Any other
     * failure, or a failure of the re-read, is reported to the callback.
     *
     * <p>When {@link #isPartialUpdateSupported()} is true only the preferences are sent via
     * {@code updateSubscriptionData}; otherwise the whole record is sent via
     * {@code replaceSubscriptionData}.
     *
     * @param byteString                topic
     * @param byteString2               subscriber id
     * @param inMemorySubscriptionState in-memory state to write; its version is updated in place
     * @param callback                  completed once the write succeeds or fails for good
     * @param obj                       context object passed to the storage call
     */
    public void updateSubscriptionPreferences(final ByteString byteString, final ByteString byteString2, final InMemorySubscriptionState inMemorySubscriptionState, final Callback<Void> callback, Object obj) {
        final Callback<Version> onWritten = new Callback<Version>() {
            public void operationFinished(Object writeCtx, Version newVersion) {
                inMemorySubscriptionState.setVersion(newVersion);
                callback.operationFinished(writeCtx, null);
            }

            public void operationFailed(Object writeCtx, PubSubException cause) {
                if (!(cause instanceof PubSubException.BadVersionException)) {
                    callback.operationFailed(writeCtx, cause);
                    return;
                }
                // Stale version: fetch the stored one and retry the write with it.
                Callback<InMemorySubscriptionState> onReread = new Callback<InMemorySubscriptionState>() {
                    public void operationFinished(Object readCtx, InMemorySubscriptionState stored) {
                        inMemorySubscriptionState.setVersion(stored.getVersion());
                        AbstractSubscriptionManager.this.updateSubscriptionPreferences(byteString, byteString2, inMemorySubscriptionState, callback, readCtx);
                    }

                    public void operationFailed(Object readCtx, PubSubException readFailure) {
                        callback.operationFailed(readCtx, readFailure);
                    }
                };
                AbstractSubscriptionManager.this.readSubscriptionData(byteString, byteString2, onReread, writeCtx);
            }
        };
        if (!isPartialUpdateSupported()) {
            replaceSubscriptionData(byteString, byteString2, inMemorySubscriptionState.toSubscriptionData(), inMemorySubscriptionState.getVersion(), onWritten, obj);
            return;
        }
        PubSubProtocol.SubscriptionData preferencesOnly = PubSubProtocol.SubscriptionData.newBuilder()
                .setPreferences(inMemorySubscriptionState.getSubscriptionPreferences())
                .build();
        updateSubscriptionData(byteString, byteString2, preferencesOnly, inMemorySubscriptionState.getVersion(), onWritten, obj);
    }

    /**
     * Selects how {@code updateSubscriptionState} and {@code updateSubscriptionPreferences}
     * write to storage.
     *
     * @return {@code true} to have them send only the changed part through
     *         {@code updateSubscriptionData}; {@code false} to have them send the complete
     *         record through {@code replaceSubscriptionData}
     */
    protected abstract boolean isPartialUpdateSupported();

    /**
     * Creates the stored data of a subscription.
     *
     * <p>NOTE(review): no caller is visible in this part of the file; parameter roles are
     * assumed to match the sibling update/replace/delete methods (topic, then subscriber id)
     * — confirm against callers and implementations.
     *
     * @param byteString       presumably the topic
     * @param byteString2      presumably the subscriber id
     * @param subscriptionData data to store
     * @param callback         receives a {@link Version} on success, or the failure
     * @param obj              context object passed through to the callback
     */
    protected abstract void createSubscriptionData(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionData subscriptionData, Callback<Version> callback, Object obj);

    /**
     * Applies a partial update to the stored data of a subscription.
     *
     * <p>This class calls it only when {@link #isPartialUpdateSupported()} returns
     * {@code true}, passing a {@code SubscriptionData} that carries just the state or just
     * the preferences. A failure of type {@code BadVersionException} makes the callers
     * re-read the stored version and retry.
     *
     * @param byteString       topic
     * @param byteString2      subscriber id
     * @param subscriptionData the fields to update
     * @param version          version the caller currently holds for the subscription
     * @param callback         receives the new {@link Version} on success, or the failure
     * @param obj              context object passed through to the callback
     */
    protected abstract void updateSubscriptionData(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionData subscriptionData, Version version, Callback<Version> callback, Object obj);

    /**
     * Replaces the stored data of a subscription with a complete record.
     *
     * <p>This class calls it when {@link #isPartialUpdateSupported()} returns {@code false},
     * passing the result of {@code InMemorySubscriptionState.toSubscriptionData()}. A failure
     * of type {@code BadVersionException} makes the callers re-read the stored version and
     * retry.
     *
     * @param byteString       topic
     * @param byteString2      subscriber id
     * @param subscriptionData the complete record to store
     * @param version          version the caller currently holds for the subscription
     * @param callback         receives the new {@link Version} on success, or the failure
     * @param obj              context object passed through to the callback
     */
    protected abstract void replaceSubscriptionData(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionData subscriptionData, Version version, Callback<Version> callback, Object obj);

    /**
     * Deletes the stored data of a subscription.
     *
     * <p>{@code UnsubscribeOp} calls it with the version held by the in-memory state and
     * removes the subscriber from memory only after the callback reports success.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     * @param version     version the caller currently holds for the subscription
     * @param callback    notified when the delete succeeds or fails
     * @param obj         context object passed through to the callback
     */
    protected abstract void deleteSubscriptionData(ByteString byteString, ByteString byteString2, Version version, Callback<Void> callback, Object obj);
}
