package org.apache.hedwig.server.meta;

import com.google.protobuf.ByteString;
import com.google.protobuf.TextFormat;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.bookkeeper.metastore.MSException;
import org.apache.bookkeeper.metastore.MetaStore;
import org.apache.bookkeeper.metastore.MetastoreCallback;
import org.apache.bookkeeper.metastore.MetastoreCursor;
import org.apache.bookkeeper.metastore.MetastoreException;
import org.apache.bookkeeper.metastore.MetastoreFactory;
import org.apache.bookkeeper.metastore.MetastoreScannableTable;
import org.apache.bookkeeper.metastore.MetastoreTable;
import org.apache.bookkeeper.metastore.MetastoreTableItem;
import org.apache.bookkeeper.metastore.MetastoreUtils;
import org.apache.bookkeeper.metastore.Value;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.topics.HubInfo;
import org.apache.hedwig.util.Callback;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/meta/MsMetadataManagerFactory.class */
public class MsMetadataManagerFactory extends MetadataManagerFactory {
    protected static final Logger logger = LoggerFactory.getLogger(MsMetadataManagerFactory.class);
    static final String UTF8 = "UTF-8";
    static final int CUR_VERSION = 1;
    static final String OWNER_TABLE_NAME = "owner";
    static final String PERSIST_TABLE_NAME = "persist";
    static final String SUB_TABLE_NAME = "sub";
    MetaStore metastore;
    MetastoreTable ownerTable;
    MetastoreTable persistTable;
    MetastoreScannableTable subTable;
    ServerConfiguration cfg;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/meta/MsMetadataManagerFactory$MsSubscriptionDataManagerImpl.class */
    public static class MsSubscriptionDataManagerImpl implements SubscriptionDataManager {
        static final String SUB_STATE_FIELD = "sub_state";
        static final String SUB_PREFS_FIELD = "sub_preferences";
        static final char TOPIC_SUB_FIRST_SEPARATOR = 1;
        static final char TOPIC_SUB_LAST_SEPARATOR = 2;
        final ServerConfiguration cfg;
        final MetastoreScannableTable subTable;

        MsSubscriptionDataManagerImpl(ServerConfiguration serverConfiguration, MetastoreScannableTable metastoreScannableTable) {
            this.cfg = serverConfiguration;
            this.subTable = metastoreScannableTable;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        private String getSubscriptionKey(ByteString byteString, ByteString byteString2) {
            return byteString.toStringUtf8() + (char) 1 + byteString2.toStringUtf8();
        }

        private Value subscriptionData2Value(PubSubProtocol.SubscriptionData subscriptionData) {
            Value value = new Value();
            if (subscriptionData.hasState()) {
                value.setField(SUB_STATE_FIELD, TextFormat.printToString(subscriptionData.getState()).getBytes());
            }
            if (subscriptionData.hasPreferences()) {
                value.setField(SUB_PREFS_FIELD, TextFormat.printToString(subscriptionData.getPreferences()).getBytes());
            }
            return value;
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void createSubscriptionData(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionData subscriptionData, final Callback<Version> callback, Object obj) {
            this.subTable.put(getSubscriptionKey(byteString, byteString2), subscriptionData2Value(subscriptionData), Version.NEW, new MetastoreCallback<Version>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.1
                public void complete(int i, Version version, Object obj2) {
                    if (i == MSException.Code.OK.getCode()) {
                        if (MsMetadataManagerFactory.logger.isDebugEnabled()) {
                            MsMetadataManagerFactory.logger.debug("Successfully create subscription for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", data: " + SubscriptionStateUtils.toString(subscriptionData));
                        }
                        callback.operationFinished(obj2, version);
                    } else if (i == MSException.Code.KeyExists.getCode()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.SUBSCRIPTION_STATE_EXISTS, "Subscription data for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ") existed."));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to create topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", data: " + SubscriptionStateUtils.toString(subscriptionData), callback, obj2, i);
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public boolean isPartialUpdateSupported() {
            return true;
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void replaceSubscriptionData(ByteString byteString, ByteString byteString2, PubSubProtocol.SubscriptionData subscriptionData, Version version, Callback<Version> callback, Object obj) {
            updateSubscriptionData(byteString, byteString2, subscriptionData, version, callback, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void updateSubscriptionData(final ByteString byteString, final ByteString byteString2, final PubSubProtocol.SubscriptionData subscriptionData, Version version, final Callback<Version> callback, Object obj) {
            this.subTable.put(getSubscriptionKey(byteString, byteString2), subscriptionData2Value(subscriptionData), version, new MetastoreCallback<Version>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.2
                public void complete(int i, Version version2, Object obj2) {
                    if (i == MSException.Code.OK.getCode()) {
                        if (MsMetadataManagerFactory.logger.isDebugEnabled()) {
                            MsMetadataManagerFactory.logger.debug("Successfully updated subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", data: " + SubscriptionStateUtils.toString(subscriptionData) + ", version: " + version2);
                        }
                        callback.operationFinished(obj2, version2);
                    } else if (i == MSException.Code.NoKey.getCode()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_SUBSCRIPTION_STATE, "No subscription data found for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ")."));
                    } else if (i == MSException.Code.BadVersion.getCode()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to update subscription data of topic " + byteString.toStringUtf8() + " subscriberId " + byteString2));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to update subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", data: " + SubscriptionStateUtils.toString(subscriptionData) + ", version: " + version2, callback, obj2, i);
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void deleteSubscriptionData(final ByteString byteString, final ByteString byteString2, Version version, final Callback<Void> callback, Object obj) {
            this.subTable.remove(getSubscriptionKey(byteString, byteString2), version, new MetastoreCallback<Void>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.3
                public void complete(int i, Void r8, Object obj2) {
                    if (i == MSException.Code.OK.getCode()) {
                        MsMetadataManagerFactory.logger.debug("Successfully delete subscription for topic: {}, subscriberId: {}.", byteString.toStringUtf8(), byteString2.toStringUtf8());
                        callback.operationFinished(obj2, (Object) null);
                    } else if (i == MSException.Code.BadVersion.getCode()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete subscriptoin data of topic " + byteString.toStringUtf8() + " subscriberId " + byteString2));
                    } else if (i == MSException.Code.NoKey.getCode()) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_SUBSCRIPTION_STATE, "No subscription data found for (topic:" + byteString.toStringUtf8() + ", subscriber:" + byteString2.toStringUtf8() + ")."));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to delete subscription topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8(), callback, obj2, i, PubSubProtocol.StatusCode.SERVICE_DOWN);
                    }
                }
            }, obj);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public PubSubProtocol.SubscriptionData value2SubscriptionData(Value value) throws TextFormat.ParseException, UnsupportedEncodingException {
            PubSubProtocol.SubscriptionData.Builder newBuilder = PubSubProtocol.SubscriptionData.newBuilder();
            byte[] field = value.getField(SUB_STATE_FIELD);
            if (null != field) {
                PubSubProtocol.SubscriptionState.Builder newBuilder2 = PubSubProtocol.SubscriptionState.newBuilder();
                TextFormat.merge(new String(field, MsMetadataManagerFactory.UTF8), newBuilder2);
                newBuilder.setState(newBuilder2.build());
            }
            byte[] field2 = value.getField(SUB_PREFS_FIELD);
            if (null != field2) {
                PubSubProtocol.SubscriptionPreferences.Builder newBuilder3 = PubSubProtocol.SubscriptionPreferences.newBuilder();
                TextFormat.merge(new String(field2, MsMetadataManagerFactory.UTF8), newBuilder3);
                newBuilder.setPreferences(newBuilder3.build());
            }
            return newBuilder.build();
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void readSubscriptionData(final ByteString byteString, final ByteString byteString2, final Callback<Versioned<PubSubProtocol.SubscriptionData>> callback, Object obj) {
            this.subTable.get(getSubscriptionKey(byteString, byteString2), new MetastoreCallback<Versioned<Value>>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.4
                public void complete(int i, Versioned<Value> versioned, Object obj2) {
                    if (i == MSException.Code.NoKey.getCode()) {
                        callback.operationFinished(obj2, (Object) null);
                        return;
                    }
                    if (i != MSException.Code.OK.getCode()) {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Could not read subscription data for topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8(), callback, obj2, i);
                        return;
                    }
                    try {
                        Versioned versioned2 = new Versioned(MsSubscriptionDataManagerImpl.this.value2SubscriptionData((Value) versioned.getValue()), versioned.getVersion());
                        if (MsMetadataManagerFactory.logger.isDebugEnabled()) {
                            MsMetadataManagerFactory.logger.debug("Found subscription while acquiring topic: " + byteString.toStringUtf8() + ", subscriberId: " + byteString2.toStringUtf8() + ", data: " + SubscriptionStateUtils.toString((PubSubProtocol.SubscriptionData) versioned2.getValue()) + ", version: " + versioned2.getVersion());
                        }
                        callback.operationFinished(obj2, versioned2);
                    } catch (UnsupportedEncodingException e) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("Subscription data for topic: ").append(byteString.toStringUtf8()).append(", subscriberId: ").append(byteString2.toStringUtf8()).append(" is not UFT-8 encoded");
                        String sb2 = sb.toString();
                        MsMetadataManagerFactory.logger.error(sb2, e);
                        callback.operationFailed(obj2, new PubSubException.UnexpectedConditionException(sb2));
                    } catch (TextFormat.ParseException e2) {
                        StringBuilder sb3 = new StringBuilder();
                        sb3.append("Failed to deserialize subscription data for topic:").append(byteString.toStringUtf8()).append(", subscriberId: ").append(byteString2.toStringUtf8());
                        String sb4 = sb3.toString();
                        MsMetadataManagerFactory.logger.error(sb4, e2);
                        callback.operationFailed(obj2, new PubSubException.UnexpectedConditionException(sb4));
                    }
                }
            }, obj);
        }

        private String getSubscriptionPrefix(ByteString byteString, char c) {
            return byteString.toStringUtf8() + c;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void readSubscriptions(final ByteString byteString, final int i, final MetastoreCursor metastoreCursor, final Map<ByteString, Versioned<PubSubProtocol.SubscriptionData>> map, final Callback<Map<ByteString, Versioned<PubSubProtocol.SubscriptionData>>> callback, Object obj) {
            if (!metastoreCursor.hasMoreEntries()) {
                callback.operationFinished(obj, map);
            } else {
                metastoreCursor.asyncReadEntries(this.cfg.getMetastoreMaxEntriesPerScan(), new MetastoreCursor.ReadEntriesCallback() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.5
                    public void complete(int i2, Iterator<MetastoreTableItem> it, Object obj2) {
                        if (i2 != MSException.Code.OK.getCode()) {
                            MsMetadataManagerFactory.logErrorAndFinishOperation("Could not read subscribers for cursor " + metastoreCursor, callback, obj2, i2);
                            return;
                        }
                        while (it.hasNext()) {
                            MetastoreTableItem next = it.next();
                            ByteString copyFromUtf8 = ByteString.copyFromUtf8(next.getKey().substring(i));
                            try {
                                Versioned value = next.getValue();
                                map.put(copyFromUtf8, new Versioned(MsSubscriptionDataManagerImpl.this.value2SubscriptionData((Value) value.getValue()), value.getVersion()));
                            } catch (TextFormat.ParseException e) {
                                StringBuilder sb = new StringBuilder();
                                sb.append("Failed to deserialize subscription data for topic: ").append(byteString.toStringUtf8()).append(", subscriberId: ").append(copyFromUtf8.toStringUtf8());
                                String sb2 = sb.toString();
                                MsMetadataManagerFactory.logger.error(sb2, e);
                                callback.operationFailed(obj2, new PubSubException.UnexpectedConditionException(sb2));
                                return;
                            } catch (UnsupportedEncodingException e2) {
                                StringBuilder sb3 = new StringBuilder();
                                sb3.append("Subscription data for topic: ").append(byteString.toStringUtf8()).append(", subscriberId: ").append(copyFromUtf8.toStringUtf8()).append(" is not UTF-8 encoded.");
                                String sb4 = sb3.toString();
                                MsMetadataManagerFactory.logger.error(sb4, e2);
                                callback.operationFailed(obj2, new PubSubException.UnexpectedConditionException(sb4));
                                return;
                            }
                        }
                        MsSubscriptionDataManagerImpl.this.readSubscriptions(byteString, i, metastoreCursor, map, callback, obj2);
                    }
                }, obj);
            }
        }

        @Override // org.apache.hedwig.server.meta.SubscriptionDataManager
        public void readSubscriptions(final ByteString byteString, final Callback<Map<ByteString, Versioned<PubSubProtocol.SubscriptionData>>> callback, Object obj) {
            final String subscriptionPrefix = getSubscriptionPrefix(byteString, (char) 1);
            this.subTable.openCursor(subscriptionPrefix, true, getSubscriptionPrefix(byteString, (char) 2), true, MetastoreScannableTable.Order.ASC, MetastoreTable.ALL_FIELDS, new MetastoreCallback<MetastoreCursor>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsSubscriptionDataManagerImpl.6
                public void complete(int i, MetastoreCursor metastoreCursor, Object obj2) {
                    if (i != MSException.Code.OK.getCode()) {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Could not read subscribers for topic " + byteString.toStringUtf8(), callback, obj2, i);
                    } else {
                        MsSubscriptionDataManagerImpl.this.readSubscriptions(byteString, subscriptionPrefix.length(), metastoreCursor, new ConcurrentHashMap(), callback, obj2);
                    }
                }
            }, obj);
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/meta/MsMetadataManagerFactory$MsTopicOwnershipManagerImpl.class */
    static class MsTopicOwnershipManagerImpl implements TopicOwnershipManager {
        static final String OWNER_FIELD = "owner";
        final MetastoreTable ownerTable;

        MsTopicOwnershipManagerImpl(MetastoreTable metastoreTable) {
            this.ownerTable = metastoreTable;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void readOwnerInfo(final ByteString byteString, final Callback<Versioned<HubInfo>> callback, Object obj) {
            this.ownerTable.get(byteString.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicOwnershipManagerImpl.1
                public void complete(int i, Versioned<Value> versioned, Object obj2) {
                    if (MSException.Code.NoKey.getCode() == i) {
                        callback.operationFinished(obj2, (Object) null);
                        return;
                    }
                    if (MSException.Code.OK.getCode() != i) {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Could not read ownership for topic " + byteString.toStringUtf8(), callback, obj2, i);
                        return;
                    }
                    HubInfo hubInfo = null;
                    try {
                        byte[] field = ((Value) versioned.getValue()).getField(MsTopicOwnershipManagerImpl.OWNER_FIELD);
                        if (field != null) {
                            hubInfo = HubInfo.parse(new String(field));
                        }
                    } catch (HubInfo.InvalidHubInfoException e) {
                        MsMetadataManagerFactory.logger.warn("Failed to parse hub info for topic " + byteString.toStringUtf8(), e);
                    }
                    callback.operationFinished(obj2, new Versioned(hubInfo, versioned.getVersion()));
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void writeOwnerInfo(final ByteString byteString, final HubInfo hubInfo, Version version, final Callback<Version> callback, Object obj) {
            Value value = new Value();
            value.setField(OWNER_FIELD, hubInfo.toString().getBytes());
            this.ownerTable.put(byteString.toStringUtf8(), value, version, new MetastoreCallback<Version>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicOwnershipManagerImpl.2
                public void complete(int i, Version version2, Object obj2) {
                    if (MSException.Code.OK.getCode() == i) {
                        callback.operationFinished(obj2, version2);
                        return;
                    }
                    if (MSException.Code.NoKey.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " + byteString.toStringUtf8()));
                        return;
                    }
                    if (MSException.Code.KeyExists.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.TOPIC_OWNER_INFO_EXISTS, "Owner info of topic " + byteString.toStringUtf8() + " existed."));
                    } else if (MSException.Code.BadVersion.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to update owner info of topic " + byteString.toStringUtf8()));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to update ownership of topic " + byteString.toStringUtf8() + " to " + hubInfo, callback, obj2, i);
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicOwnershipManager
        public void deleteOwnerInfo(final ByteString byteString, Version version, final Callback<Void> callback, Object obj) {
            this.ownerTable.remove(byteString.toStringUtf8(), version, new MetastoreCallback<Void>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicOwnershipManagerImpl.3
                public void complete(int i, Void r8, Object obj2) {
                    if (MSException.Code.OK.getCode() == i) {
                        MsMetadataManagerFactory.logger.debug("Successfully deleted owner info for topic {}", byteString.toStringUtf8());
                        callback.operationFinished(obj2, (Object) null);
                    } else if (MSException.Code.NoKey.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_OWNER_INFO, "No owner info found for topic " + byteString.toStringUtf8()));
                    } else if (MSException.Code.BadVersion.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete owner info of topic " + byteString.toStringUtf8()));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to delete owner info for topic " + byteString.toStringUtf8(), callback, obj2, i);
                    }
                }
            }, obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/meta/MsMetadataManagerFactory$MsTopicPersistenceManagerImpl.class */
    public static class MsTopicPersistenceManagerImpl implements TopicPersistenceManager {
        static final String PERSIST_FIELD = "prst";
        final MetastoreTable persistTable;

        MsTopicPersistenceManagerImpl(MetastoreTable metastoreTable) {
            this.persistTable = metastoreTable;
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void readTopicPersistenceInfo(final ByteString byteString, final Callback<Versioned<PubSubProtocol.LedgerRanges>> callback, Object obj) {
            this.persistTable.get(byteString.toStringUtf8(), new MetastoreCallback<Versioned<Value>>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicPersistenceManagerImpl.1
                public void complete(int i, Versioned<Value> versioned, Object obj2) {
                    if (MSException.Code.OK.getCode() != i) {
                        if (MSException.Code.NoKey.getCode() == i) {
                            callback.operationFinished(obj2, (Object) null);
                            return;
                        } else {
                            MsMetadataManagerFactory.logErrorAndFinishOperation("Could not read ledgers node for topic " + byteString.toStringUtf8(), callback, obj2, i);
                            return;
                        }
                    }
                    byte[] field = ((Value) versioned.getValue()).getField(MsTopicPersistenceManagerImpl.PERSIST_FIELD);
                    if (field != null) {
                        MsTopicPersistenceManagerImpl.this.parseAndReturnTopicLedgerRanges(byteString, field, versioned.getVersion(), callback, obj2);
                    } else {
                        callback.operationFinished(obj2, (Object) null);
                    }
                }
            }, obj);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void parseAndReturnTopicLedgerRanges(ByteString byteString, byte[] bArr, Version version, Callback<Versioned<PubSubProtocol.LedgerRanges>> callback, Object obj) {
            try {
                PubSubProtocol.LedgerRanges.Builder newBuilder = PubSubProtocol.LedgerRanges.newBuilder();
                TextFormat.merge(new String(bArr, MsMetadataManagerFactory.UTF8), newBuilder);
                callback.operationFinished(obj, new Versioned(newBuilder.build(), version));
            } catch (TextFormat.ParseException e) {
                StringBuilder sb = new StringBuilder();
                sb.append("Ledger ranges for topic ").append(byteString.toStringUtf8()).append(" could not be deserialized.");
                String sb2 = sb.toString();
                MsMetadataManagerFactory.logger.error(sb2, e);
                callback.operationFailed(obj, new PubSubException.UnexpectedConditionException(sb2));
            } catch (UnsupportedEncodingException e2) {
                StringBuilder sb3 = new StringBuilder();
                sb3.append("Ledger ranges for topic ").append(byteString.toStringUtf8()).append(" is not UTF-8 encoded.");
                String sb4 = sb3.toString();
                MsMetadataManagerFactory.logger.error(sb4, e2);
                callback.operationFailed(obj, new PubSubException.UnexpectedConditionException(sb4));
            }
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void writeTopicPersistenceInfo(final ByteString byteString, PubSubProtocol.LedgerRanges ledgerRanges, Version version, final Callback<Version> callback, Object obj) {
            Value value = new Value();
            value.setField(PERSIST_FIELD, TextFormat.printToString(ledgerRanges).getBytes());
            this.persistTable.put(byteString.toStringUtf8(), value, version, new MetastoreCallback<Version>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicPersistenceManagerImpl.2
                public void complete(int i, Version version2, Object obj2) {
                    if (MSException.Code.OK.getCode() == i) {
                        callback.operationFinished(obj2, version2);
                        return;
                    }
                    if (MSException.Code.NoKey.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_PERSISTENCE_INFO, "No persistence info found for topic " + byteString.toStringUtf8()));
                        return;
                    }
                    if (MSException.Code.KeyExists.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.TOPIC_PERSISTENCE_INFO_EXISTS, "Persistence info of topic " + byteString.toStringUtf8() + " existed."));
                    } else if (MSException.Code.BadVersion.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to update persistence info of topic " + byteString.toStringUtf8()));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Could not write ledgers node for topic " + byteString.toStringUtf8(), callback, obj2, i);
                    }
                }
            }, obj);
        }

        @Override // org.apache.hedwig.server.meta.TopicPersistenceManager
        public void deleteTopicPersistenceInfo(final ByteString byteString, final Version version, final Callback<Void> callback, Object obj) {
            this.persistTable.remove(byteString.toStringUtf8(), version, new MetastoreCallback<Void>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.MsTopicPersistenceManagerImpl.3
                public void complete(int i, Void r8, Object obj2) {
                    if (MSException.Code.OK.getCode() == i) {
                        MsMetadataManagerFactory.logger.debug("Successfully deleted persistence info for topic {}.", byteString.toStringUtf8());
                        callback.operationFinished(obj2, (Object) null);
                    } else if (MSException.Code.NoKey.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.NO_TOPIC_PERSISTENCE_INFO, "No persistence info found for topic " + byteString.toStringUtf8()));
                    } else if (MSException.Code.BadVersion.getCode() == i) {
                        callback.operationFailed(obj2, PubSubException.create(PubSubProtocol.StatusCode.BAD_VERSION, "Bad version provided to delete persistence info of topic " + byteString.toStringUtf8()));
                    } else {
                        MsMetadataManagerFactory.logErrorAndFinishOperation("Failed to delete persistence info topic: " + byteString.toStringUtf8() + ", version: " + version, callback, obj2, i, PubSubProtocol.StatusCode.SERVICE_DOWN);
                    }
                }
            }, obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/meta/MsMetadataManagerFactory$SyncResult.class */
    public static class SyncResult<T> {
        T value;
        int rc;
        boolean finished = false;

        SyncResult() {
        }

        public synchronized void complete(int i, T t) {
            this.rc = i;
            this.value = t;
            this.finished = true;
            notify();
        }

        public synchronized void block() throws InterruptedException {
            while (!this.finished) {
                wait();
            }
        }

        public int getReturnCode() {
            return this.rc;
        }

        public T getValue() {
            return this.value;
        }
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public MetadataManagerFactory initialize(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, int i) throws IOException {
        if (1 != i) {
            throw new IOException("Incompatible MsMetadataManagerFactory version " + i + " found, expected version 1");
        }
        this.cfg = serverConfiguration;
        try {
            this.metastore = MetastoreFactory.createMetaStore(serverConfiguration.getMetastoreImplClass());
            this.metastore.init(serverConfiguration.getConf(), this.metastore.getVersion());
            try {
                this.ownerTable = this.metastore.createTable(OWNER_TABLE_NAME);
                if (this.ownerTable == null) {
                    throw new IOException("create owner table failed");
                }
                this.persistTable = this.metastore.createTable(PERSIST_TABLE_NAME);
                if (this.persistTable == null) {
                    throw new IOException("create persistence table failed");
                }
                this.subTable = this.metastore.createScannableTable(SUB_TABLE_NAME);
                if (this.subTable == null) {
                    throw new IOException("create subscription table failed");
                }
                return this;
            } catch (MetastoreException e) {
                throw new IOException("Failed to create tables : ", e);
            }
        } catch (Exception e2) {
            throw new IOException("Load metastore failed : ", e2);
        }
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public int getCurrentVersion() {
        return 1;
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public void shutdown() {
        if (this.metastore == null) {
            return;
        }
        if (this.ownerTable != null) {
            this.ownerTable.close();
            this.ownerTable = null;
        }
        if (this.persistTable != null) {
            this.persistTable.close();
            this.persistTable = null;
        }
        if (this.subTable != null) {
            this.subTable.close();
            this.subTable = null;
        }
        this.metastore.close();
        this.metastore = null;
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public Iterator<ByteString> getTopics() throws IOException {
        SyncResult syncResult = new SyncResult();
        this.persistTable.openCursor(MetastoreTable.NON_FIELDS, new MetastoreCallback<MetastoreCursor>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.1
            public void complete(int i, MetastoreCursor metastoreCursor, Object obj) {
                ((SyncResult) obj).complete(i, metastoreCursor);
            }
        }, syncResult);
        try {
            syncResult.block();
            if (syncResult.getReturnCode() != MSException.Code.OK.getCode()) {
                throw new IOException("Failed to get topics : ", MSException.create(MSException.Code.get(syncResult.getReturnCode()), ""));
            }
            final MetastoreCursor metastoreCursor = (MetastoreCursor) syncResult.getValue();
            return new Iterator<ByteString>() { // from class: org.apache.hedwig.server.meta.MsMetadataManagerFactory.2
                Iterator<MetastoreTableItem> itemIter = null;

                @Override // java.util.Iterator
                public boolean hasNext() {
                    while (true) {
                        if (null != this.itemIter && this.itemIter.hasNext()) {
                            return true;
                        }
                        if (!metastoreCursor.hasMoreEntries()) {
                            return false;
                        }
                        try {
                            this.itemIter = metastoreCursor.readEntries(MsMetadataManagerFactory.this.cfg.getMetastoreMaxEntriesPerScan());
                        } catch (MSException e) {
                            MsMetadataManagerFactory.logger.warn("Interrupted when iterating the topics list : ", e);
                            return false;
                        }
                    }
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.Iterator
                public ByteString next() {
                    return ByteString.copyFromUtf8(this.itemIter.next().getKey());
                }

                @Override // java.util.Iterator
                public void remove() {
                    throw new UnsupportedOperationException("Doesn't support remove topic from topic iterator.");
                }
            };
        } catch (Exception e) {
            throw new IOException("Interrupted on getting topics list : ", e);
        }
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public TopicOwnershipManager newTopicOwnershipManager() {
        return new MsTopicOwnershipManagerImpl(this.ownerTable);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public TopicPersistenceManager newTopicPersistenceManager() {
        return new MsTopicPersistenceManagerImpl(this.persistTable);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public SubscriptionDataManager newSubscriptionDataManager() {
        return new MsSubscriptionDataManagerImpl(this.cfg, this.subTable);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void logErrorAndFinishOperation(String str, Callback<T> callback, Object obj, int i, PubSubProtocol.StatusCode statusCode) {
        logger.error(str, MSException.create(MSException.Code.get(i), ""));
        callback.operationFailed(obj, PubSubException.create(statusCode, str));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <T> void logErrorAndFinishOperation(String str, Callback<T> callback, Object obj, int i) {
        logErrorAndFinishOperation(str, callback, obj, i, i == MSException.Code.NoKey.getCode() ? PubSubProtocol.StatusCode.NO_SUCH_TOPIC : i == MSException.Code.ServiceDown.getCode() ? PubSubProtocol.StatusCode.SERVICE_DOWN : PubSubProtocol.StatusCode.UNEXPECTED_CONDITION);
    }

    @Override // org.apache.hedwig.server.meta.MetadataManagerFactory
    public void format(ServerConfiguration serverConfiguration, ZooKeeper zooKeeper) throws IOException {
        try {
            int metastoreMaxEntriesPerScan = serverConfiguration.getMetastoreMaxEntriesPerScan();
            logger.info("Cleaning topic ownership table ...");
            MetastoreUtils.cleanTable(this.ownerTable, metastoreMaxEntriesPerScan);
            logger.info("Cleaned topic ownership table successfully.");
            logger.info("Cleaning topic subscription table ...");
            MetastoreUtils.cleanTable(this.subTable, metastoreMaxEntriesPerScan);
            logger.info("Cleaned topic subscription table successfully.");
            logger.info("Cleaning topic persistence info table ...");
            MetastoreUtils.cleanTable(this.persistTable, metastoreMaxEntriesPerScan);
            logger.info("Cleaned topic persistence info table successfully.");
        } catch (MSException e) {
            throw new IOException("Exception when formatting hedwig metastore : ", e);
        } catch (InterruptedException e2) {
            throw new IOException("Interrupted when formatting hedwig metastore : ", e2);
        }
    }
}
