package org.apache.hedwig.server.persistence;

import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TopicOpQueuer;
import org.apache.hedwig.server.common.UnexpectedError;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
import org.apache.hedwig.server.meta.TopicPersistenceManager;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
import org.apache.hedwig.util.Callback;
import org.apache.hedwig.util.VarArgs;
import org.apache.hedwig.zookeeper.SafeAsynBKCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager.class */
public class BookkeeperPersistenceManager implements PersistenceManagerWithRangeScan, TopicOwnershipChangeListener {
    /** BookKeeper client used to open ledgers for reading and recovery. */
    private BookKeeper bk;
    /** Metadata accessor from which a topic's persisted ledger ranges are read. */
    private TopicPersistenceManager tpManager;
    private ServerConfiguration cfg;
    private TopicManager tm;
    /** Sequence id used as the starting point for a topic whose ledger range list is empty. */
    private static final long START_SEQ_ID = 1;
    // NOTE(review): presumably the maxEntriesPerLedger value meaning "no limit" -- confirm against its use.
    // The decompiled code also appears to substitute these two constants wherever a plain 1L / 0L
    // literal occurs in arithmetic or comparisons.
    private static final long UNLIMITED_ENTRIES = 0;
    private final long maxEntriesPerLedger;
    /**
     * Per-topic in-memory state, keyed by topic. The acquire operation treats a topic that is present
     * in this map as already acquired.
     */
    Map<ByteString, TopicInfo> topicInfos = new ConcurrentHashMap<ByteString, TopicInfo>();
    /** Queuer that the nested per-topic operations bind themselves to. */
    TopicOpQueuer queuer;
    static Logger logger = LoggerFactory.getLogger(BookkeeperPersistenceManager.class);
    // Password handed to BookKeeper when opening ledgers.
    // NOTE(review): getBytes() uses the platform default charset; the literal is ASCII-only.
    static byte[] passwd = "sillysecret".getBytes();
    /** Close callback that deliberately ignores the outcome of the close. */
    static SafeAsynBKCallback.CloseCallback noOpCloseCallback = new SafeAsynBKCallback.CloseCallback() {
        @Override
        public void safeCloseComplete(int i, LedgerHandle ledgerHandle, Object obj) {
            // Intentionally empty: nothing to do once the ledger is closed.
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$AcquireOp.class */
    public class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /**
         * Creates an acquire operation for {@code topic}.
         *
         * <p>Reconstructed from the decompiler's instruction dump, which loads the enclosing
         * manager's {@code queuer}, null-checks it, and passes it together with the three arguments
         * to the superclass constructor. In source form that is a qualified superclass constructor
         * invocation.
         *
         * @param topic topic being acquired
         * @param cb    callback notified when the acquisition finishes or fails
         * @param ctx   opaque context object handed back to {@code cb}
         * @throws NullPointerException if the enclosing manager's {@code queuer} is {@code null}
         */
        public AcquireOp(ByteString topic, Callback<Void> cb, Object ctx) {
            // Bind the base operation to the enclosing manager's queuer.
            queuer.super(topic, cb, ctx);
        }

        /**
         * Acquires the topic.
         *
         * <p>If the enclosing manager already tracks the topic in {@code topicInfos}, the callback is
         * completed immediately. Otherwise the topic's persisted ledger ranges are read
         * asynchronously and passed to {@code processTopicLedgerRanges}; a {@code null} read result
         * is treated as an empty range list with version {@code Version.NEW}. A failed read is
         * forwarded to the operation's callback.
         */
        @Override
        public void run() {
            if (BookkeeperPersistenceManager.this.topicInfos.containsKey(this.topic)) {
                // Already acquired. A plain null is passed as the Void result; the decompiler's
                // "(Object) null" cast does not fit a Callback<Void> parameter.
                this.cb.operationFinished(this.ctx, null);
                return;
            }
            BookkeeperPersistenceManager.this.tpManager.readTopicPersistenceInfo(this.topic, new Callback<Versioned<PubSubProtocol.LedgerRanges>>() {
                @Override
                public void operationFinished(Object readCtx, Versioned<PubSubProtocol.LedgerRanges> persisted) {
                    if (persisted == null) {
                        // Nothing stored yet for this topic: start from an empty range list.
                        AcquireOp.this.processTopicLedgerRanges(PubSubProtocol.LedgerRanges.getDefaultInstance(), Version.NEW);
                    } else {
                        AcquireOp.this.processTopicLedgerRanges((PubSubProtocol.LedgerRanges) persisted.getValue(), persisted.getVersion());
                    }
                }

                @Override
                public void operationFailed(Object readCtx, PubSubException exception) {
                    AcquireOp.this.cb.operationFailed(readCtx, exception);
                }
            }, this.ctx);
        }

        /**
         * Picks the starting seq id for the topic's ledger ranges and continues processing.
         *
         * <ul>
         *   <li>Empty range list: start from {@code START_SEQ_ID}.</li>
         *   <li>First range carries a start seq id: use it.</li>
         *   <li>Otherwise: derive it via {@code getStartSeqIdToProcessTopicLedgerRanges}.</li>
         * </ul>
         *
         * @param ledgerRanges persisted ledger ranges of the topic
         * @param version      version that accompanied the persisted ranges
         */
        void processTopicLedgerRanges(PubSubProtocol.LedgerRanges ledgerRanges, Version version) {
            final List<PubSubProtocol.LedgerRange> ranges = ledgerRanges.getRangesList();
            if (!ranges.isEmpty()) {
                final PubSubProtocol.LedgerRange first = ranges.get(0);
                if (!first.hasStartSeqIdIncluded()) {
                    getStartSeqIdToProcessTopicLedgerRanges(ranges, version);
                } else {
                    processTopicLedgerRanges(ranges, version, first.getStartSeqIdIncluded());
                }
                return;
            }
            processTopicLedgerRanges(ranges, version, BookkeeperPersistenceManager.START_SEQ_ID);
        }

        /**
         * Derives the start seq id of the first ledger range and then continues with
         * {@code processTopicLedgerRanges(list, version, startSeqId)}.
         *
         * <p>If the first range has no end seq id, {@code START_SEQ_ID} is used. Otherwise the ledger
         * is opened, its entry count is taken as {@code getLastAddConfirmed() + 1}, and the start is
         * computed as {@code endSeqId - entryCount + 1}. Open failures and an empty ledger are
         * reported through the operation's callback.
         *
         * <p>Failures while closing the ledger are logged rather than silently dropped, and an
         * interrupt is restored on the current thread; processing continues either way.
         *
         * @param list    ledger ranges of the topic; the first element is inspected
         * @param version version that accompanied the persisted ranges
         */
        void getStartSeqIdToProcessTopicLedgerRanges(final List<PubSubProtocol.LedgerRange> list, final Version version) {
            final PubSubProtocol.LedgerRange firstRange = list.get(0);
            if (!firstRange.hasEndSeqIdIncluded()) {
                processTopicLedgerRanges(list, version, BookkeeperPersistenceManager.START_SEQ_ID);
                return;
            }
            final long ledgerId = firstRange.getLedgerId();
            BookkeeperPersistenceManager.this.bk.asyncOpenLedger(ledgerId, BookKeeper.DigestType.CRC32, BookkeeperPersistenceManager.passwd, new SafeAsynBKCallback.OpenCallback() {
                @Override
                public void safeOpenComplete(int rc, LedgerHandle ledgerHandle, Object openCtx) {
                    // NOTE(review): -7 is presumably BKException.Code.NoSuchLedgerExistsException -- confirm.
                    if (rc == -7) {
                        AcquireOp.this.processTopicLedgerRanges(list, version, BookkeeperPersistenceManager.START_SEQ_ID);
                        return;
                    }
                    if (rc != 0) {
                        BKException openFailure = BKException.create(rc);
                        BookkeeperPersistenceManager.logger.error("Could not open ledger {} to get start seq id while acquiring topic {} : {}", VarArgs.va(new Object[]{Long.valueOf(ledgerId), AcquireOp.this.topic.toStringUtf8(), openFailure}));
                        AcquireOp.this.cb.operationFailed(openCtx, new PubSubException.ServiceDownException(openFailure));
                        return;
                    }
                    final long numEntries = ledgerHandle.getLastAddConfirmed() + 1;
                    try {
                        ledgerHandle.close();
                    } catch (InterruptedException e) {
                        // Keep the interrupt visible to the thread that runs this callback.
                        Thread.currentThread().interrupt();
                        BookkeeperPersistenceManager.logger.warn("Interrupted while closing ledger " + ledgerId + " for topic " + AcquireOp.this.topic.toStringUtf8(), e);
                    } catch (BKException e) {
                        // The entry count is already known, so a close failure is only logged.
                        BookkeeperPersistenceManager.logger.warn("Could not close ledger " + ledgerId + " for topic " + AcquireOp.this.topic.toStringUtf8(), e);
                    }
                    if (numEntries > 0) {
                        final long startSeqId = (firstRange.getEndSeqIdIncluded().getLocalComponent() - numEntries) + 1;
                        AcquireOp.this.processTopicLedgerRanges(list, version, startSeqId);
                        return;
                    }
                    String str = "No entries found in a have-end-seq-id ledger " + ledgerId + " when acquiring topic " + AcquireOp.this.topic.toStringUtf8() + ".";
                    BookkeeperPersistenceManager.logger.error(str);
                    AcquireOp.this.cb.operationFailed(openCtx, new PubSubException.UnexpectedConditionException(str));
                }
            }, this.ctx);
        }

        /**
         * Walks the topic's ledger ranges in list order and builds a fresh {@code TopicInfo}.
         *
         * <p>Each range that has an end seq id is stored in {@code topicInfo.ledgerRanges} keyed by
         * that end seq id. A range without a start seq id is rebuilt with the running start value.
         * A range without an end seq id must be the last one: it is handed to
         * {@code recoverLastTopicLedgerAndOpenNewOne}. If it is not the last one, the operation fails
         * with an {@code UnexpectedConditionException}. When every range has an end seq id, a new
         * topic ledger is opened at the running start value.
         *
         * @param list    ledger ranges of the topic
         * @param version version that accompanied the persisted ranges
         * @param j       seq id the first range is assumed to start from when it does not say so
         */
        void processTopicLedgerRanges(List<PubSubProtocol.LedgerRange> list, Version version, long j) {
            BookkeeperPersistenceManager.logger.info("Process {} ledgers for topic {} starting from seq id {}.", VarArgs.va(new Object[]{Integer.valueOf(list.size()), this.topic.toStringUtf8(), Long.valueOf(j)}));
            final TopicInfo topicInfo = new TopicInfo();
            long runningStart = j;
            for (Iterator<PubSubProtocol.LedgerRange> cursor = list.iterator(); cursor.hasNext();) {
                PubSubProtocol.LedgerRange range = cursor.next();
                if (range.hasEndSeqIdIncluded()) {
                    final long endSeqId = range.getEndSeqIdIncluded().getLocalComponent();
                    if (range.hasStartSeqIdIncluded()) {
                        runningStart = range.getStartSeqIdIncluded();
                    } else {
                        // Fill in the missing start seq id from the running value.
                        range = BookkeeperPersistenceManager.buildLedgerRange(range.getLedgerId(), runningStart, range.getEndSeqIdIncluded());
                    }
                    topicInfo.ledgerRanges.put(Long.valueOf(endSeqId), new InMemoryLedgerRange(range));
                    // The next range starts right after this one unless the running value is already ahead.
                    runningStart = Math.max(runningStart, endSeqId + 1);
                    continue;
                }
                if (cursor.hasNext()) {
                    String str = "Ledger-id: " + range.getLedgerId() + " for topic: " + this.topic.toStringUtf8() + " is not the last one but still does not have an end seq-id";
                    BookkeeperPersistenceManager.logger.error(str);
                    this.cb.operationFailed(this.ctx, new PubSubException.UnexpectedConditionException(str));
                    return;
                }
                if (range.hasStartSeqIdIncluded()) {
                    runningStart = range.getStartSeqIdIncluded();
                }
                recoverLastTopicLedgerAndOpenNewOne(range.getLedgerId(), runningStart, version, topicInfo);
                return;
            }
            BookkeeperPersistenceManager.this.openNewTopicLedger(this.topic, version, topicInfo, runningStart, false, this.cb, this.ctx);
        }

        /**
         * Opens the topic's last ledger (the one without an end seq id), works out which seq ids it
         * holds, records it in {@code topicInfo}, and then opens a new topic ledger.
         *
         * <p>All outcomes are reported through this operation's callback: open and read failures as
         * {@code ServiceDownException}, an unparsable last entry as
         * {@code UnexpectedConditionException}.
         *
         * @param j         id of the ledger to open
         * @param j2        start seq id this ledger is expected to have
         * @param version   version passed through to {@code openNewTopicLedger}
         * @param topicInfo topic state being built; receives the recovered range
         */
        private void recoverLastTopicLedgerAndOpenNewOne(final long j, final long j2, final Version version, final TopicInfo topicInfo) {
            BookkeeperPersistenceManager.this.bk.asyncOpenLedger(j, BookKeeper.DigestType.CRC32, BookkeeperPersistenceManager.passwd, new SafeAsynBKCallback.OpenCallback() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.AcquireOp.3
                @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.OpenCallback
                public void safeOpenComplete(int i, LedgerHandle ledgerHandle, Object obj) {
                    // Any non-zero result code means the open failed.
                    if (i != 0) {
                        BKException create = BKException.create(i);
                        BookkeeperPersistenceManager.logger.error("While acquiring topic: " + AcquireOp.this.topic.toStringUtf8() + ", could not open unrecovered ledger: " + j, create);
                        AcquireOp.this.cb.operationFailed(obj, new PubSubException.ServiceDownException(create));
                        return;
                    }
                    // Despite its name this local is the ledger's entry count (last add confirmed + 1);
                    // it is what the "with {} entries" log message below reports.
                    final long lastAddConfirmed = ledgerHandle.getLastAddConfirmed() + BookkeeperPersistenceManager.START_SEQ_ID;
                    if (lastAddConfirmed > BookkeeperPersistenceManager.UNLIMITED_ENTRIES) {
                        // Non-empty ledger: read only its last entry (entry id = count - 1).
                        ledgerHandle.asyncReadEntries(lastAddConfirmed - BookkeeperPersistenceManager.START_SEQ_ID, lastAddConfirmed - BookkeeperPersistenceManager.START_SEQ_ID, new SafeAsynBKCallback.ReadCallback() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.AcquireOp.3.1
                            @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.ReadCallback
                            public void safeReadComplete(int i2, LedgerHandle ledgerHandle2, Enumeration<LedgerEntry> enumeration, Object obj2) {
                                if (i2 != 0 || !enumeration.hasMoreElements()) {
                                    // A successful read that returned nothing is turned into an error code.
                                    // NOTE(review): -13 is presumably BKException.Code.NoSuchEntryException -- confirm.
                                    if (i2 == 0) {
                                        i2 = -13;
                                    }
                                    BookkeeperPersistenceManager.logger.info("Received error code {}", Integer.valueOf(i2));
                                    BKException create2 = BKException.create(i2);
                                    BookkeeperPersistenceManager.logger.error("While recovering ledger: " + j + " for topic: " + AcquireOp.this.topic.toStringUtf8() + ", could not read last entry", create2);
                                    AcquireOp.this.cb.operationFailed(obj2, new PubSubException.ServiceDownException(create2));
                                    return;
                                }
                                try {
                                    PubSubProtocol.Message parseFrom = PubSubProtocol.Message.parseFrom(enumeration.nextElement().getEntry());
                                    long localComponent = parseFrom.getMsgId().getLocalComponent();
                                    // Start seq id = last message's seq id - entry count + 1.
                                    long j3 = (localComponent - lastAddConfirmed) + BookkeeperPersistenceManager.START_SEQ_ID;
                                    // A mismatch with the expected start is only warned about; the computed value is used.
                                    if (j3 != j2) {
                                        BookkeeperPersistenceManager.logger.warn("Expected start seq id of recovered ledger " + j + " to be " + j2 + " but it was " + j3 + ".");
                                    }
                                    // Record the recovered range, keyed by its last seq id, keeping the open handle.
                                    topicInfo.ledgerRanges.put(Long.valueOf(localComponent), new InMemoryLedgerRange(BookkeeperPersistenceManager.buildLedgerRange(j, j3, parseFrom.getMsgId()), ledgerHandle2));
                                    BookkeeperPersistenceManager.logger.info("Recovered unclosed ledger: {} for topic: {} with {} entries starting from seq id {}", VarArgs.va(new Object[]{Long.valueOf(j), AcquireOp.this.topic.toStringUtf8(), Long.valueOf(lastAddConfirmed), Long.valueOf(j3)}));
                                    // The new ledger starts right after the last recovered message.
                                    BookkeeperPersistenceManager.this.openNewTopicLedger(AcquireOp.this.topic, version, topicInfo, localComponent + BookkeeperPersistenceManager.START_SEQ_ID, false, AcquireOp.this.cb, obj2);
                                } catch (InvalidProtocolBufferException e) {
                                    String str = "While recovering ledger: " + j + " for topic: " + AcquireOp.this.topic.toStringUtf8() + ", could not deserialize last message";
                                    BookkeeperPersistenceManager.logger.error(str, e);
                                    AcquireOp.this.cb.operationFailed(obj2, new PubSubException.UnexpectedConditionException(str));
                                }
                            }
                        }, obj);
                        return;
                    }
                    // Empty ledger: it is not recorded in topicInfo; close it and open a new ledger
                    // that starts at the expected seq id.
                    BookkeeperPersistenceManager.logger.info("Pruning empty ledger: " + j + " for topic: " + AcquireOp.this.topic.toStringUtf8());
                    BookkeeperPersistenceManager.this.closeLedger(ledgerHandle);
                    BookkeeperPersistenceManager.this.openNewTopicLedger(AcquireOp.this.topic, version, topicInfo, j2, false, AcquireOp.this.cb, obj);
                }
            }, this.ctx);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$ChangeLedgerOp.class */
    public class ChangeLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
        /**
         * Creates a change-ledger operation for {@code topic}.
         *
         * <p>Reconstructed from the decompiler's instruction dump, which loads the enclosing
         * manager's {@code queuer}, null-checks it, and passes it together with the three arguments
         * to the superclass constructor. In source form that is a qualified superclass constructor
         * invocation.
         *
         * @param topic topic whose write ledger is to be changed
         * @param cb    callback notified when the change finishes or fails
         * @param ctx   opaque context object handed back to {@code cb}
         * @throws NullPointerException if the enclosing manager's {@code queuer} is {@code null}
         */
        public ChangeLedgerOp(ByteString topic, Callback<Void> cb, Object ctx) {
            // Bind the base operation to the enclosing manager's queuer.
            queuer.super(topic, cb, ctx);
        }

        /**
         * Changes the write ledger of the topic if the enclosing manager tracks it; otherwise logs
         * an error and fails the callback with {@code ServerNotResponsibleForTopicException}.
         */
        @Override
        public void run() {
            final TopicInfo owned = BookkeeperPersistenceManager.this.topicInfos.get(this.topic);
            if (owned == null) {
                BookkeeperPersistenceManager.logger.error("Weired! hub server doesn't own topic " + this.topic.toStringUtf8() + " when changing ledger to write.");
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
                return;
            }
            closeLastTopicLedgerAndOpenNewOne(owned);
        }

        /**
         * Closes the topic's current write ledger and, once closed, records its final range and
         * opens a new ledger.
         *
         * <p>The close callback acts at most once, guarded by an {@code AtomicBoolean}. On a
         * non-zero result code the operation fails with {@code ServiceDownException}. On success the
         * current range is rebuilt so that it ends at {@code topicInfo.lastSeqIdPushed}, stored in
         * {@code topicInfo.ledgerRanges} under that seq id, and a new ledger is opened starting at
         * the following seq id.
         *
         * @param topicInfo in-memory state of the topic whose ledger is being changed
         */
        private void closeLastTopicLedgerAndOpenNewOne(final TopicInfo topicInfo) {
            final LedgerHandle writeHandle = topicInfo.currentLedgerRange.handle;
            final long id = writeHandle.getId();
            writeHandle.asyncClose(new AsyncCallback.CloseCallback() {
                final AtomicBoolean processed = new AtomicBoolean(false);

                @Override
                public void closeComplete(int rc, LedgerHandle closedHandle, Object closeCtx) {
                    if (!this.processed.compareAndSet(false, true)) {
                        // An earlier invocation already handled the close.
                        return;
                    }
                    if (rc != 0) {
                        BKException closeFailure = BKException.create(rc);
                        BookkeeperPersistenceManager.logger.error("Could not close ledger " + id + " while changing ledger of topic " + ChangeLedgerOp.this.topic.toStringUtf8(), closeFailure);
                        ChangeLedgerOp.this.cb.operationFailed(closeCtx, new PubSubException.ServiceDownException(closeFailure));
                        return;
                    }
                    final long lastSeqId = topicInfo.lastSeqIdPushed.getLocalComponent();
                    topicInfo.currentLedgerRange.range = BookkeeperPersistenceManager.buildLedgerRange(id, topicInfo.currentLedgerRange.getStartSeqIdIncluded(), topicInfo.lastSeqIdPushed);
                    topicInfo.ledgerRanges.put(Long.valueOf(lastSeqId), topicInfo.currentLedgerRange);
                    BookkeeperPersistenceManager.logger.info("Closed written ledger " + id + " for topic " + ChangeLedgerOp.this.topic.toStringUtf8() + " to change ledger.");
                    BookkeeperPersistenceManager.this.openNewTopicLedger(ChangeLedgerOp.this.topic, topicInfo.ledgerRangesVersion, topicInfo, lastSeqId + 1, true, ChangeLedgerOp.this.cb, closeCtx);
                }
            }, this.ctx);
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$ConsumeUntilOp.class */
    /**
     * Per-topic operation that collects the ledgers whose ranges end at or before a given seq id
     * and hands them to {@code deleteLedgersAndUpdateLedgersRange}.
     */
    class ConsumeUntilOp extends TopicOpQueuer.SynchronousOp {
        /** Ledger ranges ending at or before this seq id are selected for deletion. */
        private final long seqId;

        /**
         * Reconstructed from the decompiler's instruction dump: binds the base operation to the
         * enclosing manager's {@code queuer} and stores the seq id.
         *
         * @param topic topic whose ledgers are to be trimmed
         * @param seqId highest range end seq id that is still selected
         * @throws NullPointerException if the enclosing manager's {@code queuer} is {@code null}
         */
        public ConsumeUntilOp(ByteString topic, long seqId) {
            queuer.super(topic);
            this.seqId = seqId;
        }

        /**
         * Selects the ledgers to delete and triggers their deletion. Logs an error and does nothing
         * when the enclosing manager does not track the topic; does nothing when no ledger qualifies.
         */
        @Override
        public void runInternal() {
            TopicInfo topicInfo = BookkeeperPersistenceManager.this.topicInfos.get(this.topic);
            if (topicInfo == null) {
                BookkeeperPersistenceManager.logger.error("Server is not responsible for topic!");
                return;
            }
            LinkedList<Long> ledgersToDelete = new LinkedList<Long>();
            // Keys are the end seq ids of the ranges. The loop stops at the first key beyond seqId,
            // which presumes the keys iterate in ascending order -- TODO confirm ledgerRanges is sorted.
            for (Long endSeqId : topicInfo.ledgerRanges.keySet()) {
                if (endSeqId.longValue() > this.seqId) {
                    break;
                }
                ledgersToDelete.add(Long.valueOf(topicInfo.ledgerRanges.get(endSeqId).range.getLedgerId()));
            }
            if (ledgersToDelete.isEmpty()) {
                return;
            }
            BookkeeperPersistenceManager.this.deleteLedgersAndUpdateLedgersRange(this.topic, ledgersToDelete, new HashSet<Long>());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$InMemoryLedgerRange.class */
    /**
     * Pairs a ledger range with the handle of its ledger, when one has been opened.
     */
    public static class InMemoryLedgerRange {
        /** The ledger range this object describes. */
        PubSubProtocol.LedgerRange range;
        /** Handle of the range's ledger; {@code null} when constructed without one. */
        LedgerHandle handle;

        /**
         * @param ledgerRange  the ledger range
         * @param ledgerHandle handle of the range's ledger, or {@code null}
         */
        public InMemoryLedgerRange(PubSubProtocol.LedgerRange ledgerRange, LedgerHandle ledgerHandle) {
            this.range = ledgerRange;
            this.handle = ledgerHandle;
        }

        /**
         * Creates a range without a ledger handle.
         *
         * @param ledgerRange the ledger range
         */
        public InMemoryLedgerRange(PubSubProtocol.LedgerRange ledgerRange) {
            this(ledgerRange, null);
        }

        /**
         * Returns the start seq id of the range.
         *
         * @return the range's start seq id
         * @throws AssertionError if assertions are enabled and the range has no start seq id
         */
        public long getStartSeqIdIncluded() {
            // Replaces the decompiler's hand-rolled $assertionsDisabled field and static initializer.
            assert this.range.hasStartSeqIdIncluded();
            return this.range.getStartSeqIdIncluded();
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$PersistOp.class */
    /**
     * Per-topic operation that persists one message by delegating to the enclosing manager's
     * {@code doPersistMessage}.
     */
    public class PersistOp extends TopicOpQueuer.SynchronousOp {
        /** The persist request this operation carries out. */
        PersistRequest request;

        /**
         * Reconstructed from the decompiler's instruction dump: binds the base operation to the
         * enclosing manager's {@code queuer} under the request's topic and stores the request.
         *
         * @param request the persist request; its {@code topic} identifies the operation's topic
         * @throws NullPointerException if {@code request} or the enclosing manager's {@code queuer}
         *                              is {@code null}
         */
        public PersistOp(PersistRequest request) {
            queuer.super(request.topic);
            this.request = request;
        }

        /** Persists the request's message via the enclosing manager. */
        @Override
        public void runInternal() {
            BookkeeperPersistenceManager.this.doPersistMessage(this.request);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$RangeScanOp.class */
    public class RangeScanOp extends TopicOpQueuer.SynchronousOp {
        RangeScanRequest request;
        int numMessagesRead;
        long totalSizeRead;
        TopicInfo topicInfo;
        long startSeqIdToScan;

        /**
         * Creates a scan operation with zeroed counters and no explicit resume position, so the
         * scan starts from the request's own start seq id.
         *
         * @param unusedOuter not read by this constructor -- NOTE(review): looks like an
         *                    enclosing-instance parameter surfaced by the decompiler; confirm
         * @param scanRequest the range scan to perform
         */
        public RangeScanOp(BookkeeperPersistenceManager unusedOuter, RangeScanRequest scanRequest) {
            // -1 = no resume position; 0 messages and 0 bytes read so far.
            this(scanRequest, -1L, 0, 0L);
        }

        /**
         * Creates a scan operation that may resume a scan already in progress.
         *
         * <p>Reconstructed from the decompiler's instruction dump: binds the base operation to the
         * enclosing manager's {@code queuer} under the request's topic, then stores the request, the
         * resume position and the two running counters.
         *
         * @param request         the range scan to perform; its {@code topic} identifies the
         *                        operation's topic
         * @param startSeqId      seq id to start reading from; a negative value means "use the
         *                        request's start seq id"
         * @param numMessagesRead number of messages already read by this scan
         * @param totalSizeRead   total size already read by this scan
         * @throws NullPointerException if {@code request} or the enclosing manager's {@code queuer}
         *                              is {@code null}
         */
        public RangeScanOp(RangeScanRequest request, long startSeqId, int numMessagesRead, long totalSizeRead) {
            queuer.super(request.topic);
            this.request = request;
            this.startSeqIdToScan = startSeqId;
            this.numMessagesRead = numMessagesRead;
            this.totalSizeRead = totalSizeRead;
        }

        /**
         * Starts the scan. Fails the scan callback with
         * {@code ServerNotResponsibleForTopicException} when the enclosing manager does not track
         * the topic. Otherwise reads from {@code startSeqIdToScan}, or from the request's start seq
         * id when {@code startSeqIdToScan} is negative.
         */
        @Override
        protected void runInternal() {
            this.topicInfo = BookkeeperPersistenceManager.this.topicInfos.get(this.topic);
            if (this.topicInfo != null) {
                long firstSeqId = this.startSeqIdToScan;
                if (firstSeqId < 0L) {
                    // No resume position recorded: fall back to the request's start.
                    firstSeqId = this.request.startSeqId;
                }
                startReadingFrom(firstSeqId);
                return;
            }
            this.request.callback.scanFailed(this.request.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
        }

        protected void read(final InMemoryLedgerRange inMemoryLedgerRange, final long j, final long j2) {
            if (inMemoryLedgerRange.getStartSeqIdIncluded() <= j) {
                if (inMemoryLedgerRange.handle == null) {
                    BookkeeperPersistenceManager.this.bk.asyncOpenLedger(inMemoryLedgerRange.range.getLedgerId(), BookKeeper.DigestType.CRC32, BookkeeperPersistenceManager.passwd, new SafeAsynBKCallback.OpenCallback() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.RangeScanOp.1
                        @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.OpenCallback
                        public void safeOpenComplete(int i, LedgerHandle ledgerHandle, Object obj) {
                            if (i == 0) {
                                inMemoryLedgerRange.handle = ledgerHandle;
                                RangeScanOp.this.read(inMemoryLedgerRange, j, j2);
                            } else {
                                BKException create = BKException.create(i);
                                BookkeeperPersistenceManager.logger.error("Could not open ledger: " + inMemoryLedgerRange.range.getLedgerId() + " for topic: " + RangeScanOp.this.topic);
                                RangeScanOp.this.request.callback.scanFailed(obj, new PubSubException.ServiceDownException(create));
                            }
                        }
                    }, this.request.ctx);
                    return;
                }
                long min = Math.min(((j + this.request.messageLimit) - this.numMessagesRead) - BookkeeperPersistenceManager.START_SEQ_ID, j2);
                if (BookkeeperPersistenceManager.logger.isDebugEnabled()) {
                    BookkeeperPersistenceManager.logger.debug("Issuing a bk read for ledger: " + inMemoryLedgerRange.handle.getId() + " from entry-id: " + (j - inMemoryLedgerRange.getStartSeqIdIncluded()) + " to entry-id: " + (min - inMemoryLedgerRange.getStartSeqIdIncluded()));
                }
                inMemoryLedgerRange.handle.asyncReadEntries(j - inMemoryLedgerRange.getStartSeqIdIncluded(), min - inMemoryLedgerRange.getStartSeqIdIncluded(), new SafeAsynBKCallback.ReadCallback() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.RangeScanOp.2
                    long expectedEntryId;
                    static final /* synthetic */ boolean $assertionsDisabled;

                    {
                        this.expectedEntryId = j - inMemoryLedgerRange.getStartSeqIdIncluded();
                    }

                    @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.ReadCallback
                    public void safeReadComplete(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
                        if (i != 0 || !enumeration.hasMoreElements()) {
                            if (i == 0) {
                                i = -13;
                            }
                            BKException create = BKException.create(i);
                            BookkeeperPersistenceManager.logger.error("Error while reading from ledger: " + inMemoryLedgerRange.range.getLedgerId() + " for topic: " + RangeScanOp.this.topic.toStringUtf8(), create);
                            RangeScanOp.this.request.callback.scanFailed(RangeScanOp.this.request.ctx, new PubSubException.ServiceDownException(create));
                            return;
                        }
                        LedgerEntry ledgerEntry = null;
                        while (enumeration.hasMoreElements()) {
                            ledgerEntry = enumeration.nextElement();
                            try {
                                PubSubProtocol.Message parseFrom = PubSubProtocol.Message.parseFrom(ledgerEntry.getEntryInputStream());
                                BookkeeperPersistenceManager.logger.debug("Read response from ledger: {} entry-id: {}", Long.valueOf(ledgerHandle.getId()), Long.valueOf(ledgerEntry.getEntryId()));
                                if (!$assertionsDisabled && this.expectedEntryId != ledgerEntry.getEntryId()) {
                                    throw new AssertionError("expectedEntryId (" + this.expectedEntryId + ") != entry.getEntryId() (" + ledgerEntry.getEntryId() + ")");
                                }
                                if (!$assertionsDisabled && parseFrom.getMsgId().getLocalComponent() - inMemoryLedgerRange.getStartSeqIdIncluded() != this.expectedEntryId) {
                                    throw new AssertionError();
                                }
                                this.expectedEntryId += BookkeeperPersistenceManager.START_SEQ_ID;
                                RangeScanOp.this.request.callback.messageScanned(obj, parseFrom);
                                RangeScanOp.this.numMessagesRead++;
                                RangeScanOp.this.totalSizeRead += parseFrom.getBody().size();
                                if (RangeScanOp.this.numMessagesRead >= RangeScanOp.this.request.messageLimit) {
                                    RangeScanOp.this.request.callback.scanFinished(obj, ScanCallback.ReasonForFinish.NUM_MESSAGES_LIMIT_EXCEEDED);
                                    return;
                                } else if (RangeScanOp.this.totalSizeRead >= RangeScanOp.this.request.sizeLimit) {
                                    RangeScanOp.this.request.callback.scanFinished(obj, ScanCallback.ReasonForFinish.SIZE_LIMIT_EXCEEDED);
                                    return;
                                }
                            } catch (IOException e) {
                                String str = "Unreadable message found in ledger: " + inMemoryLedgerRange.range.getLedgerId() + " for topic: " + RangeScanOp.this.topic.toStringUtf8();
                                BookkeeperPersistenceManager.logger.error(str, e);
                                RangeScanOp.this.request.callback.scanFailed(obj, new PubSubException.UnexpectedConditionException(str));
                                return;
                            }
                        }
                        BookkeeperPersistenceManager.this.scanMessages(RangeScanOp.this.request, inMemoryLedgerRange.getStartSeqIdIncluded() + ledgerEntry.getEntryId() + BookkeeperPersistenceManager.START_SEQ_ID, RangeScanOp.this.numMessagesRead, RangeScanOp.this.totalSizeRead);
                    }

                    static {
                        $assertionsDisabled = !BookkeeperPersistenceManager.class.desiredAssertionStatus();
                    }
                }, this.request.ctx);
                return;
            }
            Logger logger = BookkeeperPersistenceManager.logger;
            Object[] objArr = new Object[3];
            objArr[0] = Long.valueOf(j);
            objArr[1] = Long.valueOf(inMemoryLedgerRange.getStartSeqIdIncluded());
            objArr[2] = inMemoryLedgerRange.range.hasEndSeqIdIncluded() ? Long.valueOf(inMemoryLedgerRange.range.getEndSeqIdIncluded().getLocalComponent()) : "";
            logger.error("Invalid RangeScan read, startSeqId {} doesn't fall in ledger range [{} ~ {}]", VarArgs.va(objArr));
            this.request.callback.scanFailed(this.request.ctx, new PubSubException.UnexpectedConditionException("Scan request is out of range"));
            BookkeeperPersistenceManager.this.lostTopic(this.topic);
        }

        /**
         * Picks the ledger range that should serve a read starting at sequence id {@code j}
         * and issues the read against it.
         *
         * <p>The lookup first consults {@code topicInfo.ledgerRanges} for the entry with the
         * smallest key that is greater than or equal to {@code j}. When no such entry exists,
         * the current ledger is used, but only up to the last entry that has been acknowledged
         * in it; if {@code j} lies beyond that point the scan is finished with
         * {@code NO_MORE_MESSAGES}.
         *
         * @param j sequence id at which reading should start
         */
        protected void startReadingFrom(long j) {
            Map.Entry<Long, InMemoryLedgerRange> matchingRange = this.topicInfo.ledgerRanges.ceilingEntry(Long.valueOf(j));
            if (matchingRange == null) {
                // Nothing in the recorded ranges covers j: fall back to the current ledger.
                long lastAckedSeqId = this.topicInfo.currentLedgerRange.getStartSeqIdIncluded() + this.topicInfo.lastEntryIdAckedInCurrentLedger;
                if (lastAckedSeqId >= j) {
                    read(this.topicInfo.currentLedgerRange, j, lastAckedSeqId);
                } else {
                    this.request.callback.scanFinished(this.request.ctx, ScanCallback.ReasonForFinish.NO_MORE_MESSAGES);
                }
                return;
            }
            // Read from the matching range up to its recorded (inclusive) end sequence id.
            read(matchingRange.getValue(), j, matchingRange.getValue().range.getEndSeqIdIncluded().getLocalComponent());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$ReleaseOp.class */
    /**
     * Queued operation that discards this manager's in-memory state for a topic and hands
     * every ledger handle it was tracking to {@link BookkeeperPersistenceManager#closeLedger}.
     */
    public class ReleaseOp extends TopicOpQueuer.SynchronousOp {
        /**
         * Creates a release operation bound to the enclosing manager's op queuer.
         *
         * @param topic topic whose in-memory state should be dropped
         */
        public ReleaseOp(ByteString topic) {
            // Reconstructed from the bytecode dump: the superclass constructor is invoked with
            // the enclosing manager's queuer as its outer instance and the topic as argument.
            queuer.super(topic);
        }

        /**
         * Removes the topic's {@link TopicInfo} and closes the handles of all its ledger
         * ranges, including the current one. Does nothing when the topic is not tracked.
         */
        @Override // org.apache.hedwig.server.common.TopicOpQueuer.SynchronousOp
        public void runInternal() {
            TopicInfo released = BookkeeperPersistenceManager.this.topicInfos.remove(this.topic);
            if (released == null) {
                return;
            }
            for (InMemoryLedgerRange ledgerRange : released.ledgerRanges.values()) {
                // Ranges may be tracked without an open handle.
                if (ledgerRange.handle != null) {
                    BookkeeperPersistenceManager.this.closeLedger(ledgerRange.handle);
                }
            }
            InMemoryLedgerRange current = released.currentLedgerRange;
            if (current != null && current.handle != null) {
                BookkeeperPersistenceManager.this.closeLedger(current.handle);
            }
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$SetMessageBoundOp.class */
    /**
     * Queued operation that stores a new message bound on a topic's {@link TopicInfo}.
     */
    class SetMessageBoundOp extends TopicOpQueuer.SynchronousOp {
        /** Bound to store; {@code 0} corresponds to {@link TopicInfo#UNLIMITED}. */
        final int bound;

        /**
         * Creates the operation bound to the enclosing manager's op queuer.
         *
         * @param topic topic whose bound is updated
         * @param bound new value for {@code TopicInfo.messageBound}
         */
        public SetMessageBoundOp(ByteString topic, int bound) {
            // Reconstructed from the bytecode dump: super(...) runs with the enclosing
            // manager's queuer as outer instance, then the bound field is assigned.
            queuer.super(topic);
            this.bound = bound;
        }

        /**
         * Writes the bound into the topic's state; a topic that is not tracked is ignored.
         */
        @Override // org.apache.hedwig.server.common.TopicOpQueuer.SynchronousOp
        public void runInternal() {
            TopicInfo topicInfo = BookkeeperPersistenceManager.this.topicInfos.get(this.topic);
            if (topicInfo == null) {
                return;
            }
            topicInfo.messageBound = this.bound;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$TopicInfo.class */
    /**
     * Mutable per-topic bookkeeping held by the persistence manager while it tracks a topic.
     */
    public static class TopicInfo {
        /** Value of {@link #messageBound} meaning that no bound is applied. */
        static final int UNLIMITED = 0;

        /** Sequence id assigned to the most recent message handed to the current ledger. */
        PubSubProtocol.MessageSeqId lastSeqIdPushed;

        /** Ledger range (and handle) that new entries are appended to. */
        InMemoryLedgerRange currentLedgerRange;

        /** Entry id of the last acknowledged add in the current ledger; -1 when none yet. */
        long lastEntryIdAckedInCurrentLedger = -1L;

        /** Previously recorded ledger ranges, looked up by sequence id when scanning. */
        TreeMap<Long, InMemoryLedgerRange> ledgerRanges = new TreeMap<>();

        /** Version handed to / returned by the topic persistence metadata writes. */
        Version ledgerRangesVersion = Version.NEW;

        /** Set once the topic is being released after a failure; persists are rejected then. */
        AtomicBoolean doRelease = new AtomicBoolean(false);

        /** Set while a ledger change is pending; persist requests are deferred meanwhile. */
        AtomicBoolean doChangeLedger = new AtomicBoolean(false);

        /** Sequence id of the last entry expected before a pending ledger change; -1 if none. */
        long lastSeqIdBeforeLedgerChange = -1L;

        /** Persist requests parked during a ledger change; stays null until first needed. */
        LinkedList<PersistRequest> deferredRequests = null;

        /** Message bound for the topic; {@link #UNLIMITED} disables bounding. */
        int messageBound = UNLIMITED;

        TopicInfo() {
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/BookkeeperPersistenceManager$UpdateLedgerOp.class */
    public class UpdateLedgerOp extends TopicOpQueuer.AsynchronousOp<Void> {
        private Set<Long> ledgersDeleted;

        /* JADX WARN: Illegal instructions before constructor call */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public UpdateLedgerOp(com.google.protobuf.ByteString r8, org.apache.hedwig.util.Callback<java.lang.Void> r9, java.lang.Object r10, java.util.Set<java.lang.Long> r11) {
            /*
                r6 = this;
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.this = r1
                r0 = r6
                r1 = r7
                org.apache.hedwig.server.common.TopicOpQueuer r1 = r1.queuer
                r2 = r1
                java.lang.Class r2 = r2.getClass()
                r2 = r8
                r3 = r9
                r4 = r10
                r0.<init>(r2, r3, r4)
                r0 = r6
                r1 = r11
                r0.ledgersDeleted = r1
                return
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.UpdateLedgerOp.<init>(org.apache.hedwig.server.persistence.BookkeeperPersistenceManager, com.google.protobuf.ByteString, org.apache.hedwig.util.Callback, java.lang.Object, java.util.Set):void");
        }

        @Override // java.lang.Runnable
        public void run() {
            final TopicInfo topicInfo = BookkeeperPersistenceManager.this.topicInfos.get(this.topic);
            if (topicInfo == null) {
                BookkeeperPersistenceManager.logger.error("Server is not responsible for topic!");
                this.cb.operationFailed(this.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
                return;
            }
            PubSubProtocol.LedgerRanges.Builder newBuilder = PubSubProtocol.LedgerRanges.newBuilder();
            final HashSet hashSet = new HashSet();
            boolean z = false;
            for (Map.Entry<Long, InMemoryLedgerRange> entry : topicInfo.ledgerRanges.entrySet()) {
                PubSubProtocol.LedgerRange ledgerRange = entry.getValue().range;
                long ledgerId = ledgerRange.getLedgerId();
                if (z || !this.ledgersDeleted.contains(Long.valueOf(ledgerId))) {
                    z = true;
                    newBuilder.addRanges(ledgerRange);
                } else {
                    hashSet.add(entry.getKey());
                    if (!ledgerRange.hasEndSeqIdIncluded()) {
                        String str = "Should not remove unclosed ledger " + ledgerId + " for topic " + this.topic.toStringUtf8();
                        BookkeeperPersistenceManager.logger.error(str);
                        this.cb.operationFailed(this.ctx, new PubSubException.UnexpectedConditionException(str));
                        return;
                    }
                }
            }
            newBuilder.addRanges(topicInfo.currentLedgerRange.range);
            if (hashSet.isEmpty()) {
                this.cb.operationFinished(this.ctx, (Object) null);
            } else {
                BookkeeperPersistenceManager.this.tpManager.writeTopicPersistenceInfo(this.topic, newBuilder.build(), topicInfo.ledgerRangesVersion, new Callback<Version>() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.UpdateLedgerOp.1
                    public void operationFinished(Object obj, Version version) {
                        Iterator it = hashSet.iterator();
                        while (it.hasNext()) {
                            topicInfo.ledgerRanges.remove((Long) it.next());
                        }
                        topicInfo.ledgerRangesVersion = version;
                        UpdateLedgerOp.this.cb.operationFinished(obj, (Object) null);
                    }

                    public void operationFailed(Object obj, PubSubException pubSubException) {
                        UpdateLedgerOp.this.cb.operationFailed(obj, pubSubException);
                    }
                }, this.ctx);
            }
        }
    }

    /**
     * Wires the manager to its collaborators and registers it for topic ownership changes.
     *
     * @param bookKeeper               BookKeeper client used for ledger operations
     * @param metadataManagerFactory   source of the topic persistence metadata manager
     * @param topicManager             topic manager this instance registers itself with
     * @param serverConfiguration      server configuration (also supplies the per-ledger entry limit)
     * @param scheduledExecutorService executor backing the per-topic operation queuer
     */
    public BookkeeperPersistenceManager(BookKeeper bookKeeper, MetadataManagerFactory metadataManagerFactory, TopicManager topicManager, ServerConfiguration serverConfiguration, ScheduledExecutorService scheduledExecutorService) {
        // Collaborators.
        this.bk = bookKeeper;
        this.tpManager = metadataManagerFactory.newTopicPersistenceManager();

        // Configuration and topic management.
        this.cfg = serverConfiguration;
        this.tm = topicManager;
        this.maxEntriesPerLedger = serverConfiguration.getMaxEntriesPerLedger();

        // Per-topic operation queue; must exist before ownership notifications can arrive.
        this.queuer = new TopicOpQueuer(scheduledExecutorService);
        topicManager.addTopicOwnershipChangeListener(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds a closed ledger range message.
     *
     * @param j            ledger id
     * @param j2           first sequence id stored in the ledger (inclusive)
     * @param messageSeqId last sequence id stored in the ledger (inclusive)
     * @return the populated {@code LedgerRange}
     */
    public static PubSubProtocol.LedgerRange buildLedgerRange(long j, long j2, PubSubProtocol.MessageSeqId messageSeqId) {
        PubSubProtocol.LedgerRange.Builder rangeBuilder = PubSubProtocol.LedgerRange.newBuilder();
        rangeBuilder.setLedgerId(j);
        rangeBuilder.setStartSeqIdIncluded(j2);
        rangeBuilder.setEndSeqIdIncluded(messageSeqId);
        return rangeBuilder.build();
    }

    /**
     * Queues a range scan for the request's topic.
     *
     * @param rangeScanRequest scan to perform
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan
    public void scanMessages(RangeScanRequest rangeScanRequest) {
        ByteString scanTopic = rangeScanRequest.topic;
        RangeScanOp scanOp = new RangeScanOp(this, rangeScanRequest);
        this.queuer.pushAndMaybeRun(scanTopic, scanOp);
    }

    /**
     * Queues the continuation of a range scan that has already made some progress.
     *
     * @param rangeScanRequest scan being continued
     * @param j                sequence id to resume reading from
     * @param i                number of messages read so far
     * @param j2               total size read so far
     */
    protected void scanMessages(RangeScanRequest rangeScanRequest, long j, int i, long j2) {
        ByteString scanTopic = rangeScanRequest.topic;
        RangeScanOp resumedScan = new RangeScanOp(this, rangeScanRequest, j, i, j2);
        this.queuer.pushAndMaybeRun(scanTopic, resumedScan);
    }

    /**
     * No-op in this implementation: the delivery notification is ignored.
     *
     * @param byteString topic the notification refers to (unused)
     * @param l          sequence id delivered up to (unused)
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void deliveredUntil(ByteString byteString, Long l) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Deletes the given ledgers one at a time and afterwards queues an update of the topic's
     * ledger ranges that reflects the ledgers actually deleted.
     *
     * <p>Each successful deletion (or a "no such ledger" result, which is treated the same)
     * adds the ledger id to {@code set} and continues with the next id. Any other failure
     * stops the deletion and queues the range update with what has been deleted so far.
     *
     * @param byteString topic owning the ledgers
     * @param linkedList ids of ledgers still to delete; consumed by this method
     * @param set        ids of ledgers already deleted; extended by this method
     */
    public void deleteLedgersAndUpdateLedgersRange(final ByteString byteString, final LinkedList<Long> linkedList, final Set<Long> set) {
        while (!linkedList.isEmpty()) {
            final Long ledgerId = linkedList.poll();
            if (ledgerId == null) {
                // Skip null placeholders instead of recursing for each of them.
                continue;
            }
            this.bk.asyncDeleteLedger(ledgerId.longValue(), new AsyncCallback.DeleteCallback() {
                @Override
                public void deleteComplete(int rc, Object ctx) {
                    if (rc == BKException.Code.OK || rc == BKException.Code.NoSuchLedgerExistsException) {
                        set.add(ledgerId);
                        BookkeeperPersistenceManager.this.deleteLedgersAndUpdateLedgersRange(byteString, linkedList, set);
                        return;
                    }
                    BookkeeperPersistenceManager.logger.warn("Exception while deleting consumed ledger {}, stop deleting other ledgers {} and update ledger ranges with deleted ledgers {} : {}", VarArgs.va(new Object[]{ledgerId, linkedList, set, BKException.create(rc)}));
                    BookkeeperPersistenceManager.this.queueLedgerRangesUpdate(byteString, set);
                }
            }, null);
            // The remaining ledgers are handled from the delete callback.
            return;
        }
        queueLedgerRangesUpdate(byteString, set);
    }

    /** Queues an {@link UpdateLedgerOp} for the topic and logs if the update fails. */
    private void queueLedgerRangesUpdate(final ByteString topic, final Set<Long> deletedLedgers) {
        this.queuer.pushAndMaybeRun(topic, new UpdateLedgerOp(topic, new Callback<Void>() {
            @Override
            public void operationFinished(Object ctx, Void result) {
                // Nothing to do on success.
            }

            @Override
            public void operationFailed(Object ctx, PubSubException pubSubException) {
                BookkeeperPersistenceManager.logger.error("Failed to update ledger znode for topic {} deleting ledgers {} : {}", VarArgs.va(new Object[]{topic.toStringUtf8(), deletedLedgers, pubSubException.getMessage()}));
            }
        }, null, deletedLedgers));
    }

    /**
     * Queues a consume-until operation for the topic. The sequence id used is the larger of
     * the caller's value and {@link #getMinSeqIdForTopic(ByteString)}.
     *
     * @param byteString topic that was consumed
     * @param l          sequence id consumed up to
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumedUntil(ByteString byteString, Long l) {
        long requestedSeqId = l.longValue();
        long boundSeqId = getMinSeqIdForTopic(byteString);
        long effectiveSeqId = Math.max(requestedSeqId, boundSeqId);
        ConsumeUntilOp consumeOp = new ConsumeUntilOp(this, byteString, effectiveSeqId);
        this.queuer.pushAndMaybeRun(byteString, consumeOp);
    }

    /**
     * Queues a consume-until operation at the topic's bound-derived minimum sequence id.
     * Does nothing for topics that are not tracked or have no message bound.
     *
     * @param byteString topic to trim to its bound
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumeToBound(ByteString byteString) {
        TopicInfo info = this.topicInfos.get(byteString);
        boolean bounded = info != null && info.messageBound != TopicInfo.UNLIMITED;
        if (bounded) {
            long boundSeqId = getMinSeqIdForTopic(byteString);
            this.queuer.pushAndMaybeRun(byteString, new ConsumeUntilOp(this, byteString, boundSeqId));
        }
    }

    /**
     * Computes the lowest sequence id retained under the topic's message bound.
     *
     * @param byteString topic to inspect
     * @return {@code lastSeqIdPushed - messageBound + START_SEQ_ID}, or {@link Long#MIN_VALUE}
     *         when the topic is not tracked or has no bound
     */
    public long getMinSeqIdForTopic(ByteString byteString) {
        TopicInfo info = this.topicInfos.get(byteString);
        if (info != null && info.messageBound != TopicInfo.UNLIMITED) {
            long lastPushed = info.lastSeqIdPushed.getLocalComponent();
            return (lastPushed - info.messageBound) + START_SEQ_ID;
        }
        return Long.MIN_VALUE;
    }

    /**
     * Returns the sequence id most recently assigned for the topic.
     *
     * @param byteString topic to look up
     * @return the topic's {@code lastSeqIdPushed}
     * @throws PubSubException.ServerNotResponsibleForTopicException if the topic is not tracked
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public PubSubProtocol.MessageSeqId getCurrentSeqIdForTopic(ByteString byteString) throws PubSubException.ServerNotResponsibleForTopicException {
        TopicInfo info = this.topicInfos.get(byteString);
        if (info != null) {
            return info.lastSeqIdPushed;
        }
        throw new PubSubException.ServerNotResponsibleForTopicException("");
    }

    /**
     * Advances a sequence id by a number of messages, never going below the topic's
     * bound-derived minimum sequence id.
     *
     * @param byteString topic the sequence id belongs to
     * @param j          starting sequence id
     * @param i          number of messages to skip
     * @return the larger of {@code j + i} and {@link #getMinSeqIdForTopic(ByteString)}
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public long getSeqIdAfterSkipping(ByteString byteString, long j, int i) {
        long skippedTo = j + i;
        long boundSeqId = getMinSeqIdForTopic(byteString);
        return Math.max(skippedTo, boundSeqId);
    }

    /**
     * Reacts to a persistence failure by releasing ownership of the topic (at most once)
     * and failing any persist requests that were parked for a pending ledger change.
     *
     * @param byteString topic on which the failure occurred
     * @param exc        failure that triggered the release
     * @param obj        context passed to the failed deferred callbacks
     */
    protected void releaseTopicIfRequested(final ByteString byteString, Exception exc, Object obj) {
        TopicInfo info = this.topicInfos.get(byteString);
        if (info == null) {
            logger.warn("No topic found when trying to release ownership of topic " + byteString.toStringUtf8() + " on failure.");
            return;
        }

        // Only the first caller to flip the flag actually asks the topic manager to release.
        boolean firstToRelease = info.doRelease.compareAndSet(false, true);
        if (firstToRelease) {
            logger.info("Release topic " + byteString.toStringUtf8() + " when bookkeeper persistence mananger encounters failure :", exc);
            Callback<Void> releaseOutcomeLogger = new Callback<Void>() {
                public void operationFailed(Object releaseCtx, PubSubException releaseFailure) {
                    BookkeeperPersistenceManager.logger.error("Exception found on releasing topic " + byteString.toStringUtf8() + " when encountering exception from bookkeeper:", releaseFailure);
                }

                public void operationFinished(Object releaseCtx, Void result) {
                    BookkeeperPersistenceManager.logger.info("successfully releasing topic {} when encountering exception from bookkeeper", byteString.toStringUtf8());
                }
            };
            this.tm.releaseTopic(byteString, releaseOutcomeLogger, null);
        }

        if (!info.doChangeLedger.get()) {
            return;
        }
        // A ledger change was pending: fail every request that was waiting for it.
        // NOTE(review): every deferred callback receives this call's ctx rather than the
        // deferred request's own ctx - confirm that this is intended.
        for (PersistRequest deferred : info.deferredRequests) {
            deferred.getCallback().operationFailed(obj, new PubSubException.ServiceDownException(exc));
        }
        info.deferredRequests.clear();
        info.lastSeqIdBeforeLedgerChange = -1L;
    }

    /**
     * Assigns the next sequence id to the request's message and appends it to the topic's
     * current ledger, completing the request's callback from the asynchronous add callback.
     *
     * <p>The request is failed immediately when the topic is not tracked or is being released,
     * and is parked in {@code deferredRequests} while a ledger change is pending. When the
     * add that reaches the per-ledger entry limit is acknowledged, a ledger change is started
     * and the parked requests are replayed once it finishes.
     *
     * @param persistRequest request carrying the topic, message, callback and context
     */
    protected void doPersistMessage(final PersistRequest persistRequest) {
        final ByteString byteString = persistRequest.topic;
        final TopicInfo topicInfo = this.topicInfos.get(byteString);
        if (topicInfo == null) {
            persistRequest.getCallback().operationFailed(persistRequest.ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
            return;
        }
        if (topicInfo.doRelease.get()) {
            persistRequest.getCallback().operationFailed(persistRequest.ctx, new PubSubException.ServiceDownException("The ownership of the topic is releasing due to unrecoverable issue."));
            return;
        }
        if (topicInfo.doChangeLedger.get()) {
            logger.info("Topic {} is changing ledger, so queue persist request for message.", byteString.toStringUtf8());
            topicInfo.deferredRequests.add(persistRequest);
            return;
        }
        // Local sequence id this message will receive: one step past the last one pushed.
        final long localComponent = topicInfo.lastSeqIdPushed.getLocalComponent() + START_SEQ_ID;
        PubSubProtocol.MessageSeqId.Builder newBuilder = PubSubProtocol.MessageSeqId.newBuilder();
        if (persistRequest.message.hasMsgId()) {
            // The message already carries an id: combine it with the last pushed id.
            MessageIdUtils.takeRegionMaximum(newBuilder, topicInfo.lastSeqIdPushed, persistRequest.message.getMsgId());
        } else {
            newBuilder.addAllRemoteComponents(topicInfo.lastSeqIdPushed.getRemoteComponentsList());
        }
        newBuilder.setLocalComponent(localComponent);
        // Despite its decompiled name this is compared against the per-ledger entry limit;
        // presumably the number of entries in the current ledger including this one - TODO confirm.
        long startSeqIdIncluded = (localComponent - topicInfo.currentLedgerRange.getStartSeqIdIncluded()) + START_SEQ_ID;
        if (UNLIMITED_ENTRIES != this.maxEntriesPerLedger && startSeqIdIncluded >= this.maxEntriesPerLedger && topicInfo.doChangeLedger.compareAndSet(false, true)) {
            // Limit reached: flag the ledger change and remember which entry is the last one
            // allowed into the current ledger.
            if (null == topicInfo.deferredRequests) {
                topicInfo.deferredRequests = new LinkedList<>();
            }
            topicInfo.lastSeqIdBeforeLedgerChange = localComponent;
        }
        // The sequence id is advanced before the add is issued, not when it is acknowledged.
        topicInfo.lastSeqIdPushed = newBuilder.build();
        PubSubProtocol.Message build = PubSubProtocol.Message.newBuilder(persistRequest.message).setMsgId(topicInfo.lastSeqIdPushed).build();
        final PubSubProtocol.MessageSeqId msgId = build.getMsgId();
        topicInfo.currentLedgerRange.handle.asyncAddEntry(build.toByteArray(), new SafeAsynBKCallback.AddCallback() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.4
            // Ensures the completion logic below runs at most once per add.
            AtomicBoolean processed = new AtomicBoolean(false);

            @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.AddCallback
            public void safeAddComplete(int i, LedgerHandle ledgerHandle, long j, Object obj) {
                if (this.processed.compareAndSet(false, true)) {
                    if (i != 0) {
                        // Add failed: fail this request and trigger release of the topic.
                        Exception create = BKException.create(i);
                        BookkeeperPersistenceManager.logger.error("Error while persisting entry to ledger: " + ledgerHandle.getId() + " for topic: " + byteString.toStringUtf8(), create);
                        persistRequest.getCallback().operationFailed(obj, new PubSubException.ServiceDownException(create));
                        BookkeeperPersistenceManager.this.releaseTopicIfRequested(persistRequest.topic, create, obj);
                        return;
                    }
                    // The entry id assigned by BookKeeper must map back to the expected sequence id.
                    if (j + topicInfo.currentLedgerRange.getStartSeqIdIncluded() != localComponent) {
                        String str = "Expected BK to assign entry-id: " + (localComponent - topicInfo.currentLedgerRange.getStartSeqIdIncluded()) + " but it instead assigned entry-id: " + j + " topic: " + byteString.toStringUtf8() + "ledger: " + ledgerHandle.getId();
                        BookkeeperPersistenceManager.logger.error(str);
                        throw new UnexpectedError(str);
                    }
                    topicInfo.lastEntryIdAckedInCurrentLedger = j;
                    persistRequest.getCallback().operationFinished(obj, msgId);
                    // Start the ledger change once the last entry destined for this ledger is acked.
                    if (topicInfo.doChangeLedger.get() && j + topicInfo.currentLedgerRange.getStartSeqIdIncluded() == topicInfo.lastSeqIdBeforeLedgerChange) {
                        BookkeeperPersistenceManager.this.changeLedger(byteString, new Callback<Void>() { // from class: org.apache.hedwig.server.persistence.BookkeeperPersistenceManager.4.1
                            public void operationFailed(Object obj2, PubSubException pubSubException) {
                                BookkeeperPersistenceManager.logger.error("Failed to change ledger for topic " + byteString.toStringUtf8(), pubSubException);
                                BookkeeperPersistenceManager.this.releaseTopicIfRequested(persistRequest.topic, pubSubException, obj2);
                            }

                            public void operationFinished(Object obj2, Void r7) {
                                topicInfo.doChangeLedger.set(false);
                                topicInfo.lastSeqIdBeforeLedgerChange = -1L;
                                // Replay parked requests, at most maxEntriesPerLedger of them in this pass.
                                int i2 = 0;
                                while (!topicInfo.deferredRequests.isEmpty() && i2 < BookkeeperPersistenceManager.this.maxEntriesPerLedger) {
                                    BookkeeperPersistenceManager.this.doPersistMessage(topicInfo.deferredRequests.removeFirst());
                                    i2++;
                                }
                                BookkeeperPersistenceManager.logger.debug("Finished persisting {} queued requests, but there are still {} requests in queue.", Integer.valueOf(i2), Integer.valueOf(topicInfo.deferredRequests.size()));
                            }
                        }, obj);
                    }
                }
            }
        }, persistRequest.ctx);
    }

    /**
     * Queues a persist operation for the request's topic.
     *
     * @param persistRequest message to persist together with its callback and context
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void persistMessage(PersistRequest persistRequest) {
        ByteString persistTopic = persistRequest.topic;
        PersistOp persistOp = new PersistOp(this, persistRequest);
        this.queuer.pushAndMaybeRun(persistTopic, persistOp);
    }

    /**
     * Single-message scans are not supported by this persistence manager.
     *
     * @param scanRequest ignored
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void scanSingleMessage(ScanRequest scanRequest) {
        // UnsupportedOperationException is a RuntimeException, so existing handlers still apply.
        throw new UnsupportedOperationException("Not implemented");
    }

    /**
     * Creates a new ledger for the topic, makes it the topic's current ledger range and
     * persists the updated list of ledger ranges. The topic's {@link TopicInfo} is published
     * in {@code topicInfos} only after the metadata write has succeeded.
     *
     * @param byteString topic the ledger is opened for
     * @param version    expected version of the persisted ledger ranges
     * @param topicInfo  topic state to update
     * @param j          first sequence id (inclusive) that the new ledger will hold
     * @param z          when {@code false}, {@code lastSeqIdPushed} is (re)initialized from
     *                   the existing ledger ranges or from {@code j}; when {@code true} it is
     *                   left untouched
     * @param callback   notified when the ledger is ready or the operation failed
     * @param obj        context handed to the BookKeeper create call
     */
    void openNewTopicLedger(final ByteString byteString, final Version version, final TopicInfo topicInfo, final long j, final boolean z, final Callback<Void> callback, Object obj) {
        this.bk.asyncCreateLedger(this.cfg.getBkEnsembleSize(), this.cfg.getBkWriteQuorumSize(), this.cfg.getBkAckQuorumSize(), BookKeeper.DigestType.CRC32, passwd, new SafeAsynBKCallback.CreateCallback() {
            // Ensures the completion logic below runs at most once.
            AtomicBoolean processed = new AtomicBoolean(false);

            @Override // org.apache.hedwig.zookeeper.SafeAsynBKCallback.CreateCallback
            public void safeCreateComplete(int rc, LedgerHandle ledgerHandle, Object createCtx) {
                if (!this.processed.compareAndSet(false, true)) {
                    return;
                }
                if (rc != BKException.Code.OK) {
                    BKException failure = BKException.create(rc);
                    BookkeeperPersistenceManager.logger.error("Could not create new ledger while acquiring topic: " + byteString.toStringUtf8(), failure);
                    callback.operationFailed(createCtx, new PubSubException.ServiceDownException(failure));
                    return;
                }
                if (!z) {
                    if (topicInfo.ledgerRanges.isEmpty()) {
                        // No earlier ledgers: the last pushed id is the one just before j.
                        topicInfo.lastSeqIdPushed = PubSubProtocol.MessageSeqId.newBuilder().setLocalComponent(j - BookkeeperPersistenceManager.START_SEQ_ID).build();
                    } else {
                        topicInfo.lastSeqIdPushed = topicInfo.ledgerRanges.lastEntry().getValue().range.getEndSeqIdIncluded();
                    }
                }
                PubSubProtocol.LedgerRange newRange = PubSubProtocol.LedgerRange.newBuilder().setLedgerId(ledgerHandle.getId()).setStartSeqIdIncluded(j).build();
                topicInfo.currentLedgerRange = new InMemoryLedgerRange(newRange, ledgerHandle);
                topicInfo.lastEntryIdAckedInCurrentLedger = -1L;

                // Persist all known ranges followed by the new, still open one.
                PubSubProtocol.LedgerRanges.Builder allRanges = PubSubProtocol.LedgerRanges.newBuilder();
                for (InMemoryLedgerRange existing : topicInfo.ledgerRanges.values()) {
                    allRanges.addRanges(existing.range);
                }
                allRanges.addRanges(newRange);
                BookkeeperPersistenceManager.this.tpManager.writeTopicPersistenceInfo(byteString, allRanges.build(), version, new Callback<Version>() {
                    @Override
                    public void operationFinished(Object writeCtx, Version newVersion) {
                        topicInfo.ledgerRangesVersion = newVersion;
                        BookkeeperPersistenceManager.this.topicInfos.put(byteString, topicInfo);
                        callback.operationFinished(writeCtx, null);
                    }

                    @Override
                    public void operationFailed(Object writeCtx, PubSubException pubSubException) {
                        callback.operationFailed(writeCtx, pubSubException);
                    }
                }, createCtx);
            }
        }, obj);
    }

    /**
     * Queues the acquisition work for a topic this server has just taken ownership of.
     *
     * @param byteString topic that was acquired
     * @param callback   notified when the acquisition work finishes or fails
     * @param obj        context passed back to {@code callback}
     */
    @Override // org.apache.hedwig.server.topics.TopicOwnershipChangeListener
    public void acquiredTopic(ByteString byteString, Callback<Void> callback, Object obj) {
        AcquireOp acquireOp = new AcquireOp(this, byteString, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, acquireOp);
    }

    /**
     * Queues a ledger change for the topic.
     *
     * @param byteString topic whose current ledger should be replaced
     * @param callback   notified when the change finishes or fails
     * @param obj        context passed back to {@code callback}
     */
    protected void changeLedger(ByteString byteString, Callback<Void> callback, Object obj) {
        ChangeLedgerOp changeOp = new ChangeLedgerOp(this, byteString, callback, obj);
        this.queuer.pushAndMaybeRun(byteString, changeOp);
    }

    /**
     * Hook invoked for every ledger handle of a released topic. The body is empty, so the
     * handle is currently left untouched.
     * NOTE(review): whether the handle is meant to be closed here cannot be told from this
     * file - confirm before relying on it for resource cleanup.
     *
     * @param ledgerHandle handle of the ledger being released (unused)
     */
    public void closeLedger(LedgerHandle ledgerHandle) {
    }

    /**
     * Queues the release of all in-memory state held for a topic this server no longer owns.
     *
     * @param byteString topic that was lost
     */
    @Override // org.apache.hedwig.server.topics.TopicOwnershipChangeListener
    public void lostTopic(ByteString byteString) {
        // ReleaseOp is an inner class: the enclosing instance is supplied implicitly and
        // must not be passed as a constructor argument.
        this.queuer.pushAndMaybeRun(byteString, new ReleaseOp(byteString));
    }

    /**
     * Queues an update of the topic's message bound.
     *
     * @param byteString topic to update
     * @param num        new message bound
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void setMessageBound(ByteString byteString, Integer num) {
        // SetMessageBoundOp is an inner class: the enclosing instance is supplied implicitly
        // and must not be passed as a constructor argument.
        this.queuer.pushAndMaybeRun(byteString, new SetMessageBoundOp(byteString, num.intValue()));
    }

    /**
     * Queues removal of the topic's message bound by resetting it to
     * {@link TopicInfo#UNLIMITED}.
     *
     * @param byteString topic to update
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void clearMessageBound(ByteString byteString) {
        // SetMessageBoundOp is an inner class: the enclosing instance is supplied implicitly
        // and must not be passed as a constructor argument.
        this.queuer.pushAndMaybeRun(byteString, new SetMessageBoundOp(byteString, TopicInfo.UNLIMITED));
    }

    /**
     * Shuts the manager down by closing the topic persistence manager. A failure to close
     * is logged as a warning and otherwise ignored.
     */
    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void stop() {
        try {
            this.tpManager.close();
        } catch (IOException closeFailure) {
            // Best effort: shutdown continues even if the metadata manager fails to close.
            logger.warn("Exception closing topic persistence manager : ", closeFailure);
        }
    }
}
