package org.apache.hedwig.server.topics;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Random;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.HubLoad;
import org.apache.hedwig.server.topics.HubServerManager;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.HedwigSocketAddress;
import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hedwig.zookeeper.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/topics/ZkHubServerManager.class */
class ZkHubServerManager implements HubServerManager {
    static Logger logger = LoggerFactory.getLogger(ZkHubServerManager.class);
    private final ServerConfiguration conf;
    private final ZooKeeper zk;
    private final HedwigSocketAddress addr;
    private final String ephemeralNodePath;
    private final String hubNodesPath;
    protected HubInfo myHubInfo;
    final Random rand = new Random();
    protected volatile boolean isSuspended = false;
    protected HubServerManager.ManagerListener listener = null;
    SafeAsyncZKCallback.StatCallback loadReportingStatCallback = new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.1
        @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
        public void safeProcessResult(int i, String str, Object obj, Stat stat) {
            if (i != KeeperException.Code.OK.intValue()) {
                ZkHubServerManager.logger.warn("Failed to update load information of hub {} in zk", ZkHubServerManager.this.myHubInfo);
            }
        }
    };

    /* loaded from: input_file:org/apache/hedwig/server/topics/ZkHubServerManager$ZkHubsWatcher.class */
    class ZkHubsWatcher implements Watcher {
        ZkHubsWatcher() {
        }

        public void process(WatchedEvent watchedEvent) {
            if (watchedEvent.getType().equals(Watcher.Event.EventType.None)) {
                if (watchedEvent.getState().equals(Watcher.Event.KeeperState.Disconnected)) {
                    ZkHubServerManager.logger.warn("ZK client has been disconnected to the ZK server!");
                    ZkHubServerManager.this.isSuspended = true;
                    if (null != ZkHubServerManager.this.listener) {
                        ZkHubServerManager.this.listener.onSuspend();
                    }
                } else if (watchedEvent.getState().equals(Watcher.Event.KeeperState.SyncConnected)) {
                    if (ZkHubServerManager.this.isSuspended) {
                        ZkHubServerManager.logger.info("ZK client has been reconnected to the ZK server!");
                    }
                    ZkHubServerManager.this.isSuspended = false;
                    if (null != ZkHubServerManager.this.listener) {
                        ZkHubServerManager.this.listener.onResume();
                    }
                }
            }
            if (watchedEvent.getState().equals(Watcher.Event.KeeperState.Expired)) {
                ZkHubServerManager.logger.error("ZK client connection to the ZK server has expired.!");
                if (null != ZkHubServerManager.this.listener) {
                    ZkHubServerManager.this.listener.onShutdown();
                }
            }
        }
    }

    public ZkHubServerManager(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, HedwigSocketAddress hedwigSocketAddress) {
        this.conf = serverConfiguration;
        this.zk = zooKeeper;
        this.addr = hedwigSocketAddress;
        this.hubNodesPath = this.conf.getZkHostsPrefix(new StringBuilder()).toString();
        this.ephemeralNodePath = getHubZkNodePath(hedwigSocketAddress);
        zooKeeper.register(new ZkHubsWatcher());
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void registerListener(HubServerManager.ManagerListener managerListener) {
        this.listener = managerListener;
    }

    private String getHubZkNodePath(HedwigSocketAddress hedwigSocketAddress) {
        return this.conf.getZkHostsPrefix(new StringBuilder()).append("/").append(hedwigSocketAddress).toString();
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void registerSelf(final HubLoad hubLoad, final Callback<HubInfo> callback, Object obj) {
        ZkUtils.createFullPathOptimistic(this.zk, this.ephemeralNodePath, hubLoad.toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.2
            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
            public void safeProcessResult(int i, String str, Object obj2, String str2) {
                if (i == KeeperException.Code.OK.intValue()) {
                    ZkHubServerManager.this.zk.exists(ZkHubServerManager.this.ephemeralNodePath, false, new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.2.1
                        @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
                        public void safeProcessResult(int i2, String str3, Object obj3, Stat stat) {
                            if (i2 != KeeperException.Code.OK.intValue()) {
                                callback.operationFailed(obj3, new PubSubException.ServiceDownException("I can't state my hub node after I created it : " + ZkHubServerManager.this.ephemeralNodePath));
                                return;
                            }
                            ZkHubServerManager.this.myHubInfo = new HubInfo(ZkHubServerManager.this.addr, stat.getCzxid());
                            callback.operationFinished(obj3, ZkHubServerManager.this.myHubInfo);
                        }
                    }, obj2);
                } else if (i != KeeperException.Code.NODEEXISTS.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not create ephemeral node to register hub", ZkHubServerManager.this.ephemeralNodePath, i)));
                } else {
                    ZkHubServerManager.logger.info("Found stale ephemeral node while registering hub with ZK, deleting it");
                    ZkHubServerManager.this.zk.delete(ZkHubServerManager.this.ephemeralNodePath, -1, new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.2.2
                        @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
                        public void safeProcessResult(int i2, String str3, Object obj3) {
                            if (i2 == KeeperException.Code.OK.intValue() || i2 == KeeperException.Code.NONODE.intValue()) {
                                ZkHubServerManager.this.registerSelf(hubLoad, callback, obj3);
                            } else {
                                callback.operationFailed(obj3, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not delete stale ephemeral node to register hub", ZkHubServerManager.this.ephemeralNodePath, i2)));
                            }
                        }
                    }, obj2);
                }
            }
        }, obj);
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void unregisterSelf() throws IOException {
        try {
            this.zk.delete(this.ephemeralNodePath, -1);
        } catch (InterruptedException e) {
            throw new IOException(e);
        } catch (KeeperException e2) {
            throw new IOException((Throwable) e2);
        }
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void uploadSelfLoadData(HubLoad hubLoad) {
        logger.debug("Reporting hub load of {} : {}", this.myHubInfo, hubLoad);
        this.zk.setData(this.ephemeralNodePath, hubLoad.toString().getBytes(), -1, this.loadReportingStatCallback, (Object) null);
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void isHubAlive(final HubInfo hubInfo, final Callback<Boolean> callback, Object obj) {
        this.zk.exists(getHubZkNodePath(hubInfo.getAddress()), false, new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.3
            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
            public void safeProcessResult(int i, String str, Object obj2, Stat stat) {
                if (i == KeeperException.Code.NONODE.intValue()) {
                    callback.operationFinished(obj2, false);
                    return;
                }
                if (i != KeeperException.Code.OK.intValue()) {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException("Failed to check whether hub server " + hubInfo + " is alive!"));
                } else if (hubInfo.getZxid() == stat.getCzxid()) {
                    callback.operationFinished(obj2, true);
                } else {
                    callback.operationFinished(obj2, false);
                }
            }
        }, obj);
    }

    @Override // org.apache.hedwig.server.topics.HubServerManager
    public void chooseLeastLoadedHub(final Callback<HubInfo> callback, Object obj) {
        this.zk.getChildren(this.hubNodesPath, false, new SafeAsyncZKCallback.ChildrenCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.4
            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.ChildrenCallback
            public void safeProcessResult(int i, String str, Object obj2, List<String> list) {
                if (i == KeeperException.Code.OK.intValue()) {
                    ZkHubServerManager.this.chooseLeastLoadedNode(list, callback, obj2);
                } else {
                    callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not get list of available hubs", str, i)));
                }
            }
        }, obj);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void chooseLeastLoadedNode(final List<String> list, final Callback<HubInfo> callback, Object obj) {
        SafeAsyncZKCallback.DataCallback dataCallback = new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.topics.ZkHubServerManager.5
            int numResponses = 0;
            HubLoad minLoad = HubLoad.MAX_LOAD;
            String leastLoaded = null;
            long leastLoadedCzxid = 0;

            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
            public void safeProcessResult(int i, String str, Object obj2, byte[] bArr, Stat stat) {
                synchronized (this) {
                    if (i == KeeperException.Code.OK.intValue()) {
                        try {
                            HubLoad parse = HubLoad.parse(new String(bArr));
                            ZkHubServerManager.logger.debug("Found server {} with load: {}", obj2, parse);
                            int compareTo = parse.compareTo(this.minLoad);
                            if (compareTo < 0 || (compareTo == 0 && ZkHubServerManager.this.rand.nextBoolean())) {
                                this.minLoad = parse;
                                this.leastLoaded = (String) obj2;
                                this.leastLoadedCzxid = stat.getCzxid();
                            }
                        } catch (HubLoad.InvalidHubLoadException e) {
                            ZkHubServerManager.logger.warn("Corrupted load information from hub : " + obj2);
                        }
                    }
                    this.numResponses++;
                    if (this.numResponses == list.size()) {
                        if (this.leastLoaded == null) {
                            callback.operationFailed(obj2, new PubSubException.ServiceDownException("No hub available"));
                        } else {
                            try {
                                callback.operationFinished(obj2, new HubInfo(new HedwigSocketAddress(this.leastLoaded), this.leastLoadedCzxid));
                            } catch (Throwable th) {
                                callback.operationFailed(obj2, new PubSubException.ServiceDownException("Least loaded hub server " + this.leastLoaded + " is invalid."));
                            }
                        }
                    }
                }
            }
        };
        for (String str : list) {
            this.zk.getData(this.conf.getZkHostsPrefix(new StringBuilder()).append("/").append(str).toString(), false, dataCallback, str);
        }
    }
}
