package org.apache.hedwig.server.meta;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.meta.ZkVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.zookeeper.SafeAsyncZKCallback;
import org.apache.hedwig.zookeeper.ZkUtils;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/meta/ZkMetadataManagerFactory.class */
public class ZkMetadataManagerFactory extends MetadataManagerFactory {
    protected static final Logger logger = LoggerFactory.getLogger(ZkMetadataManagerFactory.class);
    static final int CUR_VERSION = 1;
    ZooKeeper zk;
    ServerConfiguration cfg;

    /* loaded from: input_file:org/apache/hedwig/server/meta/ZkMetadataManagerFactory$ZkSubscriptionDataManagerImpl.class */
    static class ZkSubscriptionDataManagerImpl implements SubscriptionDataManager {
        ZooKeeper zk;
        ServerConfiguration cfg;

        ZkSubscriptionDataManagerImpl(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper) {
            this.cfg = serverConfiguration;
            this.zk = zooKeeper;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        private StringBuilder topicSubscribersPath(StringBuilder sb, ByteString byteString) {
            return this.cfg.getZkTopicPath(sb, byteString).append("/subscribers");
        }

        private String topicSubscriberPath(ByteString byteString, ByteString byteString2) {
            return topicSubscribersPath(new StringBuilder(), byteString).append("/").append(byteString2.toStringUtf8()).toString();
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public boolean isPartialUpdateSupported() {
            return false;
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void createSubscriptionData(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionData subscriptionData, final Callback<Version> callback, Object obj) {
            ZkUtils.createFullPathOptimistic(this.zk, topicSubscriberPath(byteString, byteString2), subscriptionData.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.1
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
                public void safeProcessResult(int i, String str, Object obj2, String str2) {
                    if (i == KeeperException.Code.NODEEXISTS.intValue()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.SUBSCRIPTION_STATE_EXISTS, "Subscription state for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ") existed."));
                        return;
                    }
                    if (i != KeeperException.Code.OK.intValue()) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not record new subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8(), str, i)));
                    } else {
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("Successfully recorded subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " data: " + SubscriptionStateUtils.toString(subscriptionData));
                        }
                        callback.operationFinished(obj2, new ZkVersion(0));
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void updateSubscriptionData(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionData subscriptionData, Version version, Callback<Version> callback, Object obj) {
            throw new UnsupportedOperationException("ZooKeeper based metadata manager doesn't support partial update!");
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void replaceSubscriptionData(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionData subscriptionData, Version version, final Callback<Version> callback, Object obj) {
            int i = -1;
            if (Version.NEW == version) {
                callback.operationFailed(obj, new PubSubException.BadVersionException("Can not replace Version.New subscription data"));
                return;
            }
            if (Version.ANY != version) {
                if (!(version instanceof ZkVersion)) {
                    callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to replace subscription data for topic  " + byteString.toStringUtf8() + " subscribe id: " + byteString2));
                    return;
                }
                i = ((ZkVersion) version).getZnodeVersion();
            }
            this.zk.setData(topicSubscriberPath(byteString, byteString2), subscriptionData.toByteArray(), i, new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.2
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
                public void safeProcessResult(int i2, String str, Object obj2, Stat stat) {
                    if (i2 == KeeperException.Code.NONODE.intValue()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_SUBSCRIPTION_STATE, "No subscription state found for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ")."));
                        return;
                    }
                    if (i2 == -103) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to replace subscription data of topic " + byteString.toStringUtf8() + " subscriberId " + byteString2));
                        return;
                    }
                    if (i2 != KeeperException.Code.OK.intValue()) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " could not set subscription data: " + SubscriptionStateUtils.toString(subscriptionData), str, i2)));
                    } else {
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("Successfully updated subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " data: " + SubscriptionStateUtils.toString(subscriptionData));
                        }
                        callback.operationFinished(obj2, new ZkVersion(stat.getVersion()));
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void deleteSubscriptionData(final ByteString byteString, final ByteString byteString2, Version version, final Callback<Void> callback, Object obj) {
            int i = -1;
            if (Version.NEW == version) {
                callback.operationFailed(obj, new PubSubException.BadVersionException("Can not delete Version.New subscription data"));
                return;
            }
            if (Version.ANY != version) {
                if (!(version instanceof ZkVersion)) {
                    callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to delete subscription data for topic  " + byteString.toStringUtf8() + " subscribe id: " + byteString2));
                    return;
                }
                i = ((ZkVersion) version).getZnodeVersion();
            }
            this.zk.delete(topicSubscriberPath(byteString, byteString2), i, new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.3
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
                public void safeProcessResult(int i2, String str, Object obj2) {
                    if (i2 == KeeperException.Code.NONODE.intValue()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_SUBSCRIPTION_STATE, "No subscription state found for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ")."));
                        return;
                    }
                    if (i2 == -103) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete subscription data of topic " + byteString.toStringUtf8() + " subscriberId " + byteString2));
                        return;
                    }
                    if (i2 != KeeperException.Code.OK.intValue()) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " failed to delete subscription", str, i2)));
                    } else {
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("Successfully deleted subscription for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8());
                        }
                        callback.operationFinished(obj2, (Object) null);
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void readSubscriptionData(final ByteString byteString, final ByteString byteString2, final Callback<Versioned<PubSubProtocol.SubscriptionData>> callback, Object obj) {
            this.zk.getData(topicSubscriberPath(byteString, byteString2), false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.4
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                public void safeProcessResult(int i, String str, Object obj2, byte[] bArr, Stat stat) {
                    if (i == KeeperException.Code.NONODE.intValue()) {
                        callback.operationFinished(obj2, (Object) null);
                        return;
                    }
                    if (i != KeeperException.Code.OK.intValue()) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8(), str, i)));
                        return;
                    }
                    try {
                        Versioned versioned = new Versioned(SubscriptionStateUtils.parseSubscriptionData(bArr), new ZkVersion(stat.getVersion()));
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("Found subscription while acquiring topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8() + " data: " + SubscriptionStateUtils.toString((PubSubProtocol.SubscriptionData) versioned.getValue()));
                        }
                        callback.operationFinished(obj2, versioned);
                    } catch (InvalidProtocolBufferException e) {
                        String str2 = "Failed to deserialize subscription data for topic: " + byteString.toStringUtf8() + " subscriberId: " + byteString2.toStringUtf8();
                        ZkMetadataManagerFactory.logger.error(str2, e);
                        callback.operationFailed(obj2, new PubSubException.UnexpectedConditionException(str2));
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void readSubscriptions(final ByteString byteString, final Callback<Map<ByteString, Versioned<PubSubProtocol.SubscriptionData>>> callback, Object obj) {
            this.zk.getChildren(topicSubscribersPath(new StringBuilder(), byteString).toString(), false, new SafeAsyncZKCallback.ChildrenCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.5
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.ChildrenCallback
                public void safeProcessResult(int i, String str, final Object obj2, final List<String> list) {
                    if (i != KeeperException.Code.OK.intValue() && i != KeeperException.Code.NONODE.intValue()) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read subscribers for topic " + byteString.toStringUtf8(), str, i)));
                        return;
                    }
                    final ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                    if (i == KeeperException.Code.NONODE.intValue() || list.size() == 0) {
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("No subscriptions found while acquiring topic: " + byteString.toStringUtf8());
                        }
                        callback.operationFinished(obj2, concurrentHashMap);
                        return;
                    }
                    final AtomicBoolean atomicBoolean = new AtomicBoolean();
                    final AtomicInteger atomicInteger = new AtomicInteger();
                    for (final String str2 : list) {
                        final ByteString copyFromUtf8 = ByteString.copyFromUtf8(str2);
                        ZkSubscriptionDataManagerImpl.this.zk.getData(str + "/" + str2, false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkSubscriptionDataManagerImpl.5.1
                            static final /* synthetic */ boolean $assertionsDisabled;

                            @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                            public void safeProcessResult(int i2, String str3, Object obj3, byte[] bArr, Stat stat) {
                                if (i2 != KeeperException.Code.OK.intValue()) {
                                    reportFailure(new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + copyFromUtf8.toStringUtf8(), str3, i2)));
                                    return;
                                }
                                if (atomicBoolean.get()) {
                                    return;
                                }
                                try {
                                    Versioned versioned = new Versioned(SubscriptionStateUtils.parseSubscriptionData(bArr), new ZkVersion(stat.getVersion()));
                                    if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                                        ZkMetadataManagerFactory.logger.debug("Found subscription while acquiring topic: " + byteString.toStringUtf8() + " subscriberId: " + str2 + "state: " + SubscriptionStateUtils.toString((PubSubProtocol.SubscriptionData) versioned.getValue()));
                                    }
                                    concurrentHashMap.put(copyFromUtf8, versioned);
                                    if (atomicInteger.incrementAndGet() == list.size()) {
                                        if (!$assertionsDisabled && concurrentHashMap.size() != atomicInteger.get()) {
                                            throw new AssertionError();
                                        }
                                        callback.operationFinished(obj3, concurrentHashMap);
                                    }
                                } catch (InvalidProtocolBufferException e) {
                                    String str4 = "Failed to deserialize subscription data for topic: " + byteString.toStringUtf8() + " subscriberId: " + copyFromUtf8.toStringUtf8();
                                    ZkMetadataManagerFactory.logger.error(str4, e);
                                    reportFailure(new PubSubException.UnexpectedConditionException(str4));
                                }
                            }

                            private void reportFailure(PubSubException pubSubException) {
                                if (atomicBoolean.compareAndSet(false, true)) {
                                    callback.operationFailed(obj2, pubSubException);
                                }
                            }

                            static {
                                $assertionsDisabled = !ZkMetadataManagerFactory.class.desiredAssertionStatus();
                            }
                        }, obj2);
                    }
                }
            }, obj);
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/meta/ZkMetadataManagerFactory$ZkTopicOwnershipManagerImpl.class */
    static class ZkTopicOwnershipManagerImpl implements TopicOwnershipManager {
        ZooKeeper zk;
        ServerConfiguration cfg;

        ZkTopicOwnershipManagerImpl(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper) {
            this.cfg = serverConfiguration;
            this.zk = zooKeeper;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        String hubPath(ByteString byteString) {
            return this.cfg.getZkTopicPath(new StringBuilder(), byteString).append("/hub").toString();
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void readOwnerInfo(final ByteString byteString, final Callback<Versioned<HubInfo>> callback, Object obj) {
            this.zk.getData(hubPath(byteString), false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicOwnershipManagerImpl.1
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                public void safeProcessResult(int i, String str, Object obj2, byte[] bArr, Stat stat) {
                    if (KeeperException.Code.NONODE.intValue() == i) {
                        callback.operationFinished(obj2, (Object) null);
                        return;
                    }
                    if (KeeperException.Code.OK.intValue() != i) {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read ownership for topic: " + byteString.toStringUtf8(), str, i)));
                        return;
                    }
                    HubInfo hubInfo = null;
                    try {
                        hubInfo = HubInfo.parse(new String(bArr));
                    } catch (HubInfo.InvalidHubInfoException e) {
                        ZkMetadataManagerFactory.logger.warn("Failed to parse hub info for topic " + byteString.toStringUtf8() + " : ", e);
                    }
                    callback.operationFinished(obj2, new Versioned(hubInfo, new ZkVersion(stat.getVersion())));
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void writeOwnerInfo(final ByteString byteString, final HubInfo hubInfo, Version version, final Callback<Version> callback, Object obj) {
            if (Version.NEW == version) {
                createOwnerInfo(byteString, hubInfo, callback, obj);
            } else if (!(version instanceof ZkVersion)) {
                callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to update owner info for topic " + byteString.toStringUtf8()));
            } else {
                this.zk.setData(hubPath(byteString), hubInfo.toString().getBytes(), ((ZkVersion) version).getZnodeVersion(), new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicOwnershipManagerImpl.2
                    @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
                    public void safeProcessResult(int i, String str, Object obj2, Stat stat) {
                        if (i == KeeperException.Code.NONODE.intValue()) {
                            callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " + byteString.toStringUtf8()));
                            return;
                        }
                        if (i == -103) {
                            callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to update owner info of topic " + byteString.toStringUtf8()));
                        } else if (KeeperException.Code.OK.intValue() == i) {
                            callback.operationFinished(obj2, new ZkVersion(stat.getVersion()));
                        } else {
                            callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to update ownership of topic " + byteString.toStringUtf8() + " to " + hubInfo, str, i)));
                        }
                    }
                }, obj);
            }
        }

        protected void createOwnerInfo(final ByteString byteString, HubInfo hubInfo, final Callback<Version> callback, Object obj) {
            ZkUtils.createFullPathOptimistic(this.zk, hubPath(byteString), hubInfo.toString().getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicOwnershipManagerImpl.3
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
                public void safeProcessResult(int i, String str, Object obj2, String str2) {
                    if (KeeperException.Code.OK.intValue() == i) {
                        callback.operationFinished(obj2, new ZkVersion(0));
                    } else if (KeeperException.Code.NODEEXISTS.intValue() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic " + byteString.toStringUtf8() + " existed."));
                    } else {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to create znode for ownership of topic: " + byteString.toStringUtf8(), str, i)));
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void deleteOwnerInfo(final ByteString byteString, Version version, final Callback<Void> callback, Object obj) {
            int i = -1;
            if (Version.ANY != version) {
                if (!(version instanceof ZkVersion)) {
                    callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to delete owner info for topic " + byteString.toStringUtf8()));
                    return;
                }
                i = ((ZkVersion) version).getZnodeVersion();
            }
            this.zk.delete(hubPath(byteString), i, new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicOwnershipManagerImpl.4
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
                public void safeProcessResult(int i2, String str, Object obj2) {
                    if (KeeperException.Code.OK.intValue() == i2) {
                        if (ZkMetadataManagerFactory.logger.isDebugEnabled()) {
                            ZkMetadataManagerFactory.logger.debug("Successfully deleted owner info for topic " + byteString.toStringUtf8() + ".");
                        }
                        callback.operationFinished(obj2, (Object) null);
                    } else if (KeeperException.Code.NONODE.intValue() == i2) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " + byteString.toStringUtf8()));
                    } else if (-103 == i2) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete owner info of topic " + byteString.toStringUtf8()));
                    } else {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Failed to delete owner info for topic " + byteString.toStringUtf8(), str, i2)));
                    }
                }
            }, obj);
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/meta/ZkMetadataManagerFactory$ZkTopicPersistenceManagerImpl.class */
    static class ZkTopicPersistenceManagerImpl implements TopicPersistenceManager {
        ZooKeeper zk;
        ServerConfiguration cfg;

        ZkTopicPersistenceManagerImpl(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper) {
            this.cfg = serverConfiguration;
            this.zk = zooKeeper;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        private String ledgersPath(ByteString byteString) {
            return this.cfg.getZkTopicPath(new StringBuilder(), byteString).append("/ledgers").toString();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void parseAndReturnTopicLedgerRanges(ByteString byteString, byte[] bArr, int i, Callback<Versioned<PubSubProtocol.LedgerRanges>> callback, Object obj) {
            try {
                callback.operationFinished(obj, new Versioned(PubSubProtocol.LedgerRanges.parseFrom(bArr), new ZkVersion(i)));
            } catch (InvalidProtocolBufferException e) {
                String str = "Ledger ranges for topic:" + byteString.toStringUtf8() + " could not be deserialized";
                ZkMetadataManagerFactory.logger.error(str, e);
                callback.operationFailed(obj, new PubSubException.UnexpectedConditionException(str));
            }
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void readTopicPersistenceInfo(final ByteString byteString, final Callback<Versioned<PubSubProtocol.LedgerRanges>> callback, Object obj) {
            this.zk.getData(ledgersPath(byteString), false, new SafeAsyncZKCallback.DataCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicPersistenceManagerImpl.1
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.DataCallback
                public void safeProcessResult(int i, String str, Object obj2, byte[] bArr, Stat stat) {
                    if (i == KeeperException.Code.OK.intValue()) {
                        ZkTopicPersistenceManagerImpl.this.parseAndReturnTopicLedgerRanges(byteString, bArr, stat.getVersion(), callback, obj2);
                    } else if (i == KeeperException.Code.NONODE.intValue()) {
                        callback.operationFinished(obj2, (Object) null);
                    } else {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not read ledgers node for topic: " + byteString.toStringUtf8(), str, i)));
                    }
                }
            }, obj);
        }

        private void createTopicPersistenceInfo(final ByteString byteString, PubSubProtocol.LedgerRanges ledgerRanges, final Callback<Version> callback, Object obj) {
            ZkUtils.createFullPathOptimistic(this.zk, ledgersPath(byteString), ledgerRanges.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new SafeAsyncZKCallback.StringCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicPersistenceManagerImpl.2
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StringCallback
                public void safeProcessResult(int i, String str, Object obj2, String str2) {
                    if (i == KeeperException.Code.NODEEXISTS.intValue()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, "Persistence info of topic " + byteString.toStringUtf8() + " existed."));
                    } else if (i == KeeperException.Code.OK.intValue()) {
                        callback.operationFinished(obj2, new ZkVersion(0));
                    } else {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not create ledgers node for topic: " + byteString.toStringUtf8(), str, i)));
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void writeTopicPersistenceInfo(final ByteString byteString, PubSubProtocol.LedgerRanges ledgerRanges, Version version, final Callback<Version> callback, Object obj) {
            if (Version.NEW == version) {
                createTopicPersistenceInfo(byteString, ledgerRanges, callback, obj);
                return;
            }
            String ledgersPath = ledgersPath(byteString);
            byte[] byteArray = ledgerRanges.toByteArray();
            if (!(version instanceof ZkVersion)) {
                callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to update persistence info for topic " + byteString.toStringUtf8()));
            } else {
                this.zk.setData(ledgersPath, byteArray, ((ZkVersion) version).getZnodeVersion(), new SafeAsyncZKCallback.StatCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicPersistenceManagerImpl.3
                    @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.StatCallback
                    public void safeProcessResult(int i, String str, Object obj2, Stat stat) {
                        if (i == KeeperException.Code.NONODE.intValue()) {
                            callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_PERSISTENCE_INFO, "No persistence info found for topic " + byteString.toStringUtf8()));
                            return;
                        }
                        if (i == -103) {
                            callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to update persistence info of topic " + byteString.toStringUtf8()));
                        } else if (i == KeeperException.Code.OK.intValue()) {
                            callback.operationFinished(obj2, new ZkVersion(stat.getVersion()));
                        } else {
                            callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Could not write ledgers node for topic: " + byteString.toStringUtf8(), str, i)));
                        }
                    }
                }, obj);
            }
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void deleteTopicPersistenceInfo(final ByteString byteString, final Version version, final Callback<Void> callback, Object obj) {
            String ledgersPath = ledgersPath(byteString);
            int i = -1;
            if (Version.ANY != version) {
                if (!(version instanceof ZkVersion)) {
                    callback.operationFailed(obj, new PubSubException.UnexpectedConditionException("Invalid version provided to delete persistence info for topic " + byteString.toStringUtf8()));
                    return;
                }
                i = ((ZkVersion) version).getZnodeVersion();
            }
            this.zk.delete(ledgersPath, i, new SafeAsyncZKCallback.VoidCallback() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.ZkTopicPersistenceManagerImpl.4
                @Override // org.apache.hedwig.zookeeper.SafeAsyncZKCallback.VoidCallback
                public void safeProcessResult(int i2, String str, Object obj2) {
                    if (i2 == KeeperException.Code.OK.intValue()) {
                        callback.operationFinished(obj2, (Object) null);
                        return;
                    }
                    if (i2 == KeeperException.Code.NONODE.intValue()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_PERSISTENCE_INFO, "No persistence info found for topic " + byteString.toStringUtf8()));
                    } else if (i2 == -103) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete persistence info of topic " + byteString.toStringUtf8()));
                    } else {
                        callback.operationFailed(obj2, new PubSubException.ServiceDownException(ZkUtils.logErrorAndCreateZKException("Topic: " + byteString.toStringUtf8() + " failed to delete persistence info @version " + version + " : ", str, i2)));
                    }
                }
            }, obj);
        }
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public int getCurrentVersion() {
        return 1;
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public MetadataManagerFactory initialize(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, int i) throws IOException {
        if (1 != i) {
            throw new IOException("Incompatible ZkMetadataManagerFactory version " + i + " found, expected version 1");
        }
        this.cfg = serverConfiguration;
        this.zk = zooKeeper;
        return this;
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public void shutdown() {
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public Iterator<ByteString> getTopics() throws IOException {
        try {
            final Iterator it = this.zk.getChildren(this.cfg.getZkTopicsPrefix(new StringBuilder()).toString(), false).iterator();
            return new Iterator<ByteString>() { // from class: org.apache.hedwig.server.meta.ZkMetadataManagerFactory.1
                @Override // java.util.Iterator
                public boolean hasNext() {
                    return it.hasNext();
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public ByteString next() {
                    return ByteString.copyFromUtf8((String) it.next());
                }

                @Override // java.util.Iterator
                public void remove() {
                    it.remove();
                }
            };
        } catch (KeeperException e) {
            throw new IOException("Failed to get topics list : ", e);
        } catch (InterruptedException e2) {
            throw new IOException("Interrupted on getting topics list : ", e2);
        }
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public TopicPersistenceManager newTopicPersistenceManager() {
        return new ZkTopicPersistenceManagerImpl(this.cfg, this.zk);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public SubscriptionDataManager newSubscriptionDataManager() {
        return new ZkSubscriptionDataManagerImpl(this.cfg, this.zk);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public TopicOwnershipManager newTopicOwnershipManager() {
        return new ZkTopicOwnershipManagerImpl(this.cfg, this.zk);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public void format(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper) throws IOException {
        try {
            ZKUtil.deleteRecursive(zooKeeper, serverConfiguration.getZkTopicsPrefix(new StringBuilder()).toString());
        } catch (KeeperException e) {
            throw new IOException((Throwable) e);
        } catch (KeeperException.NoNodeException e2) {
            logger.debug("Hedwig root node doesn't exist in zookeeper to delete");
        } catch (InterruptedException e3) {
            throw new IOException(e3);
        }
    }
}
