package org.apache.hedwig.server.persistence;

import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.OrderedSafeExecutor;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.jmx.HedwigJMXService;
import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.util.Callback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache.class */
public class ReadAheadCache implements PersistenceManager, HedwigJMXService {
    static Logger logger = LoggerFactory.getLogger(ReadAheadCache.class);
    protected PersistenceManagerWithRangeScan realPersistenceManager;
    protected ServerConfiguration cfg;
    protected final OrderedSafeExecutor cacheWorkers;
    protected final int numCacheWorkers;
    protected volatile long maxSegmentSize;
    protected volatile long cacheEntryTTL;
    protected ConcurrentMap<CacheKey, CacheValue> cache = new ConcurrentHashMap();
    protected ConcurrentMap<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new ConcurrentHashMap();
    protected AtomicLong presentCacheSize = new AtomicLong(0);
    protected AtomicInteger numPendingRequests = new AtomicInteger(0);
    protected final ThreadLocal<CacheSegment> cacheSegment = new ThreadLocal<CacheSegment>() { // from class: org.apache.hedwig.server.persistence.ReadAheadCache.1
        /* JADX INFO: Access modifiers changed from: protected */
        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.lang.ThreadLocal
        public CacheSegment initialValue() {
            return new CacheSegment();
        }
    };
    protected PersistCallback persistCallbackInstance = new PersistCallback();
    protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new NoSuchSeqIdException();
    protected ReadAheadException readAheadExceptionInstance = new ReadAheadException();
    protected volatile boolean keepRunning = true;
    ReadAheadCacheBean jmxCacheBean = null;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$CacheRequest.class */
    public interface CacheRequest {
        void performRequest();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$CacheSegment.class */
    public static class CacheSegment {
        protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap();
        protected AtomicLong presentSegmentSize = new AtomicLong(0);

        CacheSegment() {
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$CancelScanRequestOp.class */
    public class CancelScanRequestOp implements CacheRequest {
        final CancelScanRequest request;

        public CancelScanRequestOp(CancelScanRequest cancelScanRequest) {
            this.request = cancelScanRequest;
        }

        @Override // org.apache.hedwig.server.persistence.ReadAheadCache.CacheRequest
        public void performRequest() {
            cancelScanRequest(this.request.getScanRequest());
        }

        void cancelScanRequest(ScanRequest scanRequest) {
            if (null == scanRequest) {
                return;
            }
            CacheValue cacheValue = ReadAheadCache.this.cache.get(new CacheKey(scanRequest.getTopic(), scanRequest.getStartSeqId()));
            if (null == cacheValue) {
                return;
            }
            cacheValue.removeCallback(scanRequest.getCallback(), scanRequest.getCtx());
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$DeliveredUntil.class */
    protected class DeliveredUntil implements CacheRequest {
        ByteString topic;
        Long seqId;

        public DeliveredUntil(ByteString byteString, Long l) {
            this.topic = byteString;
            this.seqId = l;
        }

        @Override // org.apache.hedwig.server.persistence.ReadAheadCache.CacheRequest
        public void performRequest() {
            SortedSet<Long> sortedSet = ReadAheadCache.this.orderedIndexOnSeqId.get(this.topic);
            if (sortedSet == null) {
                return;
            }
            Iterator<Long> it = sortedSet.headSet(Long.valueOf(this.seqId.longValue() + 1)).iterator();
            while (it.hasNext()) {
                CacheKey cacheKey = new CacheKey(this.topic, it.next().longValue());
                ReadAheadCache.logger.debug("Removing {} from cache because every subscriber has moved past", cacheKey);
                ReadAheadCache.this.removeMessageFromCache(cacheKey, ReadAheadCache.this.readAheadExceptionInstance, true, false);
                it.remove();
            }
            if (sortedSet.isEmpty()) {
                ReadAheadCache.this.orderedIndexOnSeqId.remove(this.topic);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$ExceptionOnCacheKey.class */
    public class ExceptionOnCacheKey implements CacheRequest {
        CacheKey cacheKey;
        Exception exception;

        public ExceptionOnCacheKey(CacheKey cacheKey, Exception exc) {
            this.cacheKey = cacheKey;
            this.exception = exc;
        }

        @Override // org.apache.hedwig.server.persistence.ReadAheadCache.CacheRequest
        public void performRequest() {
            ReadAheadCache.this.removeMessageFromCache(this.cacheKey, this.exception, true, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$HashSetCacheKeyFactory.class */
    public static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
        protected static final HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();

        protected HashSetCacheKeyFactory() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hedwig.server.persistence.Factory
        public Set<CacheKey> newInstance() {
            return new HashSet();
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$NoSuchSeqIdException.class */
    protected static class NoSuchSeqIdException extends Exception {
        public NoSuchSeqIdException() {
            super("No such seq-id");
        }
    }

    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$PersistCallback.class */
    public class PersistCallback implements Callback<PubSubProtocol.MessageSeqId> {
        public PersistCallback() {
        }

        public void operationFailed(Object obj, PubSubException pubSubException) {
            PersistRequest persistRequest = (PersistRequest) obj;
            persistRequest.getCallback().operationFailed(persistRequest.getCtx(), pubSubException);
        }

        public void operationFinished(Object obj, PubSubProtocol.MessageSeqId messageSeqId) {
            PersistRequest persistRequest = (PersistRequest) obj;
            persistRequest.getCallback().operationFinished(persistRequest.getCtx(), messageSeqId);
            PubSubProtocol.Message mergeLocalSeqId = MessageIdUtils.mergeLocalSeqId(persistRequest.getMessage(), messageSeqId.getLocalComponent());
            CacheKey cacheKey = new CacheKey(persistRequest.getTopic(), messageSeqId.getLocalComponent());
            ReadAheadCache.this.enqueueWithoutFailureByTopic(cacheKey.getTopic(), new ScanResponse(cacheKey, mergeLocalSeqId));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$ReadAheadException.class */
    public static class ReadAheadException extends Exception {
        public ReadAheadException() {
            super("Readahead failed");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$ReadAheadScanCallback.class */
    public class ReadAheadScanCallback implements ScanCallback {
        Queue<CacheKey> installedStubs;
        ByteString topic;

        public ReadAheadScanCallback(Queue<CacheKey> queue, ByteString byteString) {
            this.installedStubs = queue;
            this.topic = byteString;
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void messageScanned(Object obj, PubSubProtocol.Message message) {
            CacheKey cacheKey = new CacheKey(this.topic, message.getMsgId().getLocalComponent());
            ReadAheadCache.this.enqueueWithoutFailureByTopic(this.topic, new ScanResponse(cacheKey, message));
            CacheKey peek = this.installedStubs.peek();
            if (peek == null) {
                return;
            }
            if (peek.equals(cacheKey)) {
                this.installedStubs.poll();
            } else {
                ReadAheadCache.logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: " + this.topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + peek.seqId + " topic: " + peek.topic.toStringUtf8() + " installedStubs: " + this.installedStubs);
                enqueueDeleteOfRemainingStubs(ReadAheadCache.this.noSuchSeqIdExceptionInstance);
            }
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void scanFailed(Object obj, Exception exc) {
            enqueueDeleteOfRemainingStubs(exc);
        }

        @Override // org.apache.hedwig.server.persistence.ScanCallback
        public void scanFinished(Object obj, ScanCallback.ReasonForFinish reasonForFinish) {
            if (reasonForFinish != ScanCallback.ReasonForFinish.NO_MORE_MESSAGES) {
                enqueueDeleteOfRemainingStubs(ReadAheadCache.this.readAheadExceptionInstance);
            }
        }

        private void enqueueDeleteOfRemainingStubs(Exception exc) {
            while (true) {
                CacheKey poll = this.installedStubs.poll();
                if (poll == null) {
                    return;
                } else {
                    ReadAheadCache.this.enqueueWithoutFailureByTopic(poll.getTopic(), new ExceptionOnCacheKey(poll, exc));
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$ScanRequestWrapper.class */
    public class ScanRequestWrapper implements CacheRequest {
        ScanRequest request;

        public ScanRequestWrapper(ScanRequest scanRequest) {
            this.request = scanRequest;
        }

        @Override // org.apache.hedwig.server.persistence.ReadAheadCache.CacheRequest
        public void performRequest() {
            RangeScanRequest doReadAhead = ReadAheadCache.this.doReadAhead(this.request);
            CacheKey cacheKey = new CacheKey(this.request.getTopic(), this.request.getStartSeqId());
            CacheValue cacheValue = ReadAheadCache.this.cache.get(cacheKey);
            if (null == cacheValue) {
                ReadAheadCache.logger.error("Cache key {} is removed after installing stub when scanning.", cacheKey);
                ReadAheadCache.this.scanSingleMessage(this.request);
                return;
            }
            synchronized (cacheValue) {
                cacheValue.addCallback(this.request.getCallback(), this.request.getCtx());
            }
            if (doReadAhead != null) {
                ReadAheadCache.this.realPersistenceManager.scanMessages(doReadAhead);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$ScanResponse.class */
    public class ScanResponse implements CacheRequest {
        CacheKey cacheKey;
        PubSubProtocol.Message message;

        public ScanResponse(CacheKey cacheKey, PubSubProtocol.Message message) {
            this.cacheKey = cacheKey;
            this.message = message;
        }

        @Override // org.apache.hedwig.server.persistence.ReadAheadCache.CacheRequest
        public void performRequest() {
            ReadAheadCache.this.addMessageToCache(this.cacheKey, this.message, MathUtils.now());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hedwig/server/persistence/ReadAheadCache$TreeSetLongFactory.class */
    public static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
        protected static final TreeSetLongFactory instance = new TreeSetLongFactory();

        protected TreeSetLongFactory() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.hedwig.server.persistence.Factory
        public SortedSet<Long> newInstance() {
            return new TreeSet();
        }
    }

    public ReadAheadCache(PersistenceManagerWithRangeScan persistenceManagerWithRangeScan, ServerConfiguration serverConfiguration) {
        this.realPersistenceManager = persistenceManagerWithRangeScan;
        this.cfg = serverConfiguration;
        this.numCacheWorkers = serverConfiguration.getNumReadAheadCacheThreads();
        this.cacheWorkers = new OrderedSafeExecutor(this.numCacheWorkers);
        reloadConf(serverConfiguration);
    }

    protected void reloadConf(ServerConfiguration serverConfiguration) {
        this.maxSegmentSize = serverConfiguration.getMaximumCacheSize() / this.numCacheWorkers;
        this.cacheEntryTTL = serverConfiguration.getCacheEntryTTL();
    }

    public ReadAheadCache start() {
        return this;
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public long getSeqIdAfterSkipping(ByteString byteString, long j, int i) {
        return this.realPersistenceManager.getSeqIdAfterSkipping(byteString, j, i);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public PubSubProtocol.MessageSeqId getCurrentSeqIdForTopic(ByteString byteString) throws PubSubException.ServerNotResponsibleForTopicException {
        return this.realPersistenceManager.getCurrentSeqIdForTopic(byteString);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void persistMessage(PersistRequest persistRequest) {
        this.realPersistenceManager.persistMessage(new PersistRequest(persistRequest.getTopic(), persistRequest.getMessage(), this.persistCallbackInstance, persistRequest));
    }

    protected void enqueueWithoutFailureByTopic(ByteString byteString, final CacheRequest cacheRequest) {
        if (this.keepRunning) {
            try {
                this.numPendingRequests.incrementAndGet();
                this.cacheWorkers.submitOrdered(byteString, new SafeRunnable() { // from class: org.apache.hedwig.server.persistence.ReadAheadCache.2
                    public void safeRun() {
                        ReadAheadCache.this.numPendingRequests.decrementAndGet();
                        cacheRequest.performRequest();
                    }
                });
            } catch (RejectedExecutionException e) {
                logger.error("Failed to submit cache request for topic " + byteString.toStringUtf8() + " : ", e);
            }
        }
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void scanSingleMessage(ScanRequest scanRequest) {
        enqueueWithoutFailureByTopic(scanRequest.getTopic(), new ScanRequestWrapper(scanRequest));
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void deliveredUntil(ByteString byteString, Long l) {
        enqueueWithoutFailureByTopic(byteString, new DeliveredUntil(byteString, l));
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumedUntil(ByteString byteString, Long l) {
        this.realPersistenceManager.consumedUntil(byteString, l);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void setMessageBound(ByteString byteString, Integer num) {
        this.realPersistenceManager.setMessageBound(byteString, num);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void clearMessageBound(ByteString byteString) {
        this.realPersistenceManager.clearMessageBound(byteString);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void consumeToBound(ByteString byteString) {
        this.realPersistenceManager.consumeToBound(byteString);
    }

    @Override // org.apache.hedwig.server.persistence.PersistenceManager
    public void stop() {
        try {
            this.keepRunning = false;
            this.cacheWorkers.shutdown();
        } catch (Exception e) {
            logger.warn("Failed to shut down cache workers : ", e);
        }
    }

    protected RangeScanRequest doReadAhead(ScanRequest scanRequest) {
        ByteString topic = scanRequest.getTopic();
        Long valueOf = Long.valueOf(scanRequest.getStartSeqId());
        int max = Math.max(1, this.cfg.getReadAheadCount());
        RangeScanRequest doReadAheadStartingFrom = doReadAheadStartingFrom(topic, valueOf.longValue(), max);
        return doReadAheadStartingFrom != null ? doReadAheadStartingFrom : doReadAheadStartingFrom(topic, Long.valueOf(this.realPersistenceManager.getSeqIdAfterSkipping(topic, valueOf.longValue(), max / 2)).longValue(), max / 2);
    }

    protected RangeScanRequest doReadAheadStartingFrom(ByteString byteString, long j, int i) {
        LinkedList linkedList = new LinkedList();
        int i2 = 0;
        while (i2 < i) {
            CacheKey cacheKey = new CacheKey(byteString, j);
            if (this.cache.containsKey(cacheKey)) {
                break;
            }
            if (null != this.cache.putIfAbsent(cacheKey, new CacheValue())) {
                logger.warn("It is unexpected that more than one threads are adding message to cache key {} at the same time.", cacheKey);
            }
            logger.debug("Adding cache stub for: {}", cacheKey);
            linkedList.add(cacheKey);
            j = this.realPersistenceManager.getSeqIdAfterSkipping(byteString, j, 1);
            i2++;
        }
        if (i2 == 0) {
            return null;
        }
        return new RangeScanRequest(byteString, j, i2, this.cfg.getReadAheadSizeBytes(), new ReadAheadScanCallback(linkedList, byteString), null);
    }

    protected void addMessageToCache(CacheKey cacheKey, PubSubProtocol.Message message, long j) {
        logger.debug("Adding msg {} to readahead cache", cacheKey);
        CacheValue cacheValue = this.cache.get(cacheKey);
        CacheValue cacheValue2 = cacheValue;
        if (cacheValue == null) {
            cacheValue2 = new CacheValue();
            CacheValue putIfAbsent = this.cache.putIfAbsent(cacheKey, cacheValue2);
            if (null != putIfAbsent) {
                logger.warn("Weird! Should not have two threads adding message to cache key {} at the same time.", cacheKey);
                cacheValue2 = putIfAbsent;
            }
        }
        CacheSegment cacheSegment = this.cacheSegment.get();
        if (cacheValue2.isStub()) {
            int size = message.getBody().size();
            cacheSegment.presentSegmentSize.addAndGet(size);
            this.presentCacheSize.addAndGet(size);
        }
        synchronized (cacheValue2) {
            cacheValue2.setMessageAndInvokeCallbacks(message, j);
        }
        MapMethods.addToMultiMap(this.orderedIndexOnSeqId, cacheKey.getTopic(), Long.valueOf(cacheKey.getSeqId()), TreeSetLongFactory.instance);
        MapMethods.addToMultiMap(cacheSegment.timeIndexOfAddition, Long.valueOf(j), cacheKey, HashSetCacheKeyFactory.instance);
        collectOldOrExpiredCacheEntries(cacheSegment);
    }

    protected void removeMessageFromCache(CacheKey cacheKey, Exception exc, boolean z, boolean z2) {
        CacheValue remove = this.cache.remove(cacheKey);
        if (remove == null) {
            return;
        }
        CacheSegment cacheSegment = this.cacheSegment.get();
        synchronized (remove) {
            if (remove.isStub()) {
                remove.setErrorAndInvokeCallbacks(exc);
                return;
            }
            int size = 0 - remove.getMessage().getBody().size();
            this.presentCacheSize.addAndGet(size);
            cacheSegment.presentSegmentSize.addAndGet(size);
            long timeOfAddition = remove.getTimeOfAddition();
            if (z2) {
                MapMethods.removeFromMultiMap(this.orderedIndexOnSeqId, cacheKey.getTopic(), Long.valueOf(cacheKey.getSeqId()));
            }
            if (z) {
                MapMethods.removeFromMultiMap(cacheSegment.timeIndexOfAddition, Long.valueOf(timeOfAddition), cacheKey);
            }
        }
    }

    protected void collectOldOrExpiredCacheEntries(CacheSegment cacheSegment) {
        if (this.cacheEntryTTL > 0) {
            while (!cacheSegment.timeIndexOfAddition.isEmpty()) {
                Long firstKey = cacheSegment.timeIndexOfAddition.firstKey();
                if (MathUtils.now() - firstKey.longValue() < this.cacheEntryTTL) {
                    break;
                } else {
                    collectCacheEntriesAtTimestamp(cacheSegment, firstKey.longValue());
                }
            }
        }
        while (cacheSegment.presentSegmentSize.get() > this.maxSegmentSize && !cacheSegment.timeIndexOfAddition.isEmpty()) {
            collectCacheEntriesAtTimestamp(cacheSegment, cacheSegment.timeIndexOfAddition.firstKey().longValue());
        }
    }

    private void collectCacheEntriesAtTimestamp(CacheSegment cacheSegment, long j) {
        for (CacheKey cacheKey : cacheSegment.timeIndexOfAddition.get(Long.valueOf(j))) {
            logger.debug("Removing {} from cache because it's the oldest.", cacheKey);
            removeMessageFromCache(cacheKey, this.readAheadExceptionInstance, false, true);
        }
        cacheSegment.timeIndexOfAddition.remove(Long.valueOf(j));
    }

    public void cancelScanRequest(ByteString byteString, CancelScanRequest cancelScanRequest) {
        enqueueWithoutFailureByTopic(byteString, new CancelScanRequestOp(cancelScanRequest));
    }

    @Override // org.apache.hedwig.server.jmx.HedwigJMXService
    public void registerJMX(HedwigMBeanInfo hedwigMBeanInfo) {
        try {
            this.jmxCacheBean = new ReadAheadCacheBean(this);
            HedwigMBeanRegistry.getInstance().register(this.jmxCacheBean, hedwigMBeanInfo);
        } catch (Exception e) {
            logger.warn("Failed to register readahead cache with JMX", e);
            this.jmxCacheBean = null;
        }
    }

    @Override // org.apache.hedwig.server.jmx.HedwigJMXService
    public void unregisterJMX() {
        try {
            if (this.jmxCacheBean != null) {
                HedwigMBeanRegistry.getInstance().unregister(this.jmxCacheBean);
            }
        } catch (Exception e) {
            logger.warn("Failed to unregister readahead cache with JMX", e);
        }
    }
}
