package org.apache.hedwig.server.topics;

import com.google.protobuf.ByteString;
import java.io.IOException;
import java.net.UnknownHostException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.server.topics.HubServerManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hedwig.zookeeper.ZkUtils;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/topics/ZkTopicManager.class */
public class ZkTopicManager extends AbstractTopicManager implements TopicManager {
    static Logger logger = LoggerFactory.getLogger(ZkTopicManager.class);
    private ZooKeeper zk;
    private final HubServerManager hubManager;
    private final HubInfo myHubInfo;
    private final HubLoad myHubLoad;
    protected volatile boolean isSuspended;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/topics/ZkTopicManager$ZkGetOwnerOp.class */
    public class ZkGetOwnerOp {
        ByteString topic;
        boolean shouldClaim;
        Callback<HedwigSocketAddress> cb;
        Object ctx;
        String hubPath;

        public ZkGetOwnerOp(ByteString byteString, boolean z, Callback<HedwigSocketAddress> callback, Object obj) {
            this.topic = byteString;
            this.shouldClaim = z;
            this.cb = callback;
            this.ctx = obj;
            this.hubPath = ZkTopicManager.this.hubPath(byteString);
        }

        public void choose() {
            ZkTopicManager.this.hubManager.chooseLeastLoadedHub(new Callback<HubInfo>() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.ZkGetOwnerOp.1
                public void operationFinished(Object obj, HubInfo hubInfo) {
                    ZkTopicManager.logger.info("{} : Least loaded owner {} is chosen for topic {}", new Object[]{ZkTopicManager.this.addr, hubInfo, ZkGetOwnerOp.this.topic.toStringUtf8()});
                    if (hubInfo.getAddress().equals(ZkTopicManager.this.addr)) {
                        ZkGetOwnerOp.this.claim();
                    } else {
                        ZkGetOwnerOp.this.cb.operationFinished(ZkGetOwnerOp.this.ctx, hubInfo.getAddress());
                    }
                }

                public void operationFailed(Object obj, PubSubException pubSubException) {
                    ZkTopicManager.logger.error("Failed to choose least loaded hub server for topic " + ZkGetOwnerOp.this.topic.toStringUtf8() + " : ", pubSubException);
                    ZkGetOwnerOp.this.cb.operationFailed(obj, pubSubException);
                }
            }, null);
        }

        public void claimOrChoose() {
            if (this.shouldClaim) {
                claim();
            } else {
                choose();
            }
        }

        public void read() {
            ZkTopicManager.this.zk.getData(this.hubPath, false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.ZkGetOwnerOp.2
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                public void safeProcessResult(int i, String str, Object obj, byte[] bArr, Stat stat) {
                    HedwigSocketAddress address;
                    if (i == KeeperException.Code.NONODE.intValue()) {
                        ZkGetOwnerOp.this.claimOrChoose();
                        return;
                    }
                    if (i != KeeperException.Code.OK.intValue()) {
                        ZkGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str, i)));
                        return;
                    }
                    try {
                        address = HubInfo.parse(new String(bArr)).getAddress();
                    } catch (HubInfo.InvalidHubInfoException e) {
                        ZkTopicManager.logger.info("Discovered invalid hub info for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8() + ", will delete it : ", e);
                    }
                    if (address.equals(ZkTopicManager.this.addr)) {
                        ZkTopicManager.logger.info("Discovered stale self-node for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8() + ", will delete it");
                        ZkTopicManager.this.zk.delete(ZkGetOwnerOp.this.hubPath, stat.getVersion(), new AsyncCallback.VoidCallback() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.ZkGetOwnerOp.2.1
                            public void processResult(int i2, String str2, Object obj2) {
                                if (KeeperException.Code.OK.intValue() == i2 || KeeperException.Code.NONODE.intValue() == i2) {
                                    ZkGetOwnerOp.this.claimOrChoose();
                                } else {
                                    ZkGetOwnerOp.this.cb.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not delete self node for topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str2, i2)));
                                }
                            }
                        }, obj);
                    } else {
                        if (ZkTopicManager.logger.isDebugEnabled()) {
                            ZkTopicManager.logger.debug("topic: " + ZkGetOwnerOp.this.topic.toStringUtf8() + " belongs to someone else: " + address);
                        }
                        ZkGetOwnerOp.this.cb.operationFinished(obj, address);
                    }
                }
            }, this.ctx);
        }

        public void claim() {
            if (ZkTopicManager.logger.isDebugEnabled()) {
                ZkTopicManager.logger.debug("claiming topic: " + this.topic.toStringUtf8());
            }
            ZkUtils.createFullPathOptimistic(ZkTopicManager.this.zk, this.hubPath, ZkTopicManager.this.myHubInfo.toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.ZkGetOwnerOp.3
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
                public void safeProcessResult(int i, String str, Object obj, String str2) {
                    if (i == KeeperException.Code.OK.intValue()) {
                        if (ZkTopicManager.logger.isDebugEnabled()) {
                            ZkTopicManager.logger.debug("claimed topic: " + ZkGetOwnerOp.this.topic.toStringUtf8());
                        }
                        ZkTopicManager.this.notifyListenersAndAddToOwnedTopics(ZkGetOwnerOp.this.topic, ZkGetOwnerOp.this.cb, obj);
                        ZkTopicManager.this.hubManager.uploadSelfLoadData(ZkTopicManager.this.myHubLoad.setNumTopics(ZkTopicManager.this.topics.size()));
                        return;
                    }
                    if (i == KeeperException.Code.NODEEXISTS.intValue()) {
                        ZkGetOwnerOp.this.read();
                    } else {
                        ZkGetOwnerOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to create ephemeral node to claim ownership of topic: " + ZkGetOwnerOp.this.topic.toStringUtf8(), str, i)));
                    }
                }
            }, this.ctx);
        }
    }

    public ZkTopicManager(ZooKeeper zooKeeper, ServerConfiguration serverConfiguration, ScheduledExecutorService scheduledExecutorService) throws UnknownHostException, PubSubException {
        super(serverConfiguration, scheduledExecutorService);
        this.isSuspended = false;
        this.zk = zooKeeper;
        this.hubManager = new ZkHubServerManager(serverConfiguration, zooKeeper, this.addr);
        this.myHubLoad = new HubLoad(this.topics.size());
        this.hubManager.registerListener(new HubServerManager.ManagerListener() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.1
            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onSuspend() {
                ZkTopicManager.this.isSuspended = true;
            }

            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onResume() {
                ZkTopicManager.this.isSuspended = false;
            }

            @Override // org.apache.hedwig.server.topics.HubServerManager.ManagerListener
            public void onShutdown() {
                Runtime.getRuntime().exit(1);
            }
        });
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        this.hubManager.registerSelf(this.myHubLoad, new Callback<HubInfo>() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.2
            public void operationFinished(Object obj, HubInfo hubInfo) {
                ZkTopicManager.logger.info("Successfully registered hub {} with zookeeper", hubInfo);
                ConcurrencyUtils.put(synchronousQueue, Either.of(hubInfo, (PubSubException) null));
            }

            public void operationFailed(Object obj, PubSubException pubSubException) {
                ZkTopicManager.logger.error("Failed to register hub with zookeeper", pubSubException);
                ConcurrencyUtils.put(synchronousQueue, Either.of((HubInfo) null, pubSubException));
            }
        }, null);
        Either either = (Either) ConcurrencyUtils.take(synchronousQueue);
        PubSubException pubSubException = (PubSubException) either.right();
        if (pubSubException != null) {
            throw pubSubException;
        }
        this.myHubInfo = (HubInfo) either.left();
    }

    String hubPath(ByteString byteString) {
        return this.cfg.getZkTopicPath(new StringBuilder(), byteString).append("/hub").toString();
    }

    @Override // org.apache.hedwig.server.topics.AbstractTopicManager
    protected void realGetOwner(ByteString byteString, boolean z, Callback<HedwigSocketAddress> callback, Object obj) {
        if (this.isSuspended) {
            callback.operationFailed(obj, new PubSubException.ServiceDownException("ZKTopicManager service is temporarily suspended!"));
        } else if (this.topics.contains(byteString)) {
            callback.operationFinished(obj, this.addr);
        } else {
            new ZkGetOwnerOp(byteString, z, callback, obj).read();
        }
    }

    @Override // org.apache.hedwig.server.topics.AbstractTopicManager
    protected void postReleaseCleanup(final ByteString byteString, final Callback<Void> callback, Object obj) {
        this.zk.getData(hubPath(byteString), false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.3
            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
            public void safeProcessResult(int i, String str, Object obj2, byte[] bArr, Stat stat) {
                if (i == KeeperException.Code.NONODE.intValue()) {
                    ZkTopicManager.logger.warn("While deleting self-node for topic: " + byteString.toStringUtf8() + ", node not found");
                    callback.operationFinished(obj2, (Object) null);
                    return;
                }
                if (i != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to delete self-ownership node for topic: " + byteString.toStringUtf8(), str, i)));
                    return;
                }
                String str2 = new String(bArr);
                try {
                    HedwigSocketAddress address = HubInfo.parse(str2).getAddress();
                    if (address.equals(ZkTopicManager.this.addr)) {
                        ZkTopicManager.this.zk.delete(str, stat.getVersion(), new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hedwig.server.topics.ZkTopicManager.3.1
                            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
                            public void safeProcessResult(int i2, String str3, Object obj3) {
                                if (i2 == KeeperException.Code.OK.intValue() || i2 == KeeperException.Code.NONODE.intValue()) {
                                    callback.operationFinished(obj3, (Object) null);
                                } else {
                                    callback.operationFailed(obj3, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to delete self-ownership node for topic: " + byteString.toStringUtf8(), str3, i2)));
                                }
                            }
                        }, obj2);
                    } else {
                        ZkTopicManager.logger.warn("Wanted to delete self-node for topic: " + byteString.toStringUtf8() + " but node for " + address + " found, leaving untouched");
                        callback.operationFinished(obj2, (Object) null);
                    }
                } catch (HubInfo.InvalidHubInfoException e) {
                    ZkTopicManager.logger.info("Invalid hub info " + str2 + " found when release topic " + byteString.toStringUtf8() + ". Leaving untouched until next acquire action.");
                    callback.operationFinished(obj2, (Object) null);
                }
            }
        }, obj);
    }

    @Override // org.apache.hedwig.server.topics.AbstractTopicManager, org.apache.hedwig.server.topics.TopicManager
    public void stop() {
        try {
            this.hubManager.unregisterSelf();
        } catch (IOException e) {
            logger.error("Error unregistering hub server :", e);
        }
        super.stop();
    }
}
