package org.apache.pulsar.broker.service.persistent;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.shade.com.carrotsearch.hppc.ObjectObjectHashMap;
import org.apache.pulsar.shade.com.google.common.base.MoreObjects;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.common.collect.Maps;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.javax.ws.rs.core.Link;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.replication.ReplicationStats;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.shade.org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic.class */
public class PersistentTopic implements Topic, AsyncCallbacks.AddEntryCallback {
    private final String topic;
    private final ManagedLedger ledger;
    private final BrokerService brokerService;
    public final String replicatorPrefix;
    static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
    private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5d;
    private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
    private volatile long lastActive;
    private final DispatchRateLimiter dispatchRateLimiter;
    public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
    private final MessageDeduplication messageDeduplication;
    private static final long COMPACTION_NEVER_RUN = -4273917950L;
    final CompactedTopic compactedTopic;
    private volatile boolean isEncryptionRequired;
    protected static final AtomicLongFieldUpdater<PersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount");
    private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal
        public TopicStatsHelper initialValue() {
            return new TopicStatsHelper();
        }
    };
    private static final Logger log = LoggerFactory.getLogger((Class<?>) PersistentTopic.class);
    private volatile long usageCount = 0;
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private volatile boolean hasBatchMessagePublished = false;
    CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(Long.valueOf(COMPACTION_NEVER_RUN));
    CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture((MessageIdImpl) MessageId.earliest);
    private final ConcurrentOpenHashSet<Producer> producers = new ConcurrentOpenHashSet<>(16, 1);
    private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions = new ConcurrentOpenHashMap<>(16, 1);
    private final ConcurrentOpenHashMap<String, Replicator> replicators = new ConcurrentOpenHashMap<>(16, 1);
    private volatile boolean isFenced = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/persistent/PersistentTopic$TopicStatsHelper.class */
    public static class TopicStatsHelper {
        public double averageMsgSize;
        public double aggMsgRateIn;
        public double aggMsgThroughputIn;
        public double aggMsgRateOut;
        public double aggMsgThroughputOut;
        public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<>();

        public TopicStatsHelper() {
            reset();
        }

        public void reset() {
            this.averageMsgSize = 0.0d;
            this.aggMsgRateIn = 0.0d;
            this.aggMsgThroughputIn = 0.0d;
            this.aggMsgRateOut = 0.0d;
            this.aggMsgThroughputOut = 0.0d;
            this.remotePublishersStats.clear();
        }
    }

    public PersistentTopic(String str, ManagedLedger managedLedger, BrokerService brokerService) throws BrokerServiceException.NamingException {
        this.isEncryptionRequired = false;
        this.topic = str;
        this.ledger = managedLedger;
        this.brokerService = brokerService;
        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
        USAGE_COUNT_UPDATER.set(this, 0L);
        this.dispatchRateLimiter = new DispatchRateLimiter(this);
        this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
        for (ManagedCursor managedCursor : managedLedger.getCursors()) {
            if (managedCursor.getName().startsWith(this.replicatorPrefix)) {
                String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
                String remoteCluster = PersistentReplicator.getRemoteCluster(managedCursor.getName());
                if (!addReplicationCluster(remoteCluster, this, managedCursor, clusterName)) {
                    throw new BrokerServiceException.NamingException(String.valueOf(getName()) + " Failed to start replicator " + remoteCluster);
                }
            } else if (!managedCursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
                String decode = Codec.decode(managedCursor.getName());
                this.subscriptions.put(decode, createPersistentSubscription(decode, managedCursor));
                this.subscriptions.get(decode).deactivateCursor();
            }
        }
        this.lastActive = System.nanoTime();
        this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, managedLedger);
        try {
            this.isEncryptionRequired = brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", TopicName.get(str).getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            }).encryption_required;
        } catch (Exception e) {
            log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", str, e.getMessage());
            this.isEncryptionRequired = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public PersistentSubscription createPersistentSubscription(String str, ManagedCursor managedCursor) {
        Preconditions.checkNotNull(this.compactedTopic);
        return str.equals(Compactor.COMPACTION_SUBSCRIPTION) ? new CompactorSubscription(this, this.compactedTopic, str, managedCursor) : new PersistentSubscription(this, str, managedCursor);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void publishMessage(ByteBuf byteBuf, Topic.PublishContext publishContext) {
        if (this.messageDeduplication.shouldPublishNextMessage(publishContext, byteBuf)) {
            this.ledger.asyncAddEntry(byteBuf, this, publishContext);
        } else {
            publishContext.completed(null, -1L, -1L);
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback
    public void addComplete(Position position, Object obj) {
        Topic.PublishContext publishContext = (Topic.PublishContext) obj;
        PositionImpl positionImpl = (PositionImpl) position;
        this.messageDeduplication.recordMessagePersisted(publishContext, positionImpl);
        publishContext.completed(null, positionImpl.getLedgerId(), positionImpl.getEntryId());
    }

    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback
    public void addFailed(ManagedLedgerException managedLedgerException, Object obj) {
        Topic.PublishContext publishContext = (Topic.PublishContext) obj;
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to persist msg in store: {}", this.topic, managedLedgerException.getMessage());
            }
            publishContext.completed(new BrokerServiceException.TopicClosedException(managedLedgerException), -1L, -1L);
            return;
        }
        log.warn("[{}] Failed to persist msg in store: {}", this.topic, managedLedgerException.getMessage());
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerTerminatedException) {
            publishContext.completed(new BrokerServiceException.TopicTerminatedException(managedLedgerException), -1L, -1L);
        } else {
            publishContext.completed(new BrokerServiceException.PersistenceException(managedLedgerException), -1L, -1L);
        }
        if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) {
            close();
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void addProducer(Producer producer) throws BrokerServiceException {
        Preconditions.checkArgument(producer.getTopic() == this);
        this.lock.readLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Attempting to add producer to a fenced topic", this.topic);
                throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
            }
            if (this.ledger.isTerminated()) {
                log.warn("[{}] Attempting to add producer to a terminated topic", this.topic);
                throw new BrokerServiceException.TopicTerminatedException("Topic was already terminated");
            }
            if (isProducersExceeded()) {
                log.warn("[{}] Attempting to add producer to topic which reached max producers limit", this.topic);
                throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] {} Got request to create producer ", this.topic, producer.getProducerName());
            }
            if (!this.producers.add(producer)) {
                throw new BrokerServiceException.NamingException("Producer with name '" + producer.getProducerName() + "' is already connected to topic");
            }
            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Added producer -- count: {}", this.topic, producer.getProducerName(), Long.valueOf(USAGE_COUNT_UPDATER.get(this)));
            }
            this.messageDeduplication.producerAdded(producer.getProducerName());
            startReplProducers();
        } finally {
            this.lock.readLock().unlock();
        }
    }

    private boolean isProducersExceeded() {
        Policies policies;
        try {
            policies = this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", TopicName.get(this.topic).getNamespace())).orElseGet(() -> {
                return new Policies();
            });
        } catch (Exception unused) {
            policies = new Policies();
        }
        int maxProducersPerTopic = policies.max_producers_per_topic > 0 ? policies.max_producers_per_topic : this.brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
        return maxProducersPerTopic > 0 && ((long) maxProducersPerTopic) <= this.producers.size();
    }

    private boolean hasLocalProducers() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.producers.forEach(producer -> {
            if (producer.isRemote()) {
                return;
            }
            atomicBoolean.set(true);
        });
        return atomicBoolean.get();
    }

    private boolean hasRemoteProducers() {
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.producers.forEach(producer -> {
            if (producer.isRemote()) {
                atomicBoolean.set(true);
            }
        });
        return atomicBoolean.get();
    }

    public void startReplProducers() {
        try {
            Policies orElseThrow = this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", TopicName.get(this.topic).getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            if (orElseThrow.replication_clusters != null) {
                TreeSet newTreeSet = Sets.newTreeSet(orElseThrow.replication_clusters);
                this.replicators.forEach((str, replicator) -> {
                    if (newTreeSet.contains(str)) {
                        replicator.startProducer();
                    }
                });
            }
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies while starting repl-producers {}", this.topic, e.getMessage());
            }
            this.replicators.forEach((str2, replicator2) -> {
                replicator2.startProducer();
            });
        }
    }

    public CompletableFuture<Void> stopReplProducers() {
        ArrayList newArrayList = Lists.newArrayList();
        this.replicators.forEach((str, replicator) -> {
            newArrayList.add(replicator.disconnect());
        });
        return FutureUtil.waitForAll(newArrayList);
    }

    private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
        ArrayList newArrayList = Lists.newArrayList();
        this.replicators.forEach((str, replicator) -> {
            newArrayList.add(replicator.disconnect(true));
        });
        return FutureUtil.waitForAll(newArrayList);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void removeProducer(Producer producer) {
        Preconditions.checkArgument(producer.getTopic() == this);
        if (this.producers.remove(producer)) {
            USAGE_COUNT_UPDATER.decrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Removed producer -- count: {}", this.topic, producer.getProducerName(), Long.valueOf(USAGE_COUNT_UPDATER.get(this)));
            }
            this.lastActive = System.nanoTime();
            this.messageDeduplication.producerRemoved(producer.getProducerName());
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Consumer> subscribe(ServerCnx serverCnx, String str, long j, PulsarApi.CommandSubscribe.SubType subType, int i, String str2, boolean z, MessageId messageId, Map<String, String> map, boolean z2, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        CompletableFuture<Consumer> completableFuture = new CompletableFuture<>();
        if (z2 && subType != PulsarApi.CommandSubscribe.SubType.Failover && subType != PulsarApi.CommandSubscribe.SubType.Exclusive) {
            completableFuture.completeExceptionally(new BrokerServiceException.NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
            return completableFuture;
        }
        if (StringUtils.isBlank(str)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Empty subscription name", this.topic);
            }
            completableFuture.completeExceptionally(new BrokerServiceException.NamingException("Empty subscription name"));
            return completableFuture;
        }
        if (this.hasBatchMessagePublished && !serverCnx.isBatchMessageCompatibleVersion()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer doesn't support batch-message {}", this.topic, str);
            }
            completableFuture.completeExceptionally(new BrokerServiceException.UnsupportedVersionException("Consumer doesn't support batch-message"));
            return completableFuture;
        }
        if (str.startsWith(this.replicatorPrefix) || str.equals(DEDUPLICATION_CURSOR_NAME)) {
            log.warn("[{}] Failed to create subscription for {}", this.topic, str);
            completableFuture.completeExceptionally(new BrokerServiceException.NamingException("Subscription with reserved subscription name attempted"));
            return completableFuture;
        }
        this.lock.readLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Attempting to subscribe to a fenced topic", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"));
                return completableFuture;
            }
            USAGE_COUNT_UPDATER.incrementAndGet(this);
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] [{}] Added consumer -- count: {}", this.topic, str, str2, Long.valueOf(USAGE_COUNT_UPDATER.get(this)));
            }
            CompletableFuture<? extends Subscription> durableSubscription = z ? getDurableSubscription(str, initialPosition) : getNonDurableSubscription(str, messageId);
            int maxUnackedMessagesPerConsumer = z ? this.brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer() : 0;
            durableSubscription.thenAccept(subscription -> {
                try {
                    Consumer consumer = new Consumer(subscription, subType, this.topic, j, i, str2, maxUnackedMessagesPerConsumer, serverCnx, serverCnx.getRole(), map, z2, initialPosition);
                    subscription.addConsumer(consumer);
                    if (serverCnx.isActive()) {
                        log.info("[{}][{}] Created new subscription for {}", this.topic, str, Long.valueOf(j));
                        completableFuture.complete(consumer);
                    } else {
                        consumer.close();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", this.topic, str, consumer.consumerName(), Long.valueOf(USAGE_COUNT_UPDATER.get(this)));
                        }
                        completableFuture.completeExceptionally(new BrokerServiceException("Connection was closed while the opening the cursor "));
                    }
                } catch (BrokerServiceException e) {
                    if (e instanceof BrokerServiceException.ConsumerBusyException) {
                        log.warn("[{}][{}] Consumer {} {} already connected", this.topic, str, Long.valueOf(j), str2);
                    } else if (e instanceof BrokerServiceException.SubscriptionBusyException) {
                        log.warn("[{}][{}] {}", this.topic, str, e.getMessage());
                    }
                    USAGE_COUNT_UPDATER.decrementAndGet(this);
                    completableFuture.completeExceptionally(e);
                }
            }).exceptionally(th -> {
                log.warn("[{}] Failed to create subscription for {}: ", this.topic, str, th.getMessage());
                USAGE_COUNT_UPDATER.decrementAndGet(this);
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(th));
                return null;
            });
            return completableFuture;
        } finally {
            this.lock.readLock().unlock();
        }
    }

    private CompletableFuture<Subscription> getDurableSubscription(final String str, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        final CompletableFuture<Subscription> completableFuture = new CompletableFuture<>();
        this.ledger.asyncOpenCursor(Codec.encode(str), initialPosition, new AsyncCallbacks.OpenCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.2
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Opened cursor", PersistentTopic.this.topic, str);
                }
                CompletableFuture completableFuture2 = completableFuture;
                ConcurrentOpenHashMap concurrentOpenHashMap = PersistentTopic.this.subscriptions;
                String str2 = str;
                String str3 = str;
                completableFuture2.complete((Subscription) concurrentOpenHashMap.computeIfAbsent(str2, str4 -> {
                    return PersistentTopic.this.createPersistentSubscription(str3, managedCursor);
                }));
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                PersistentTopic.log.warn("[{}] Failed to create subscription for {}: {}", PersistentTopic.this.topic, str, managedLedgerException.getMessage());
                PersistentTopic.USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                if (managedLedgerException instanceof ManagedLedgerException.ManagedLedgerFencedException) {
                    PersistentTopic.this.close();
                }
            }
        }, null);
        return completableFuture;
    }

    private CompletableFuture<? extends Subscription> getNonDurableSubscription(String str, MessageId messageId) {
        CompletableFuture<? extends Subscription> completableFuture = new CompletableFuture<>();
        log.info("[{}][{}] Creating non-durable subscription at msg id {}", this.topic, str, messageId);
        PersistentSubscription computeIfAbsent = this.subscriptions.computeIfAbsent(str, str2 -> {
            MessageIdImpl messageIdImpl = messageId != null ? (MessageIdImpl) messageId : (MessageIdImpl) MessageId.latest;
            long ledgerId = messageIdImpl.getLedgerId();
            long entryId = messageIdImpl.getEntryId();
            if ((messageIdImpl instanceof BatchMessageIdImpl) && ((BatchMessageIdImpl) messageIdImpl).getBatchIndex() >= 0) {
                entryId = messageIdImpl.getEntryId() - 1;
            }
            ManagedCursor managedCursor = null;
            try {
                managedCursor = this.ledger.newNonDurableCursor(new PositionImpl(ledgerId, entryId));
            } catch (ManagedLedgerException e) {
                completableFuture.completeExceptionally(e);
            }
            return new PersistentSubscription(this, str, managedCursor);
        });
        if (completableFuture.isDone()) {
            this.subscriptions.remove(str);
        } else {
            completableFuture.complete(computeIfAbsent);
        }
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Subscription> createSubscription(String str, PulsarApi.CommandSubscribe.InitialPosition initialPosition) {
        return getDurableSubscription(str, initialPosition);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> unsubscribe(final String str) {
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.ledger.asyncDeleteCursor(Codec.encode(str), new AsyncCallbacks.DeleteCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.3
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
            public void deleteCursorComplete(Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Cursor deleted successfully", PersistentTopic.this.topic, str);
                }
                PersistentTopic.this.subscriptions.remove(str);
                completableFuture.complete(null);
                PersistentTopic.this.lastActive = System.nanoTime();
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
            public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                if (PersistentTopic.log.isDebugEnabled()) {
                    PersistentTopic.log.debug("[{}][{}] Error deleting cursor for subscription", PersistentTopic.this.topic, str, managedLedgerException);
                }
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }, null);
        return completableFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeSubscription(String str) {
        this.subscriptions.remove(str);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> delete() {
        return delete(false);
    }

    private CompletableFuture<Void> delete(boolean z) {
        return delete(z, false);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> deleteForcefully() {
        return delete(false, true);
    }

    private CompletableFuture<Void> delete(boolean z, boolean z2) {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return completableFuture;
            }
            CompletableFuture completableFuture2 = new CompletableFuture();
            if (z2) {
                ArrayList newArrayList = Lists.newArrayList();
                this.replicators.forEach((str, replicator) -> {
                    newArrayList.add(replicator.disconnect());
                });
                this.producers.forEach(producer -> {
                    newArrayList.add(producer.disconnect());
                });
                this.subscriptions.forEach((str2, persistentSubscription) -> {
                    newArrayList.add(persistentSubscription.disconnect());
                });
                FutureUtil.waitForAll(newArrayList).thenRun(() -> {
                    completableFuture2.complete(null);
                }).exceptionally(th -> {
                    log.error("[{}] Error closing clients", this.topic, th);
                    this.isFenced = false;
                    completableFuture2.completeExceptionally(th);
                    return null;
                });
            } else {
                completableFuture2.complete(null);
            }
            completableFuture2.thenAccept(r10 -> {
                if (USAGE_COUNT_UPDATER.get(this) != 0) {
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
                    return;
                }
                this.isFenced = true;
                ArrayList newArrayList2 = Lists.newArrayList();
                if (!z) {
                    this.subscriptions.forEach((str3, persistentSubscription2) -> {
                        newArrayList2.add(persistentSubscription2.delete());
                    });
                } else if (!this.subscriptions.isEmpty()) {
                    this.isFenced = false;
                    completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has subscriptions"));
                    return;
                }
                FutureUtil.waitForAll(newArrayList2).whenComplete((r8, th2) -> {
                    if (th2 == null) {
                        this.ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.4
                            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                            public void deleteLedgerComplete(Object obj) {
                                PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                                PersistentTopic.log.info("[{}] Topic deleted", PersistentTopic.this.topic);
                                completableFuture.complete(null);
                            }

                            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback
                            public void deleteLedgerFailed(ManagedLedgerException managedLedgerException, Object obj) {
                                PersistentTopic.this.isFenced = false;
                                PersistentTopic.log.error("[{}] Error deleting topic", PersistentTopic.this.topic, managedLedgerException);
                                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                            }
                        }, null);
                        return;
                    }
                    log.error("[{}] Error deleting topic", this.topic, th2);
                    this.isFenced = false;
                    completableFuture.completeExceptionally(th2);
                });
            }).exceptionally(th2 -> {
                completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Failed to close clients before deleting topic."));
                return null;
            });
            return completableFuture;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> close() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.lock.writeLock().lock();
        try {
            if (this.isFenced) {
                log.warn("[{}] Topic is already being closed or deleted", this.topic);
                completableFuture.completeExceptionally(new BrokerServiceException.TopicFencedException("Topic is already fenced"));
                return completableFuture;
            }
            this.isFenced = true;
            this.lock.writeLock().unlock();
            ArrayList newArrayList = Lists.newArrayList();
            this.replicators.forEach((str, replicator) -> {
                newArrayList.add(replicator.disconnect());
            });
            this.producers.forEach(producer -> {
                newArrayList.add(producer.disconnect());
            });
            this.subscriptions.forEach((str2, persistentSubscription) -> {
                newArrayList.add(persistentSubscription.disconnect());
            });
            FutureUtil.waitForAll(newArrayList).thenRun(() -> {
                this.ledger.asyncClose(new AsyncCallbacks.CloseCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.5
                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback
                    public void closeComplete(Object obj) {
                        PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                        PersistentTopic.log.info("[{}] Topic closed", PersistentTopic.this.topic);
                        completableFuture.complete(null);
                    }

                    @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback
                    public void closeFailed(ManagedLedgerException managedLedgerException, Object obj) {
                        PersistentTopic.log.error("[{}] Failed to close managed ledger, proceeding anyway.", PersistentTopic.this.topic, managedLedgerException);
                        PersistentTopic.this.brokerService.removeTopicFromCache(PersistentTopic.this.topic);
                        completableFuture.complete(null);
                    }
                }, null);
                this.dispatchRateLimiter.close();
            }).exceptionally(th -> {
                log.error("[{}] Error closing topic", this.topic, th);
                this.isFenced = false;
                completableFuture.completeExceptionally(th);
                return null;
            });
            return completableFuture;
        } finally {
            this.lock.writeLock().unlock();
        }
    }

    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        checkReplication().thenAccept(r6 -> {
            log.info("[{}] Policies updated successfully", this.topic);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", this.topic, th.getMessage(), Long.valueOf(POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS), th);
            this.brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure, POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    public CompletableFuture<Void> checkDeduplicationStatus() {
        return this.messageDeduplication.checkStatus();
    }

    private CompletableFuture<Void> checkPersistencePolicies() {
        TopicName topicName = TopicName.get(this.topic);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.brokerService.getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> {
            this.ledger.setConfig(managedLedgerConfig);
            completableFuture.complete(null);
        }).exceptionally(th -> {
            log.warn("[{}] Failed to update persistence-policies {}", this.topic, th.getMessage());
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> checkReplication() {
        TopicName topicName = TopicName.get(this.topic);
        if (!topicName.isGlobal()) {
            return CompletableFuture.completedFuture(null);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", topicName);
        }
        try {
            Policies orElseThrow = this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", topicName.getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            int i = orElseThrow.message_ttl_in_seconds;
            Set<String> newTreeSet = orElseThrow.replication_clusters != null ? Sets.newTreeSet(orElseThrow.replication_clusters) : Collections.emptySet();
            String clusterName = this.brokerService.pulsar().getConfiguration().getClusterName();
            if (TopicName.get(this.topic).isGlobal() && !newTreeSet.contains(clusterName)) {
                log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}", newTreeSet);
                return deleteForcefully();
            }
            ArrayList newArrayList = Lists.newArrayList();
            for (String str : newTreeSet) {
                if (!str.equals(clusterName) && !this.replicators.containsKey(str)) {
                    newArrayList.add(startReplicator(str));
                }
            }
            Set set = newTreeSet;
            this.replicators.forEach((str2, replicator) -> {
                ((PersistentReplicator) replicator).updateMessageTTL(i);
                if (str2.equals(clusterName) || set.contains(str2)) {
                    return;
                }
                newArrayList.add(removeReplicator(str2));
            });
            return FutureUtil.waitForAll(newArrayList);
        } catch (Exception e) {
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            completableFuture.completeExceptionally(new BrokerServiceException.ServerMetadataException(e));
            return completableFuture;
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageExpiry() {
        try {
            Policies orElseThrow = this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", TopicName.get(this.topic).getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            if (orElseThrow.message_ttl_in_seconds != 0) {
                this.subscriptions.forEach((str, persistentSubscription) -> {
                    persistentSubscription.expireMessages(orElseThrow.message_ttl_in_seconds);
                });
                this.replicators.forEach((str2, replicator) -> {
                    ((PersistentReplicator) replicator).expireMessages(orElseThrow.message_ttl_in_seconds);
                });
            }
        } catch (Exception unused) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Error getting policies", this.topic);
            }
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkMessageDeduplicationInfo() {
        this.messageDeduplication.purgeInactiveProducers();
    }

    public void checkCompaction() {
        TopicName topicName = TopicName.get(this.topic);
        try {
            Policies orElseThrow = this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", topicName.getNamespace())).orElseThrow(() -> {
                return new KeeperException.NoNodeException();
            });
            if (orElseThrow.compaction_threshold == 0 || !this.currentCompaction.isDone()) {
                return;
            }
            PersistentSubscription persistentSubscription = this.subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION);
            long estimateBacklogSize = persistentSubscription != null ? persistentSubscription.estimateBacklogSize() : this.ledger.getEstimatedBacklogSize();
            if (estimateBacklogSize > orElseThrow.compaction_threshold) {
                try {
                    triggerCompaction();
                } catch (BrokerServiceException.AlreadyRunningException unused) {
                    log.debug("[{}] Compaction already running, so don't trigger again, even though backlog({}) is over threshold({})", topicName, Long.valueOf(estimateBacklogSize), Long.valueOf(orElseThrow.compaction_threshold));
                }
            }
        } catch (Exception unused2) {
            log.debug("[{}] Error getting policies", this.topic);
        }
    }

    CompletableFuture<Void> startReplicator(final String str) {
        log.info("[{}] Starting replicator to remote: {}", this.topic, str);
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        this.ledger.asyncOpenCursor(PersistentReplicator.getReplicatorName(this.replicatorPrefix, str), new AsyncCallbacks.OpenCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.6
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorComplete(ManagedCursor managedCursor, Object obj) {
                if (PersistentTopic.this.addReplicationCluster(str, PersistentTopic.this, managedCursor, PersistentTopic.this.brokerService.pulsar().getConfiguration().getClusterName())) {
                    completableFuture.complete(null);
                } else {
                    completableFuture.completeExceptionally(new BrokerServiceException.NamingException(String.valueOf(PersistentTopic.this.getName()) + " Failed to start replicator " + str));
                }
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback
            public void openCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
            }
        }, null);
        return completableFuture;
    }

    protected boolean addReplicationCluster(String str, PersistentTopic persistentTopic, ManagedCursor managedCursor, String str2) {
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        this.replicators.computeIfAbsent(str, str3 -> {
            try {
                return new PersistentReplicator(this, managedCursor, str2, str, this.brokerService);
            } catch (BrokerServiceException.NamingException unused) {
                atomicBoolean.set(false);
                log.error("[{}] Replicator startup failed due to partitioned-topic {}", this.topic, str);
                return null;
            }
        });
        if (!atomicBoolean.get()) {
            this.replicators.remove(str);
        }
        return atomicBoolean.get();
    }

    CompletableFuture<Void> removeReplicator(String str) {
        log.info("[{}] Removing replicator to {}", this.topic, str);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        String replicatorName = PersistentReplicator.getReplicatorName(this.replicatorPrefix, str);
        this.replicators.get(str).disconnect().thenRun(() -> {
            this.ledger.asyncDeleteCursor(replicatorName, new AsyncCallbacks.DeleteCursorCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.7
                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
                public void deleteCursorComplete(Object obj) {
                    PersistentTopic.this.replicators.remove(str);
                    completableFuture.complete(null);
                }

                @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback
                public void deleteCursorFailed(ManagedLedgerException managedLedgerException, Object obj) {
                    PersistentTopic.log.error("[{}] Failed to delete cursor {} {}", PersistentTopic.this.topic, replicatorName, managedLedgerException.getMessage(), managedLedgerException);
                    completableFuture.completeExceptionally(new BrokerServiceException.PersistenceException(managedLedgerException));
                }
            }, null);
        }).exceptionally(th -> {
            log.error("[{}] Failed to close replication producer {} {}", this.topic, replicatorName, th.getMessage(), th);
            completableFuture.completeExceptionally(th);
            return null;
        });
        return completableFuture;
    }

    public boolean isDeduplicationEnabled() {
        return this.messageDeduplication.isEnabled();
    }

    public String toString() {
        return MoreObjects.toStringHelper(this).add("topic", this.topic).toString();
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashSet<Producer> getProducers() {
        return this.producers;
    }

    public int getNumberOfConsumers() {
        int i = 0;
        Iterator<PersistentSubscription> it = this.subscriptions.values().iterator();
        while (it.hasNext()) {
            i += it.next().getConsumers().size();
        }
        return i;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
        return this.subscriptions;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public PersistentSubscription getSubscription(String str) {
        return this.subscriptions.get(str);
    }

    public BrokerService getBrokerService() {
        return this.brokerService;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
        return this.replicators;
    }

    public Replicator getPersistentReplicator(String str) {
        return this.replicators.get(str);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public String getName() {
        return this.topic;
    }

    public ManagedLedger getManagedLedger() {
        return this.ledger;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void updateRates(NamespaceStats namespaceStats, NamespaceBundleStats namespaceBundleStats, StatsOutputStream statsOutputStream, ClusterReplicationMetrics clusterReplicationMetrics, String str, boolean z) {
        TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
        topicStatsHelper.reset();
        this.replicators.forEach((str2, replicator) -> {
            replicator.updateRates();
        });
        namespaceStats.producerCount = (int) (namespaceStats.producerCount + this.producers.size());
        namespaceBundleStats.producerCount = (int) (namespaceBundleStats.producerCount + this.producers.size());
        statsOutputStream.startObject(this.topic);
        statsOutputStream.startList("publishers");
        this.producers.forEach(producer -> {
            producer.updateRates();
            PublisherStats stats = producer.getStats();
            topicStatsHelper.aggMsgRateIn += stats.msgRateIn;
            topicStatsHelper.aggMsgThroughputIn += stats.msgThroughputIn;
            if (producer.isRemote()) {
                topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), stats);
            }
            if (z) {
                StreamingStats.writePublisherStats(statsOutputStream, stats);
            }
        });
        statsOutputStream.endList();
        statsOutputStream.startObject(ReplicationStats.REPLICATION_SCOPE);
        namespaceStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
        this.replicators.forEach((str3, replicator2) -> {
            try {
                ((PersistentReplicator) replicator2).updateCursorState();
            } catch (Exception e) {
                log.warn("[{}] Failed to update cursro state ", this.topic, e);
            }
            ReplicatorStats stats = replicator2.getStats();
            PublisherStats publisherStats = topicStatsHelper.remotePublishersStats.get(replicator2.getRemoteCluster());
            if (publisherStats != null) {
                stats.msgRateIn = publisherStats.msgRateIn;
                stats.msgThroughputIn = publisherStats.msgThroughputIn;
                stats.inboundConnection = publisherStats.getAddress();
                stats.inboundConnectedSince = publisherStats.getConnectedSince();
            }
            topicStatsHelper.aggMsgRateOut += stats.msgRateOut;
            topicStatsHelper.aggMsgThroughputOut += stats.msgThroughputOut;
            statsOutputStream.startObject(str3);
            statsOutputStream.writePair("connected", stats.connected);
            statsOutputStream.writePair("msgRateExpired", stats.msgRateExpired);
            statsOutputStream.writePair("msgRateIn", stats.msgRateIn);
            statsOutputStream.writePair("msgRateOut", stats.msgRateOut);
            statsOutputStream.writePair("msgThroughputIn", stats.msgThroughputIn);
            statsOutputStream.writePair("msgThroughputOut", stats.msgThroughputOut);
            statsOutputStream.writePair("replicationBacklog", stats.replicationBacklog);
            statsOutputStream.writePair("replicationDelayInSeconds", stats.replicationDelayInSeconds);
            statsOutputStream.writePair("inboundConnection", stats.inboundConnection);
            statsOutputStream.writePair("inboundConnectedSince", stats.inboundConnectedSince);
            statsOutputStream.writePair("outboundConnection", stats.outboundConnection);
            statsOutputStream.writePair("outboundConnectedSince", stats.outboundConnectedSince);
            statsOutputStream.endObject();
            namespaceStats.msgReplBacklog += stats.replicationBacklog;
            if (clusterReplicationMetrics.isMetricsEnabled()) {
                String keyName = clusterReplicationMetrics.getKeyName(str, str3);
                ReplicationMetrics replicationMetrics = clusterReplicationMetrics.get(keyName);
                boolean z2 = false;
                if (replicationMetrics == null) {
                    replicationMetrics = ReplicationMetrics.get();
                    z2 = true;
                }
                replicationMetrics.connected += stats.connected ? 1 : 0;
                replicationMetrics.msgRateOut += stats.msgRateOut;
                replicationMetrics.msgThroughputOut += stats.msgThroughputOut;
                replicationMetrics.msgReplBacklog += stats.replicationBacklog;
                if (z2) {
                    clusterReplicationMetrics.put(keyName, replicationMetrics);
                }
            }
        });
        statsOutputStream.endObject();
        statsOutputStream.startObject("subscriptions");
        namespaceStats.subsCount = (int) (namespaceStats.subsCount + this.subscriptions.size());
        this.subscriptions.forEach((str4, persistentSubscription) -> {
            double d = 0.0d;
            double d2 = 0.0d;
            double d3 = 0.0d;
            try {
                statsOutputStream.startObject(str4);
                Consumer[] array = persistentSubscription.getConsumers().array();
                namespaceStats.consumerCount += array.length;
                namespaceBundleStats.consumerCount += array.length;
                statsOutputStream.startList("consumers");
                for (Consumer consumer : array) {
                    consumer.updateRates();
                    ConsumerStats stats = consumer.getStats();
                    d += stats.msgRateOut;
                    d2 += stats.msgThroughputOut;
                    d3 += stats.msgRateRedeliver;
                    StreamingStats.writeConsumerStats(statsOutputStream, persistentSubscription.getType(), stats);
                }
                statsOutputStream.endList();
                statsOutputStream.writePair("msgBacklog", persistentSubscription.getNumberOfEntriesInBacklog());
                statsOutputStream.writePair("msgRateExpired", persistentSubscription.getExpiredMessageRate());
                statsOutputStream.writePair("msgRateOut", d);
                statsOutputStream.writePair("msgThroughputOut", d2);
                statsOutputStream.writePair("msgRateRedeliver", d3);
                statsOutputStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", persistentSubscription.getNumberOfEntriesSinceFirstNotAckedMessage());
                statsOutputStream.writePair("totalNonContiguousDeletedMessagesRange", persistentSubscription.getTotalNonContiguousDeletedMessagesRange());
                statsOutputStream.writePair(Link.TYPE, persistentSubscription.getTypeString());
                if (PulsarApi.CommandSubscribe.SubType.Shared.equals(persistentSubscription.getType()) && (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers)) {
                    statsOutputStream.writePair("blockedSubscriptionOnUnackedMsgs", ((PersistentDispatcherMultipleConsumers) persistentSubscription.getDispatcher()).isBlockedDispatcherOnUnackedMsgs());
                    statsOutputStream.writePair("unackedMessages", r0.getTotalUnackedMessages());
                }
                statsOutputStream.endObject();
                topicStatsHelper.aggMsgRateOut += d;
                topicStatsHelper.aggMsgThroughputOut += d2;
                namespaceStats.msgBacklog += persistentSubscription.getNumberOfEntriesInBacklog();
            } catch (Exception e) {
                log.error("Got exception when creating consumer stats for subscription {}: {}", str4, e.getMessage(), e);
            }
        });
        statsOutputStream.endObject();
        topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0d ? 0.0d : topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn;
        statsOutputStream.writePair("producerCount", this.producers.size());
        statsOutputStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
        statsOutputStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
        statsOutputStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
        statsOutputStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
        statsOutputStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
        statsOutputStream.writePair("storageSize", this.ledger.getEstimatedBacklogSize());
        statsOutputStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) this.ledger).getPendingAddEntriesCount());
        namespaceStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        namespaceStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        namespaceStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        namespaceStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        namespaceStats.storageSize += this.ledger.getEstimatedBacklogSize();
        namespaceBundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
        namespaceBundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
        namespaceBundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
        namespaceBundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
        namespaceBundleStats.cacheSize += ((ManagedLedgerImpl) this.ledger).getCacheSize();
        statsOutputStream.endObject();
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public TopicStats getStats() {
        TopicStats topicStats = new TopicStats();
        ObjectObjectHashMap objectObjectHashMap = new ObjectObjectHashMap();
        this.producers.forEach(producer -> {
            PublisherStats stats = producer.getStats();
            topicStats.msgRateIn += stats.msgRateIn;
            topicStats.msgThroughputIn += stats.msgThroughputIn;
            if (producer.isRemote()) {
                objectObjectHashMap.put(producer.getRemoteCluster(), stats);
            } else {
                topicStats.publishers.add(stats);
            }
        });
        topicStats.averageMsgSize = topicStats.msgRateIn == 0.0d ? 0.0d : topicStats.msgThroughputIn / topicStats.msgRateIn;
        this.subscriptions.forEach((str, persistentSubscription) -> {
            SubscriptionStats stats = persistentSubscription.getStats();
            topicStats.msgRateOut += stats.msgRateOut;
            topicStats.msgThroughputOut += stats.msgThroughputOut;
            topicStats.subscriptions.put(str, stats);
        });
        this.replicators.forEach((str2, replicator) -> {
            ReplicatorStats stats = replicator.getStats();
            PublisherStats publisherStats = (PublisherStats) objectObjectHashMap.get(replicator.getRemoteCluster());
            if (publisherStats != null) {
                stats.msgRateIn = publisherStats.msgRateIn;
                stats.msgThroughputIn = publisherStats.msgThroughputIn;
                stats.inboundConnection = publisherStats.getAddress();
                stats.inboundConnectedSince = publisherStats.getConnectedSince();
            }
            topicStats.msgRateOut += stats.msgRateOut;
            topicStats.msgThroughputOut += stats.msgThroughputOut;
            topicStats.replication.put(replicator.getRemoteCluster(), stats);
        });
        topicStats.storageSize = this.ledger.getEstimatedBacklogSize();
        topicStats.deduplicationStatus = this.messageDeduplication.getStatus().toString();
        return topicStats;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public PersistentTopicInternalStats getInternalStats() {
        PersistentTopicInternalStats persistentTopicInternalStats = new PersistentTopicInternalStats();
        ManagedLedgerImpl managedLedgerImpl = (ManagedLedgerImpl) this.ledger;
        persistentTopicInternalStats.entriesAddedCounter = managedLedgerImpl.getEntriesAddedCounter();
        persistentTopicInternalStats.numberOfEntries = managedLedgerImpl.getNumberOfEntries();
        persistentTopicInternalStats.totalSize = managedLedgerImpl.getTotalSize();
        persistentTopicInternalStats.currentLedgerEntries = managedLedgerImpl.getCurrentLedgerEntries();
        persistentTopicInternalStats.currentLedgerSize = managedLedgerImpl.getCurrentLedgerSize();
        persistentTopicInternalStats.lastLedgerCreatedTimestamp = DateFormatter.format(managedLedgerImpl.getLastLedgerCreatedTimestamp());
        if (managedLedgerImpl.getLastLedgerCreationFailureTimestamp() != 0) {
            persistentTopicInternalStats.lastLedgerCreationFailureTimestamp = DateFormatter.format(managedLedgerImpl.getLastLedgerCreationFailureTimestamp());
        }
        persistentTopicInternalStats.waitingCursorsCount = managedLedgerImpl.getWaitingCursorsCount();
        persistentTopicInternalStats.pendingAddEntriesCount = managedLedgerImpl.getPendingAddEntriesCount();
        persistentTopicInternalStats.lastConfirmedEntry = managedLedgerImpl.getLastConfirmedEntry().toString();
        persistentTopicInternalStats.state = managedLedgerImpl.getState().toString();
        persistentTopicInternalStats.ledgers = Lists.newArrayList();
        managedLedgerImpl.getLedgersInfo().forEach((l, ledgerInfo) -> {
            PersistentTopicInternalStats.LedgerInfo ledgerInfo = new PersistentTopicInternalStats.LedgerInfo();
            ledgerInfo.ledgerId = ledgerInfo.getLedgerId();
            ledgerInfo.entries = ledgerInfo.getEntries();
            ledgerInfo.size = ledgerInfo.getSize();
            ledgerInfo.offloaded = ledgerInfo.hasOffloadContext() && ledgerInfo.getOffloadContext().getComplete();
            persistentTopicInternalStats.ledgers.add(ledgerInfo);
        });
        persistentTopicInternalStats.cursors = Maps.newTreeMap();
        managedLedgerImpl.getCursors().forEach(managedCursor -> {
            ManagedCursorImpl managedCursorImpl = (ManagedCursorImpl) managedCursor;
            PersistentTopicInternalStats.CursorStats cursorStats = new PersistentTopicInternalStats.CursorStats();
            cursorStats.markDeletePosition = managedCursorImpl.getMarkDeletedPosition().toString();
            cursorStats.readPosition = managedCursorImpl.getReadPosition().toString();
            cursorStats.waitingReadOp = managedCursorImpl.hasPendingReadRequest();
            cursorStats.pendingReadOps = managedCursorImpl.getPendingReadOpsCount();
            cursorStats.messagesConsumedCounter = managedCursorImpl.getMessagesConsumedCounter();
            cursorStats.cursorLedger = managedCursorImpl.getCursorLedger();
            cursorStats.cursorLedgerLastEntry = managedCursorImpl.getCursorLedgerLastEntry();
            cursorStats.individuallyDeletedMessages = managedCursorImpl.getIndividuallyDeletedMessages();
            cursorStats.lastLedgerSwitchTimestamp = DateFormatter.format(managedCursorImpl.getLastLedgerSwitchTimestamp());
            cursorStats.state = managedCursorImpl.getState();
            cursorStats.numberOfEntriesSinceFirstNotAckedMessage = managedCursorImpl.getNumberOfEntriesSinceFirstNotAckedMessage();
            cursorStats.totalNonContiguousDeletedMessagesRange = managedCursorImpl.getTotalNonContiguousDeletedMessagesRange();
            cursorStats.properties = managedCursorImpl.getProperties();
            persistentTopicInternalStats.cursors.put(managedCursorImpl.getName(), cursorStats);
        });
        return persistentTopicInternalStats;
    }

    public long getBacklogSize() {
        return this.ledger.getEstimatedBacklogSize();
    }

    public boolean isActive() {
        return TopicName.get(this.topic).isGlobal() ? !this.subscriptions.isEmpty() || hasLocalProducers() : (USAGE_COUNT_UPDATER.get(this) == 0 && this.subscriptions.isEmpty()) ? false : true;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkGC(int i) {
        if (isActive()) {
            this.lastActive = System.nanoTime();
            return;
        }
        if (System.nanoTime() - this.lastActive >= TimeUnit.SECONDS.toNanos(i) && !shouldTopicBeRetained()) {
            CompletableFuture completableFuture = new CompletableFuture();
            if (TopicName.get(this.topic).isGlobal()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", this.topic, Integer.valueOf(i));
                }
                closeReplProducersIfNoBacklog().thenRun(() -> {
                    if (!hasRemoteProducers()) {
                        log.info("[{}] Global topic inactive for {} seconds, closed repl producers", this.topic, Integer.valueOf(i));
                        completableFuture.complete(null);
                    } else {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC", this.topic);
                        }
                        completableFuture.completeExceptionally(new BrokerServiceException.TopicBusyException("Topic has connected remote producers"));
                    }
                }).exceptionally(th -> {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", this.topic);
                    }
                    completableFuture.completeExceptionally(th.getCause());
                    return null;
                });
            } else {
                completableFuture.complete(null);
            }
            completableFuture.thenCompose(r4 -> {
                return delete(true);
            }).thenRun(() -> {
                log.info("[{}] Topic deleted successfully due to inactivity", this.topic);
            }).exceptionally(th2 -> {
                if (!(th2.getCause() instanceof BrokerServiceException.TopicBusyException)) {
                    log.warn("[{}] Inactive topic deletion failed", this.topic, th2);
                    return null;
                }
                if (!log.isDebugEnabled()) {
                    return null;
                }
                log.debug("[{}] Did not delete busy topic: {}", this.topic, th2.getCause().getMessage());
                return null;
            });
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public void checkInactiveSubscriptions() {
        long millis = TimeUnit.MINUTES.toMillis(this.brokerService.pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes());
        if (millis <= 0) {
            return;
        }
        this.subscriptions.forEach((str, persistentSubscription) -> {
            if ((persistentSubscription.dispatcher == null || !persistentSubscription.dispatcher.isConsumerConnected()) && System.currentTimeMillis() - persistentSubscription.cursor.getLastActive() > millis) {
                persistentSubscription.delete().thenAccept(r7 -> {
                    log.info("[{}][{}] The subscription was deleted due to expiration", this.topic, str);
                });
            }
        });
    }

    private boolean shouldTopicBeRetained() {
        try {
            return ((Boolean) this.brokerService.pulsar().getConfigurationCache().policiesCache().get(AdminResource.path("policies", TopicName.get(this.topic).getNamespace())).map(policies -> {
                return policies.retention_policies;
            }).map(retentionPolicies -> {
                long nanos = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
                return nanos < 0 || System.nanoTime() - this.lastActive < nanos;
            }).orElse(false)).booleanValue();
        } catch (Exception unused) {
            if (!log.isDebugEnabled()) {
                return true;
            }
            log.debug("[{}] Error getting policies", this.topic);
            return true;
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Void> onPoliciesUpdate(Policies policies) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] isEncryptionRequired changes: {} -> {}", this.topic, Boolean.valueOf(this.isEncryptionRequired), Boolean.valueOf(policies.encryption_required));
        }
        this.isEncryptionRequired = policies.encryption_required;
        this.producers.forEach(producer -> {
            producer.checkPermissions();
            producer.checkEncryption();
        });
        this.subscriptions.forEach((str, persistentSubscription) -> {
            persistentSubscription.getConsumers().forEach((v0) -> {
                v0.checkPermissions();
            });
            if (persistentSubscription.getDispatcher().getRateLimiter() != null) {
                persistentSubscription.getDispatcher().getRateLimiter().onPoliciesUpdate(policies);
            }
        });
        checkMessageExpiry();
        CompletableFuture<Void> checkReplicationAndRetryOnFailure = checkReplicationAndRetryOnFailure();
        CompletableFuture<Void> checkDeduplicationStatus = checkDeduplicationStatus();
        CompletableFuture<Void> checkPersistencePolicies = checkPersistencePolicies();
        this.dispatchRateLimiter.onPoliciesUpdate(policies);
        return CompletableFuture.allOf(checkReplicationAndRetryOnFailure, checkDeduplicationStatus, checkPersistencePolicies);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public BacklogQuota getBacklogQuota() {
        String namespace = TopicName.get(getName()).getNamespace();
        return this.brokerService.getBacklogQuotaManager().getBacklogQuota(namespace, AdminResource.path("policies", namespace));
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isBacklogQuotaExceeded(String str) {
        BacklogQuota backlogQuota = getBacklogQuota();
        if (backlogQuota == null) {
            return false;
        }
        BacklogQuota.RetentionPolicy policy = backlogQuota.getPolicy();
        if ((policy != BacklogQuota.RetentionPolicy.producer_request_hold && policy != BacklogQuota.RetentionPolicy.producer_exception) || !this.brokerService.isBacklogExceeded(this)) {
            return false;
        }
        log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", getName(), str);
        return true;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isEncryptionRequired() {
        return this.isEncryptionRequired;
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public boolean isReplicated() {
        return !this.replicators.isEmpty();
    }

    public CompletableFuture<MessageId> terminate() {
        final CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
        this.ledger.asyncTerminate(new AsyncCallbacks.TerminateCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.8
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback
            public void terminateComplete(Position position, Object obj) {
                PersistentTopic.this.producers.forEach((v0) -> {
                    v0.disconnect();
                });
                PersistentTopic.this.subscriptions.forEach((str, persistentSubscription) -> {
                    persistentSubscription.topicTerminated();
                });
                PositionImpl positionImpl = (PositionImpl) position;
                MessageIdImpl messageIdImpl = new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1);
                PersistentTopic.log.info("[{}] Topic terminated at {}", PersistentTopic.this.getName(), messageIdImpl);
                completableFuture.complete(messageIdImpl);
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback
            public void terminateFailed(ManagedLedgerException managedLedgerException, Object obj) {
                completableFuture.completeExceptionally(managedLedgerException);
            }
        }, null);
        return completableFuture;
    }

    public boolean isOldestMessageExpired(ManagedCursor managedCursor, long j) {
        boolean z;
        MessageImpl<byte[]> messageImpl = null;
        Entry entry = null;
        boolean z2 = false;
        try {
            try {
                entry = managedCursor.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Include);
                if (entry != null) {
                    messageImpl = MessageImpl.deserialize(entry.getDataBuffer());
                    if (j != 0) {
                        if (System.currentTimeMillis() > messageImpl.getPublishTime() + TimeUnit.SECONDS.toMillis((long) (j * MESSAGE_EXPIRY_THRESHOLD))) {
                            z = true;
                            z2 = z;
                        }
                    }
                    z = false;
                    z2 = z;
                }
                if (entry != null) {
                    entry.release();
                }
                if (messageImpl != null) {
                    messageImpl.recycle();
                }
            } catch (Exception e) {
                log.warn("[{}] Error while getting the oldest message", this.topic, e);
                if (entry != null) {
                    entry.release();
                }
                if (messageImpl != null) {
                    messageImpl.recycle();
                }
            }
            return z2;
        } catch (Throwable th) {
            if (entry != null) {
                entry.release();
            }
            if (messageImpl != null) {
                messageImpl.recycle();
            }
            throw th;
        }
    }

    public CompletableFuture<Void> clearBacklog() {
        log.info("[{}] Clearing backlog on all cursors in the topic.", this.topic);
        ArrayList newArrayList = Lists.newArrayList();
        List<String> keys = getSubscriptions().keys();
        keys.addAll(getReplicators().keys());
        Iterator<String> it = keys.iterator();
        while (it.hasNext()) {
            newArrayList.add(clearBacklog(it.next()));
        }
        return FutureUtil.waitForAll(newArrayList);
    }

    public CompletableFuture<Void> clearBacklog(String str) {
        log.info("[{}] Clearing backlog for cursor {} in the topic.", this.topic, str);
        PersistentSubscription subscription = getSubscription(str);
        if (subscription != null) {
            return subscription.clearBacklog();
        }
        PersistentReplicator persistentReplicator = (PersistentReplicator) getPersistentReplicator(str);
        return persistentReplicator != null ? persistentReplicator.clearBacklog() : FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
    }

    public void markBatchMessagePublished() {
        this.hasBatchMessagePublished = true;
    }

    public DispatchRateLimiter getDispatchRateLimiter() {
        return this.dispatchRateLimiter;
    }

    public long getLastPublishedSequenceId(String str) {
        return this.messageDeduplication.getLastPublishedSequenceId(str);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public Position getLastMessageId() {
        return this.ledger.getLastConfirmedEntry();
    }

    public synchronized void triggerCompaction() throws PulsarServerException, BrokerServiceException.AlreadyRunningException {
        if (!this.currentCompaction.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Compaction already in progress");
        }
        this.currentCompaction = this.brokerService.pulsar().getCompactor().compact(this.topic);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v4 */
    public synchronized LongRunningProcessStatus compactionStatus() {
        ?? r0 = this;
        synchronized (r0) {
            CompletableFuture<Long> completableFuture = this.currentCompaction;
            r0 = r0;
            if (!completableFuture.isDone()) {
                return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
            }
            try {
                return completableFuture.join().longValue() == COMPACTION_NEVER_RUN ? LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN) : LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
            } catch (CancellationException | CompletionException e) {
                return LongRunningProcessStatus.forError(e.getMessage());
            }
        }
    }

    public synchronized void triggerOffload(MessageIdImpl messageIdImpl) throws BrokerServiceException.AlreadyRunningException {
        if (!this.currentOffload.isDone()) {
            throw new BrokerServiceException.AlreadyRunningException("Offload already in progress");
        }
        final CompletableFuture<MessageIdImpl> completableFuture = new CompletableFuture<>();
        this.currentOffload = completableFuture;
        getManagedLedger().asyncOffloadPrefix(PositionImpl.get(messageIdImpl.getLedgerId(), messageIdImpl.getEntryId()), new AsyncCallbacks.OffloadCallback() { // from class: org.apache.pulsar.broker.service.persistent.PersistentTopic.9
            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback
            public void offloadComplete(Position position, Object obj) {
                PositionImpl positionImpl = (PositionImpl) position;
                completableFuture.complete(new MessageIdImpl(positionImpl.getLedgerId(), positionImpl.getEntryId(), -1));
            }

            @Override // org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback
            public void offloadFailed(ManagedLedgerException managedLedgerException, Object obj) {
                completableFuture.completeExceptionally(managedLedgerException);
            }
        }, null);
    }

    public synchronized OffloadProcessStatus offloadStatus() {
        if (!this.currentOffload.isDone()) {
            return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
        }
        try {
            return this.currentOffload.join() == MessageId.earliest ? OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN) : OffloadProcessStatus.forSuccess(this.currentOffload.join());
        } catch (CancellationException | CompletionException e) {
            return OffloadProcessStatus.forError(e.getMessage());
        }
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Boolean> hasSchema() {
        return this.brokerService.pulsar().getSchemaRegistryService().getSchema(TopicName.get(TopicName.get(getName()).getPartitionedTopicName()).getSchemaName()).thenApply(schemaAndMetadata -> {
            return Boolean.valueOf(schemaAndMetadata != null);
        });
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<SchemaVersion> addSchema(SchemaData schemaData) {
        if (schemaData == null) {
            return CompletableFuture.completedFuture(SchemaVersion.Empty);
        }
        return this.brokerService.pulsar().getSchemaRegistryService().putSchemaIfAbsent(TopicName.get(TopicName.get(getName()).getPartitionedTopicName()).getSchemaName(), schemaData);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schemaData) {
        return this.brokerService.pulsar().getSchemaRegistryService().isCompatibleWithLatestVersion(TopicName.get(TopicName.get(getName()).getPartitionedTopicName()).getSchemaName(), schemaData);
    }

    @Override // org.apache.pulsar.broker.service.Topic
    public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schemaData) {
        return hasSchema().thenCompose(bool -> {
            return (bool.booleanValue() || isActive() || this.ledger.getTotalSize() != 0) ? isSchemaCompatible(schemaData) : addSchema(schemaData).thenApply(schemaVersion -> {
                return true;
            });
        });
    }
}
