package com.indeed.lsmtree.recordcache;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.UnmodifiableIterator;
import com.indeed.lsmtree.core.StorageType;
import com.indeed.lsmtree.core.Store;
import com.indeed.lsmtree.core.StoreBuilder;
import com.indeed.lsmtree.recordcache.RecordLogDirectoryPoller;
import com.indeed.lsmtree.recordlog.RecordFile;
import com.indeed.lsmtree.recordlog.RecordLogDirectory;
import com.indeed.util.core.Either;
import com.indeed.util.core.threads.NamedThreadFactory;
import com.indeed.util.serialization.LongSerializer;
import com.indeed.util.serialization.Serializer;
import com.indeed.util.varexport.Export;
import fj.P;
import fj.P2;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.commons.collections.comparators.ComparableComparator;
import org.apache.log4j.Logger;

/* loaded from: input_file:com/indeed/lsmtree/recordcache/PersistentRecordCache.class */
public final class PersistentRecordCache<K, V> implements RecordCache<K, V> {
    private static final Logger log = Logger.getLogger(PersistentRecordCache.class);
    private final Store<K, Long> index;
    private final RecordLogDirectory<Operation> recordLogDirectory;
    private final RecordLogDirectoryPoller.Functions indexUpdateFunctions;
    private final AtomicInteger repairedSegments;
    private final Comparator<K> comparator;

    /* loaded from: input_file:com/indeed/lsmtree/recordcache/PersistentRecordCache$Builder.class */
    public static class Builder<K, V> {
        private File indexDir;
        private File checkpointDir;
        private Serializer<K> keySerializer;
        private boolean dedicatedIndexPartition;
        private RecordLogDirectory<Operation> recordLogDirectory;
        private Comparator<K> comparator = new ComparableComparator();
        private boolean mlockIndex = false;
        private boolean mlockBloomFilters = false;
        private long bloomFilterMemory = -1;

        public PersistentRecordCache<K, V> build() throws IOException {
            if (this.indexDir == null) {
                throw new IllegalArgumentException("indexDir must be set");
            }
            if (this.recordLogDirectory == null) {
                throw new IllegalArgumentException("fileCache must be set");
            }
            if (this.keySerializer == null) {
                throw new IllegalArgumentException("keySerializer must be set");
            }
            StoreBuilder storeBuilder = new StoreBuilder(this.indexDir, this.keySerializer, new LongSerializer());
            storeBuilder.setMaxVolatileGenerationSize(8388608L);
            storeBuilder.setStorageType(StorageType.INLINE);
            storeBuilder.setComparator(this.comparator);
            storeBuilder.setDedicatedPartition(this.dedicatedIndexPartition);
            storeBuilder.setMlockFiles(this.mlockIndex);
            storeBuilder.setMlockBloomFilters(this.mlockBloomFilters);
            if (this.bloomFilterMemory >= 0) {
                storeBuilder.setBloomFilterMemory(this.bloomFilterMemory);
            }
            return new PersistentRecordCache<>(storeBuilder.build(), this.recordLogDirectory, this.checkpointDir);
        }

        public Builder<K, V> setIndexDir(File file) {
            this.indexDir = file;
            return this;
        }

        public Builder<K, V> setKeySerializer(Serializer<K> serializer) {
            this.keySerializer = serializer;
            return this;
        }

        public Builder<K, V> setComparator(Comparator<K> comparator) {
            this.comparator = comparator;
            return this;
        }

        public Builder<K, V> setRecordLogDirectory(RecordLogDirectory<Operation> recordLogDirectory) {
            this.recordLogDirectory = recordLogDirectory;
            return this;
        }

        public Builder<K, V> setCheckpointDir(File file) {
            this.checkpointDir = file;
            return this;
        }

        public boolean isDedicatedIndexPartition() {
            return this.dedicatedIndexPartition;
        }

        public Builder<K, V> setDedicatedIndexPartition(boolean z) {
            this.dedicatedIndexPartition = z;
            return this;
        }

        public boolean isMlockIndex() {
            return this.mlockIndex;
        }

        public Builder<K, V> setMlockIndex(boolean z) {
            this.mlockIndex = z;
            return this;
        }

        public boolean isMlockBloomFilters() {
            return this.mlockBloomFilters;
        }

        public Builder<K, V> setMlockBloomFilters(boolean z) {
            this.mlockBloomFilters = z;
            return this;
        }

        public long getBloomFilterMemory() {
            return this.bloomFilterMemory;
        }

        public Builder<K, V> setBloomFilterMemory(long j) {
            this.bloomFilterMemory = j;
            return this;
        }
    }

    /* loaded from: input_file:com/indeed/lsmtree/recordcache/PersistentRecordCache$RecordLookupTask.class */
    private final class RecordLookupTask implements Callable<List<Either<Exception, P2<K, V>>>> {
        private final List<Long> addresses;

        private RecordLookupTask(List<Long> list) {
            this.addresses = list;
        }

        @Override // java.util.concurrent.Callable
        public List<Either<Exception, P2<K, V>>> call() {
            ArrayList newArrayList = Lists.newArrayList();
            for (Long l : this.addresses) {
                Put put = null;
                try {
                    try {
                        put = PersistentRecordCache.this.lookupAddress(null, l);
                    } catch (Exception e) {
                        PersistentRecordCache.log.info("exception looking up address: " + l + ", attempting repair", e);
                        PersistentRecordCache.this.reindex(l.longValue());
                        PersistentRecordCache.log.info("reindex successful");
                    }
                } catch (Exception e2) {
                    newArrayList.add(Either.Left.of(e2));
                }
                if (put == null) {
                    throw new IOException("record for address " + l + " does not exist for some reason");
                    break;
                }
                newArrayList.add(Either.Right.of(P.p(put.getKey(), put.getValue())));
            }
            return newArrayList;
        }
    }

    private PersistentRecordCache(final Store<K, Long> store, RecordLogDirectory<Operation> recordLogDirectory, final File file) throws IOException {
        this.repairedSegments = new AtomicInteger(0);
        this.index = store;
        this.comparator = store.getComparator();
        this.recordLogDirectory = recordLogDirectory;
        this.indexUpdateFunctions = new RecordLogDirectoryPoller.Functions() { // from class: com.indeed.lsmtree.recordcache.PersistentRecordCache.1
            AtomicLong indexPutTime = new AtomicLong(0);
            AtomicLong indexDeleteTime = new AtomicLong(0);
            AtomicInteger indexPuts = new AtomicInteger(0);
            AtomicInteger indexDeletes = new AtomicInteger(0);
            AtomicInteger count = new AtomicInteger(0);

            @Override // com.indeed.lsmtree.recordcache.RecordLogDirectoryPoller.Functions
            public void process(long j, Operation operation) throws IOException {
                this.count.incrementAndGet();
                if (this.count.get() % 1000 == 0) {
                    if (this.indexPuts.get() > 0) {
                        PersistentRecordCache.log.debug("avg index put time: " + ((this.indexPutTime.get() / r0) / 1000.0d) + " us");
                    }
                    if (this.indexDeletes.get() > 0) {
                        PersistentRecordCache.log.debug("avg index delete time: " + ((this.indexDeleteTime.get() / r0) / 1000.0d) + " us");
                    }
                }
                if (operation.getClass() == Put.class) {
                    Put put = (Put) operation;
                    long nanoTime = System.nanoTime();
                    synchronized (store) {
                        store.put(put.getKey(), Long.valueOf(j));
                    }
                    this.indexPutTime.addAndGet(System.nanoTime() - nanoTime);
                    this.indexPuts.incrementAndGet();
                    return;
                }
                if (operation.getClass() != Delete.class) {
                    if (operation.getClass() != Checkpoint.class) {
                        PersistentRecordCache.log.warn("operation class unknown");
                        return;
                    }
                    Checkpoint checkpoint = (Checkpoint) operation;
                    if (file != null) {
                        sync();
                        store.checkpoint(new File(file, String.valueOf(checkpoint.getTimestamp())));
                        return;
                    }
                    return;
                }
                for (K k : ((Delete) operation).getKeys()) {
                    long nanoTime2 = System.nanoTime();
                    synchronized (store) {
                        store.delete(k);
                    }
                    this.indexDeleteTime.addAndGet(System.nanoTime() - nanoTime2);
                    this.indexDeletes.incrementAndGet();
                }
            }

            public void sync() throws IOException {
                long nanoTime = System.nanoTime();
                store.sync();
                PersistentRecordCache.log.debug("sync time: " + ((System.nanoTime() - nanoTime) / 1000.0d) + " us");
            }
        };
    }

    @Export(name = "repaired-segments")
    public int getRepairedSegments() {
        return this.repairedSegments.get();
    }

    @Export(name = "index-active-space-usage")
    public long getIndexActiveSpaceUsage() throws IOException {
        return this.index.getActiveSpaceUsage();
    }

    @Export(name = "index-total-space-usage")
    public long getIndexTotalSpaceUsage() throws IOException {
        return this.index.getTotalSpaceUsage();
    }

    @Export(name = "index-reserverd-space-usage")
    public long getIndexReservedSpaceUsage() {
        return this.index.getReservedSpaceUsage();
    }

    @Export(name = "index-free-space")
    public long getIndexFreeSpace() throws IOException {
        return this.index.getFreeSpace();
    }

    @Override // com.indeed.lsmtree.recordcache.RecordCache
    @Nullable
    public V get(@Nonnull K k, @Nonnull CacheStats cacheStats) {
        Map<K, V> all = getAll(Collections.singleton(k), cacheStats);
        if (all.size() > 0) {
            return all.get(k);
        }
        return null;
    }

    @Override // com.indeed.lsmtree.recordcache.RecordCache
    @Nonnull
    public Map<K, V> getAll(@Nonnull Collection<K> collection, @Nonnull CacheStats cacheStats) {
        Put<K, V> lookupAddress;
        HashMap newHashMap = Maps.newHashMap();
        for (K k : collection) {
            try {
                long nanoTime = System.nanoTime();
                try {
                    Long l = (Long) this.index.get(k);
                    cacheStats.indexTime += System.nanoTime() - nanoTime;
                    if (l != null) {
                        try {
                            lookupAddress = lookupAddress(cacheStats, l);
                        } catch (Exception e) {
                            log.info("exception looking up key: " + k + ", attempting repair ", e);
                            try {
                                reindex(l.longValue());
                                try {
                                    lookupAddress = lookupAddress(cacheStats, (Long) this.index.get(k));
                                    log.info("reindex successful");
                                } catch (Exception e2) {
                                    log.error("index read error while fetching key " + k, e2);
                                    cacheStats.indexReadErrors++;
                                    throw e2;
                                }
                            } catch (IndexReadException e3) {
                                log.error("index read error while fetching key " + k, e3);
                                cacheStats.indexReadErrors++;
                                throw e3;
                            }
                        }
                        if (this.comparator.compare(lookupAddress.getKey(), k) != 0) {
                            throw new IOException("keys do not match - expected: " + k + " actual: " + lookupAddress.getKey());
                            break;
                        }
                        newHashMap.put(k, lookupAddress.getValue());
                    }
                } catch (Exception e4) {
                    log.error("index read error while fetching key " + k, e4);
                    cacheStats.indexReadErrors++;
                    throw e4;
                }
            } catch (Exception e5) {
                log.error("error fetching key: " + k, e5);
                cacheStats.recordLogReadErrors++;
            }
        }
        cacheStats.persistentStoreHits = newHashMap.size();
        log.debug("persistent store hits: " + newHashMap.size());
        cacheStats.misses = collection.size() - newHashMap.size();
        log.debug("misses: " + (collection.size() - newHashMap.size()));
        return newHashMap;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Put<K, V> lookupAddress(@Nullable CacheStats cacheStats, Long l) throws IOException {
        long nanoTime = System.nanoTime();
        Operation operation = (Operation) this.recordLogDirectory.get(l.longValue());
        if (cacheStats != null) {
            cacheStats.recordLogTime += System.nanoTime() - nanoTime;
        }
        if (operation.getClass() != Put.class) {
            throw new IOException("class is not Put");
        }
        Put<K, V> put = (Put) operation;
        put.getValue();
        return put;
    }

    public Iterator<Either<Exception, P2<K, V>>> getStreaming(@Nonnull Iterator<K> it, @Nullable final AtomicInteger atomicInteger, @Nullable AtomicInteger atomicInteger2) {
        log.info("starting store lookups");
        LongArrayList longArrayList = new LongArrayList();
        int i = 0;
        while (it.hasNext()) {
            try {
                Long l = (Long) this.index.get(it.next());
                if (l != null) {
                    longArrayList.add(l);
                } else {
                    i++;
                }
            } catch (IOException e) {
                log.error("error", e);
                return Iterators.singletonIterator(Either.Left.of(new IndexReadException(e)));
            }
        }
        if (atomicInteger != null) {
            atomicInteger.addAndGet(i);
        }
        if (atomicInteger2 != null) {
            atomicInteger2.addAndGet(i);
        }
        log.info("store lookups complete, sorting addresses");
        Arrays.sort(longArrayList.elements(), 0, longArrayList.size());
        log.info("initializing store lookup iterator");
        final ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(100);
        final UnmodifiableIterator partition = Iterators.partition(longArrayList.iterator(), 1000);
        final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, arrayBlockingQueue, new NamedThreadFactory("store priming thread", true, log), new RejectedExecutionHandler() { // from class: com.indeed.lsmtree.recordcache.PersistentRecordCache.2
            @Override // java.util.concurrent.RejectedExecutionHandler
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor2) {
                try {
                    arrayBlockingQueue.put(runnable);
                } catch (InterruptedException e2) {
                    PersistentRecordCache.log.error("error", e2);
                    throw new RuntimeException(e2);
                }
            }
        });
        final ArrayBlockingQueue arrayBlockingQueue2 = new ArrayBlockingQueue(10);
        final AtomicLong atomicLong = new AtomicLong(0L);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        new Thread(new Runnable() { // from class: com.indeed.lsmtree.recordcache.PersistentRecordCache.3
            @Override // java.lang.Runnable
            public void run() {
                while (partition.hasNext()) {
                    atomicLong.incrementAndGet();
                    threadPoolExecutor.submit(new FutureTask<List<Either<Exception, P2<K, V>>>>(new RecordLookupTask((List) partition.next())) { // from class: com.indeed.lsmtree.recordcache.PersistentRecordCache.3.1
                        @Override // java.util.concurrent.FutureTask
                        protected void done() {
                            try {
                                List<Either<Exception, P2<K, V>>> list = get();
                                if (atomicInteger != null) {
                                    atomicInteger.addAndGet(list.size());
                                }
                                arrayBlockingQueue2.put(list);
                            } catch (InterruptedException e2) {
                                PersistentRecordCache.log.error("error", e2);
                                throw new RuntimeException(e2);
                            } catch (ExecutionException e3) {
                                PersistentRecordCache.log.error("error", e3);
                                throw new RuntimeException(e3);
                            }
                        }
                    });
                }
                atomicBoolean.set(false);
            }
        }, "RecordLookupTaskSubmitterThread").start();
        return new Iterator<Either<Exception, P2<K, V>>>() { // from class: com.indeed.lsmtree.recordcache.PersistentRecordCache.4
            Iterator<Either<Exception, P2<K, V>>> currentIterator;

            @Override // java.util.Iterator
            public boolean hasNext() {
                if (this.currentIterator != null && this.currentIterator.hasNext()) {
                    return true;
                }
                while (true) {
                    if (!atomicBoolean.get() && atomicLong.get() <= 0) {
                        threadPoolExecutor.shutdown();
                        return false;
                    }
                    try {
                        List list = (List) arrayBlockingQueue2.poll(1L, TimeUnit.SECONDS);
                        if (list != null) {
                            PersistentRecordCache.log.debug("remaining: " + atomicLong.decrementAndGet());
                            this.currentIterator = list.iterator();
                            if (this.currentIterator.hasNext()) {
                                return true;
                            }
                        }
                    } catch (InterruptedException e2) {
                        PersistentRecordCache.log.error("error", e2);
                        throw new RuntimeException(e2);
                    }
                }
            }

            @Override // java.util.Iterator
            public Either<Exception, P2<K, V>> next() {
                return this.currentIterator.next();
            }

            @Override // java.util.Iterator
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Finally extract failed */
    public void reindex(long j) throws IndexReadException {
        int segmentNum = this.recordLogDirectory.getSegmentNum(j);
        try {
            Iterator it = this.recordLogDirectory.getFileReader(segmentNum).iterator();
            while (it.hasNext()) {
                RecordFile.Reader reader = (RecordFile.Reader) it.next();
                while (reader.next()) {
                    try {
                        Operation operation = (Operation) reader.get();
                        if (operation.getClass() == Put.class) {
                            Object key = ((Put) operation).getKey();
                            long position = reader.getPosition();
                            synchronized (this.index) {
                                try {
                                    Long l = (Long) this.index.get(key);
                                    if (l != null && l.longValue() != position) {
                                        if (this.recordLogDirectory.getSegmentNum(l.longValue()) == segmentNum) {
                                            this.index.put(key, Long.valueOf(position));
                                        }
                                    }
                                } catch (Exception e) {
                                    throw new IndexReadException(e);
                                }
                            }
                        }
                    } catch (Throwable th) {
                        reader.close();
                        throw th;
                    }
                }
                reader.close();
            }
        } catch (IndexReadException e2) {
            log.error("error", e2);
            throw e2;
        } catch (Exception e3) {
            log.error("error reindexing segment number " + segmentNum, e3);
        }
        this.repairedSegments.incrementAndGet();
    }

    @Override // com.indeed.lsmtree.recordcache.RecordCache
    public RecordLogDirectoryPoller.Functions getFunctions() {
        return this.indexUpdateFunctions;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.index.close();
    }

    public void waitForCompactions() throws InterruptedException {
        this.index.waitForCompactions();
    }
}
