package org.apache.hedwig.server.topics;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
import org.apache.hedwig.server.meta.TopicOwnershipManager;
import org.apache.hedwig.server.topics.HubServerManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/topics/MMTopicManager.class */
public class MMTopicManager extends AbstractTopicManager implements TopicManager {
    /** Logger shared by this class and by the callbacks it creates. */
    static Logger logger = LoggerFactory.getLogger(MMTopicManager.class);
    /** Ownership metadata store; used here to read, write and delete the per-topic owner record. */
    private final TopicOwnershipManager mm;
    /**
     * Hub registry; used here to register/unregister this hub, check whether another hub is alive,
     * pick the least loaded hub and upload this hub's load data.
     */
    private final HubServerManager hubManager;
    /** Hub info handed back by the hub manager when this hub registered itself in the constructor. */
    private final HubInfo myHubInfo;
    /**
     * Load record created in the constructor from the current topic count; {@code setNumTopics} is
     * called on it with the current topic count whenever a topic is claimed.
     */
    private final HubLoad myHubLoad;
    /**
     * Toggled by the hub manager listener ({@code onSuspend} sets it, {@code onResume} clears it);
     * while it is {@code true}, {@code realGetOwner} fails requests immediately.
     */
    protected volatile boolean isSuspended;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/topics/MMTopicManager$MMGetOwnerOp.class */
    public class MMGetOwnerOp {
        ByteString topic;
        Callback<HedwigSocketAddress> cb;
        Object ctx;

        public MMGetOwnerOp(ByteString byteString, Callback<HedwigSocketAddress> callback, Object obj) {
            this.topic = byteString;
            this.cb = callback;
            this.ctx = obj;
        }

        protected void read() {
            MMTopicManager.this.mm.readOwnerInfo(this.topic, new Callback<Versioned<HubInfo>>() { // from class: org.apache.hedwig.server.topics.MMTopicManager.MMGetOwnerOp.1
                public void operationFinished(Object obj, Versioned<HubInfo> versioned) {
                    if (null == versioned) {
                        MMTopicManager.logger.info("{} : No owner found for topic {}", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8()});
                        MMGetOwnerOp.this.choose(Version.NEW);
                        return;
                    }
                    final Version version = versioned.getVersion();
                    if (null == versioned.getValue()) {
                        MMTopicManager.logger.info("{} : Invalid owner found for topic {}", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8()});
                        MMGetOwnerOp.this.choose(version);
                        return;
                    }
                    final HubInfo hubInfo = (HubInfo) versioned.getValue();
                    MMTopicManager.logger.info("{} : Read owner of topic {} : {}", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8(), hubInfo});
                    MMTopicManager.logger.info("{}, {}", new Object[]{hubInfo, MMTopicManager.this.myHubInfo});
                    if (!hubInfo.getAddress().equals(MMTopicManager.this.addr)) {
                        MMTopicManager.logger.info("{} : Check whether owner {} for topic {} is still alive.", new Object[]{MMTopicManager.this.addr, hubInfo, MMGetOwnerOp.this.topic.toStringUtf8()});
                        MMTopicManager.this.hubManager.isHubAlive(hubInfo, new Callback<Boolean>() { // from class: org.apache.hedwig.server.topics.MMTopicManager.MMGetOwnerOp.1.1
                            public void operationFinished(Object obj2, Boolean bool) {
                                if (bool.booleanValue()) {
                                    MMGetOwnerOp.this.cb.operationFinished(obj2, hubInfo.getAddress());
                                } else {
                                    MMGetOwnerOp.this.choose(version);
                                }
                            }

                            public void operationFailed(Object obj2, PubSubException pubSubException) {
                                MMGetOwnerOp.this.cb.operationFailed(obj2, pubSubException);
                            }
                        }, obj);
                    } else if (MMTopicManager.this.myHubInfo.getZxid() == hubInfo.getZxid()) {
                        MMGetOwnerOp.this.claimTopic(obj);
                    } else {
                        MMGetOwnerOp.this.choose(version);
                    }
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    MMGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException("Could not read ownership for topic " + MMGetOwnerOp.this.topic.toStringUtf8() + " : " + pubSubException.getMessage()));
                }
            }, this.ctx);
        }

        public void claim(Version version) {
            MMTopicManager.logger.info("{} : claiming topic {} 's owner to be {}", new Object[]{MMTopicManager.this.addr, this.topic.toStringUtf8(), MMTopicManager.this.myHubInfo});
            MMTopicManager.this.mm.writeOwnerInfo(this.topic, MMTopicManager.this.myHubInfo, version, new Callback<Version>() { // from class: org.apache.hedwig.server.topics.MMTopicManager.MMGetOwnerOp.2
                public void operationFinished(Object obj, Version version2) {
                    MMGetOwnerOp.this.claimTopic(obj);
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    if (!(pubSubException instanceof PubSubException.NoTopicOwnerInfoException) && !(pubSubException instanceof PubSubException.BadVersionException)) {
                        MMGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException("Exception when writing owner info to claim ownership of topic " + MMGetOwnerOp.this.topic.toStringUtf8() + " : " + pubSubException.getMessage()));
                    } else {
                        MMTopicManager.logger.info("{} : Some one has claimed topic {} 's owner. Try to read the owner again.", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8()});
                        MMGetOwnerOp.this.read();
                    }
                }
            }, this.ctx);
        }

        protected void claimTopic(Object obj) {
            MMTopicManager.logger.info("{} : claimed topic {} 's owner to be {}", new Object[]{MMTopicManager.this.addr, this.topic.toStringUtf8(), MMTopicManager.this.myHubInfo});
            MMTopicManager.this.notifyListenersAndAddToOwnedTopics(this.topic, this.cb, obj);
            MMTopicManager.this.hubManager.uploadSelfLoadData(MMTopicManager.this.myHubLoad.setNumTopics(MMTopicManager.this.topics.size()));
        }

        public void choose(final Version version) {
            MMTopicManager.this.hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() { // from class: org.apache.hedwig.server.topics.MMTopicManager.MMGetOwnerOp.3
                public void operationFinished(Object obj, HubInfo hubInfo) {
                    MMTopicManager.logger.info("{} : Least loaded owner {} is chosen for topic {}", new Object[]{MMTopicManager.this.addr, hubInfo, MMGetOwnerOp.this.topic.toStringUtf8()});
                    if (hubInfo.getAddress().equals(MMTopicManager.this.addr)) {
                        MMGetOwnerOp.this.claim(version);
                    } else {
                        MMGetOwnerOp.this.setOwner(hubInfo, version);
                    }
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    MMTopicManager.logger.error("Failed to choose least loaded hub server for topic " + MMGetOwnerOp.this.topic.toStringUtf8() + " : ", pubSubException);
                    MMGetOwnerOp.this.cb.operationFailed(obj, pubSubException);
                }
            }, null);
        }

        public void setOwner(final HubInfo hubInfo, Version version) {
            MMTopicManager.logger.info("{} : setting topic {} 's owner to be {}", new Object[]{MMTopicManager.this.addr, this.topic.toStringUtf8(), hubInfo});
            MMTopicManager.this.mm.writeOwnerInfo(this.topic, hubInfo, version, new Callback<Version>() { // from class: org.apache.hedwig.server.topics.MMTopicManager.MMGetOwnerOp.4
                public void operationFinished(Object obj, Version version2) {
                    MMTopicManager.logger.info("{} : Set topic {} 's owner to be {}", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8(), hubInfo});
                    MMGetOwnerOp.this.cb.operationFinished(obj, hubInfo.getAddress());
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    if (!(pubSubException instanceof PubSubException.NoTopicOwnerInfoException) && !(pubSubException instanceof PubSubException.BadVersionException)) {
                        MMGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException("Exception when writing owner info to claim ownership of topic " + MMGetOwnerOp.this.topic.toStringUtf8() + " : " + pubSubException.getMessage()));
                    } else {
                        MMTopicManager.logger.info("{} : Some one has set topic {} 's owner. Try to read the owner again.", new Object[]{MMTopicManager.this.addr, MMGetOwnerOp.this.topic.toStringUtf8()});
                        MMGetOwnerOp.this.read();
                    }
                }
            }, this.ctx);
        }
    }

    /**
     * Creates the topic manager, registers this hub with the hub manager and waits for the
     * registration result before returning.
     *
     * @param serverConfiguration      server configuration, passed to the superclass and to the hub manager
     * @param zooKeeper                ZooKeeper handle used by the {@code ZkHubServerManager}
     * @param metadataManagerFactory   factory that supplies the {@code TopicOwnershipManager}
     * @param scheduledExecutorService executor passed to the superclass
     * @throws UnknownHostException declared by this constructor; presumably raised while resolving
     *                              the local address in the superclass — confirm there
     * @throws PubSubException      if registering this hub fails
     */
    public MMTopicManager(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, MetadataManagerFactory metadataManagerFactory, ScheduledExecutorService scheduledExecutorService) throws UnknownHostException, PubSubException {
        super(serverConfiguration, scheduledExecutorService);
        this.isSuspended = false;
        this.mm = metadataManagerFactory.newTopicOwnershipManager();
        this.hubManager = new ZkHubServerManager(serverConfiguration, zooKeeper, this.addr);
        // Hand-off point between the registration callback and this constructor: exactly one
        // Either is put, carrying the hub info on the left or the failure on the right.
        final SynchronousQueue<Either<HubInfo, PubSubException>> registrationResult =
                new SynchronousQueue<Either<HubInfo, PubSubException>>();
        this.myHubLoad = new HubLoad(this.topics.size());
        this.hubManager.registerListener(new HubServerManager.ManagerListener() {
            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onSuspend() {
                MMTopicManager.this.isSuspended = true;
            }

            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onResume() {
                MMTopicManager.this.isSuspended = false;
            }

            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onShutdown() {
                // A hub manager shutdown terminates the whole JVM with exit status 1.
                Runtime.getRuntime().exit(1);
            }
        });
        this.hubManager.registerSelf(this.myHubLoad, new Callback<HubInfo>() {
            @Override
            public void operationFinished(Object obj, HubInfo hubInfo) {
                logger.info("Successfully registered hub {} with zookeeper", hubInfo);
                ConcurrencyUtils.put(registrationResult, Either.of(hubInfo, (PubSubException) null));
            }

            @Override
            public void operationFailed(Object obj, PubSubException pubSubException) {
                logger.error("Failed to register hub with zookeeper", pubSubException);
                ConcurrencyUtils.put(registrationResult, Either.of((HubInfo) null, pubSubException));
            }
        }, null);
        Either<HubInfo, PubSubException> outcome = ConcurrencyUtils.take(registrationResult);
        PubSubException registrationFailure = outcome.right();
        if (registrationFailure != null) {
            throw registrationFailure;
        }
        this.myHubInfo = outcome.left();
        logger.info("Start metadata manager based topic manager with hub id : {}", this.myHubInfo);
    }

    /**
     * Resolves the owner of a topic and reports it through {@code callback}.
     *
     * <p>Fails immediately with a {@code ServiceDownException} while the manager is suspended,
     * answers with this hub's address when the topic is already in {@code topics}, and otherwise
     * starts an {@link MMGetOwnerOp}.
     *
     * @param byteString topic to look up
     * @param z          not consulted by this implementation
     * @param callback   receives the owner address or the failure
     * @param obj        opaque caller context handed back through the callback
     */
    @Override // org.apache.hedwig.server.topics.AbstractTopicManager
    protected void realGetOwner(ByteString byteString, boolean z, Callback<HedwigSocketAddress> callback, Object obj) {
        if (this.isSuspended) {
            callback.operationFailed(obj, new PubSubException.ServiceDownException("MMTopicManager service is temporarily suspended!"));
            return;
        }

        if (this.topics.contains(byteString)) {
            // Already owned locally: no metadata lookup needed.
            callback.operationFinished(obj, this.addr);
            return;
        }

        MMGetOwnerOp lookup = new MMGetOwnerOp(byteString, callback, obj);
        lookup.read();
    }

    /**
     * Deletes the stored owner record of a released topic, but only when that record still names
     * this hub's address.
     *
     * <p>The callback completes successfully when there is no record, when the record has no value,
     * when the record names a different address (it is left untouched), when the delete succeeds,
     * or when the delete reports {@code NoTopicOwnerInfoException}. Any other read or delete failure
     * is reported as a {@code ServiceDownException} wrapping the original exception.
     *
     * @param byteString topic that was released
     * @param callback   notified when the cleanup has finished or failed
     * @param obj        opaque caller context passed to the metadata read
     */
    @Override // org.apache.hedwig.server.topics.AbstractTopicManager
    protected void postReleaseCleanup(final ByteString byteString, final Callback<Void> callback, Object obj) {
        this.mm.readOwnerInfo(byteString, new Callback<Versioned<HubInfo>>() {
            @Override
            public void operationFinished(Object readCtx, Versioned<HubInfo> ownerRecord) {
                if (null == ownerRecord) {
                    logger.warn("No owner info found when cleaning up topic " + byteString.toStringUtf8());
                    callback.operationFinished(readCtx, null);
                    return;
                }
                HubInfo owner = ownerRecord.getValue();
                if (null == owner) {
                    logger.warn("No valid owner info found when cleaning up topic " + byteString.toStringUtf8());
                    callback.operationFinished(readCtx, null);
                    return;
                }
                if (!owner.getAddress().equals(MMTopicManager.this.addr)) {
                    // The record names a different address: leave it untouched.
                    logger.warn("Wanted to clean up self owner info for topic " + byteString.toStringUtf8() + " but owner " + ownerRecord + " found, leaving untouched");
                    callback.operationFinished(readCtx, null);
                    return;
                }
                // Delete conditioned on the version just read.
                MMTopicManager.this.mm.deleteOwnerInfo(byteString, ownerRecord.getVersion(), new Callback<Void>() {
                    @Override
                    public void operationFinished(Object deleteCtx, Void ignored) {
                        callback.operationFinished(deleteCtx, null);
                    }

                    @Override
                    public void operationFailed(Object deleteCtx, PubSubException pubSubException) {
                        if (pubSubException instanceof PubSubException.NoTopicOwnerInfoException) {
                            // The record is already gone, which is the state the cleanup wanted.
                            logger.warn("Wanted to clean up self owner info for topic " + byteString.toStringUtf8() + " but it has been removed.");
                            callback.operationFinished(deleteCtx, null);
                        } else {
                            logger.error("Exception when deleting self-ownership metadata for topic " + byteString.toStringUtf8() + " : ", pubSubException);
                            callback.operationFailed(deleteCtx, new PubSubException.ServiceDownException(pubSubException));
                        }
                    }
                }, readCtx);
            }

            @Override
            public void operationFailed(Object readCtx, PubSubException pubSubException) {
                logger.error("Exception when cleaning up owner info of topic " + byteString.toStringUtf8() + " : ", pubSubException);
                callback.operationFailed(readCtx, new PubSubException.ServiceDownException(pubSubException));
            }
        }, obj);
    }

    /**
     * Unregisters this hub from the hub manager and then runs the superclass shutdown.
     *
     * <p>An {@link IOException} from unregistering is logged and does not prevent the superclass
     * {@code stop()} from running.
     */
    @Override // org.apache.hedwig.server.topics.AbstractTopicManager, org.apache.hedwig.server.topics.TopicManager
    public void stop() {
        try {
            hubManager.unregisterSelf();
        } catch (IOException unregisterFailure) {
            final String message = "Error unregistering hub server " + myHubInfo + " : ";
            logger.error(message, unregisterFailure);
        }
        super.stop();
    }
}
