package com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.regionserver;

import com.oceanbase.connector.flink.shaded.com.google.common.base.Preconditions;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.conf.Configuration;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.DroppedSnapshotException;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.HConstants;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.RemoteExceptionHandler;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.util.Bytes;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.util.HasThread;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.util.StringUtils;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.cliffc.high_scale_lib.Counter;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/oceanbase/connector/flink/shaded/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.class */
public class MemStoreFlusher extends HasThread implements FlushRequester {
    static final Log LOG = LogFactory.getLog(MemStoreFlusher.class);
    private final long threadWakeFrequency;
    private final HRegionServer server;
    protected final long globalMemStoreLimit;
    protected final long globalMemStoreLimitLowMark;
    private static final float DEFAULT_UPPER = 0.4f;
    private static final float DEFAULT_LOWER = 0.35f;
    private static final String UPPER_KEY = "hbase.regionserver.global.memstore.upperLimit";
    private static final String LOWER_KEY = "hbase.regionserver.global.memstore.lowerLimit";
    private long blockingWaitTime;
    private final BlockingQueue<FlushQueueEntry> flushQueue = new DelayQueue();
    private final Map<HRegion, FlushRegionEntry> regionsInQueue = new HashMap();
    private AtomicBoolean wakeupPending = new AtomicBoolean();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition flushOccurred = this.lock.newCondition();
    private final Counter updatesBlockedMsHighWater = new Counter();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oceanbase/connector/flink/shaded/org/apache/hadoop/hbase/regionserver/MemStoreFlusher$FlushQueueEntry.class */
    public interface FlushQueueEntry extends Delayed {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oceanbase/connector/flink/shaded/org/apache/hadoop/hbase/regionserver/MemStoreFlusher$FlushRegionEntry.class */
    public static class FlushRegionEntry implements FlushQueueEntry {
        private final HRegion region;
        private int requeueCount = 0;
        private final long createTime = System.currentTimeMillis();
        private long whenToExpire = this.createTime;

        FlushRegionEntry(HRegion hRegion) {
            this.region = hRegion;
        }

        public boolean isMaximumWait(long j) {
            return System.currentTimeMillis() - this.createTime > j;
        }

        public int getRequeueCount() {
            return this.requeueCount;
        }

        public FlushRegionEntry requeue(long j) {
            this.whenToExpire = System.currentTimeMillis() + j;
            this.requeueCount++;
            return this;
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return timeUnit.convert(this.whenToExpire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - delayed.getDelay(TimeUnit.MILLISECONDS)).intValue();
        }

        public String toString() {
            return "[flush region " + Bytes.toStringBinary(this.region.getRegionName()) + "]";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/oceanbase/connector/flink/shaded/org/apache/hadoop/hbase/regionserver/MemStoreFlusher$WakeupFlushThread.class */
    public static class WakeupFlushThread implements FlushQueueEntry {
        WakeupFlushThread() {
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(TimeUnit timeUnit) {
            return 0L;
        }

        @Override // java.lang.Comparable
        public int compareTo(Delayed delayed) {
            return -1;
        }
    }

    public MemStoreFlusher(Configuration configuration, HRegionServer hRegionServer) {
        this.server = hRegionServer;
        this.threadWakeFrequency = configuration.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10000L);
        long max = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
        this.globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_UPPER, UPPER_KEY, configuration);
        long globalMemStoreLimit = globalMemStoreLimit(max, DEFAULT_LOWER, LOWER_KEY, configuration);
        if (globalMemStoreLimit > this.globalMemStoreLimit) {
            globalMemStoreLimit = this.globalMemStoreLimit;
            LOG.info("Setting globalMemStoreLimitLowMark == globalMemStoreLimit because supplied hbase.regionserver.global.memstore.lowerLimit was > hbase.regionserver.global.memstore.upperLimit");
        }
        this.globalMemStoreLimitLowMark = globalMemStoreLimit;
        this.blockingWaitTime = configuration.getInt("hbase.hstore.blockingWaitTime", 90000);
        LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark) + ", maxHeap=" + StringUtils.humanReadableInt(max));
    }

    static long globalMemStoreLimit(long j, float f, String str, Configuration configuration) {
        return getMemStoreLimit(j, configuration.getFloat(str, f), f);
    }

    static long getMemStoreLimit(long j, float f, float f2) {
        float f3 = f;
        if (f >= 0.9f || f < 0.1f) {
            LOG.warn("Setting global memstore limit to default of " + f2 + " because supplied value outside allowed range of 0.1 -> 0.9");
            f3 = f2;
        }
        return ((float) j) * f3;
    }

    public Counter getUpdatesBlockedMsHighWater() {
        return this.updatesBlockedMsHighWater;
    }

    private boolean flushOneForGlobalPressure() {
        HRegion hRegion;
        SortedMap<Long, HRegion> copyOfOnlineRegionsSortedBySize = this.server.getCopyOfOnlineRegionsSortedBySize();
        HashSet hashSet = new HashSet();
        boolean z = false;
        while (!z) {
            HRegion biggestMemstoreRegion = getBiggestMemstoreRegion(copyOfOnlineRegionsSortedBySize, hashSet, true);
            HRegion biggestMemstoreRegion2 = getBiggestMemstoreRegion(copyOfOnlineRegionsSortedBySize, hashSet, false);
            if (biggestMemstoreRegion2 == null) {
                LOG.error("Above memory mark but there are no flushable regions!");
                return false;
            }
            if (biggestMemstoreRegion == null || biggestMemstoreRegion2.memstoreSize.get() <= 2 * biggestMemstoreRegion.memstoreSize.get()) {
                hRegion = biggestMemstoreRegion == null ? biggestMemstoreRegion2 : biggestMemstoreRegion;
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Under global heap pressure: Region " + biggestMemstoreRegion2.getRegionNameAsString() + " has too many store files, but is " + StringUtils.humanReadableInt(biggestMemstoreRegion2.memstoreSize.get()) + " vs best flushable region's " + StringUtils.humanReadableInt(biggestMemstoreRegion.memstoreSize.get()) + ". Choosing the bigger.");
                }
                hRegion = biggestMemstoreRegion2;
            }
            Preconditions.checkState(hRegion.memstoreSize.get() > 0);
            LOG.info("Flush of region " + hRegion + " due to global heap pressure");
            z = flushRegion(hRegion, true);
            if (!z) {
                LOG.info("Excluding unflushable region " + hRegion + " - trying to find a different region to flush.");
                hashSet.add(hRegion);
            }
        }
        return true;
    }

    @Override // com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.util.HasThread, java.lang.Runnable
    public void run() {
        while (!this.server.isStopped()) {
            Object obj = null;
            try {
                this.wakeupPending.set(false);
                obj = (FlushQueueEntry) this.flushQueue.poll(this.threadWakeFrequency, TimeUnit.MILLISECONDS);
                if (obj == null || (obj instanceof WakeupFlushThread)) {
                    if (isAboveLowWaterMark()) {
                        LOG.debug("Flush thread woke up because memory above low water=" + StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark));
                        if (!flushOneForGlobalPressure()) {
                            this.lock.lock();
                            try {
                                Thread.sleep(1000L);
                                this.flushOccurred.signalAll();
                                this.lock.unlock();
                            } catch (Throwable th) {
                                throw th;
                                break;
                            }
                        }
                        wakeupFlushThread();
                    }
                } else if (!flushRegion((FlushRegionEntry) obj)) {
                    break;
                }
            } catch (InterruptedException e) {
            } catch (ConcurrentModificationException e2) {
            } catch (Exception e3) {
                LOG.error("Cache flusher failed for entry " + obj, e3);
                if (!this.server.checkFileSystem()) {
                    break;
                }
            }
        }
        this.regionsInQueue.clear();
        this.flushQueue.clear();
        this.lock.lock();
        try {
            this.flushOccurred.signalAll();
            this.lock.unlock();
            LOG.info(getName() + " exiting");
        } finally {
            this.lock.unlock();
        }
    }

    private void wakeupFlushThread() {
        if (this.wakeupPending.compareAndSet(false, true)) {
            this.flushQueue.add(new WakeupFlushThread());
        }
    }

    private HRegion getBiggestMemstoreRegion(SortedMap<Long, HRegion> sortedMap, Set<HRegion> set, boolean z) {
        synchronized (this.regionsInQueue) {
            for (HRegion hRegion : sortedMap.values()) {
                if (!set.contains(hRegion) && (!z || !isTooManyStoreFiles(hRegion))) {
                    return hRegion;
                }
            }
            return null;
        }
    }

    private boolean isAboveHighWaterMark() {
        return this.server.getRegionServerAccounting().getGlobalMemstoreSize() >= this.globalMemStoreLimit;
    }

    private boolean isAboveLowWaterMark() {
        return this.server.getRegionServerAccounting().getGlobalMemstoreSize() >= this.globalMemStoreLimitLowMark;
    }

    @Override // com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.regionserver.FlushRequester
    public void requestFlush(HRegion hRegion) {
        synchronized (this.regionsInQueue) {
            if (!this.regionsInQueue.containsKey(hRegion)) {
                FlushRegionEntry flushRegionEntry = new FlushRegionEntry(hRegion);
                this.regionsInQueue.put(hRegion, flushRegionEntry);
                this.flushQueue.add(flushRegionEntry);
            }
        }
    }

    @Override // com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.regionserver.FlushRequester
    public void requestDelayedFlush(HRegion hRegion, long j) {
        synchronized (this.regionsInQueue) {
            if (!this.regionsInQueue.containsKey(hRegion)) {
                FlushRegionEntry flushRegionEntry = new FlushRegionEntry(hRegion);
                flushRegionEntry.requeue(j);
                this.regionsInQueue.put(hRegion, flushRegionEntry);
                this.flushQueue.add(flushRegionEntry);
            }
        }
    }

    public int getFlushQueueSize() {
        return this.flushQueue.size();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void interruptIfNecessary() {
        this.lock.lock();
        try {
            interrupt();
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    private boolean flushRegion(FlushRegionEntry flushRegionEntry) {
        HRegion hRegion = flushRegionEntry.region;
        if (!flushRegionEntry.region.getRegionInfo().isMetaRegion() && isTooManyStoreFiles(hRegion)) {
            if (!flushRegionEntry.isMaximumWait(this.blockingWaitTime)) {
                if (flushRegionEntry.getRequeueCount() <= 0) {
                    LOG.warn("Region " + hRegion.getRegionNameAsString() + " has too many store files; delaying flush up to " + this.blockingWaitTime + "ms");
                    if (!this.server.compactSplitThread.requestSplit(hRegion)) {
                        try {
                            this.server.compactSplitThread.requestCompaction(hRegion, getName());
                        } catch (IOException e) {
                            LOG.error("Cache flush failed" + (hRegion != null ? " for region " + Bytes.toStringBinary(hRegion.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(e));
                        }
                    }
                }
                this.flushQueue.add(flushRegionEntry.requeue(this.blockingWaitTime / 100));
                return true;
            }
            LOG.info("Waited " + (System.currentTimeMillis() - flushRegionEntry.createTime) + "ms on a compaction to clean up 'too many store files'; waited long enough... proceeding with flush of " + hRegion.getRegionNameAsString());
        }
        return flushRegion(hRegion, false);
    }

    private boolean flushRegion(HRegion hRegion, boolean z) {
        synchronized (this.regionsInQueue) {
            FlushRegionEntry remove = this.regionsInQueue.remove(hRegion);
            if (remove != null && z) {
                this.flushQueue.remove(remove);
            }
            this.lock.lock();
        }
        try {
            try {
                try {
                    boolean isCompactionNeeded = hRegion.flushcache().isCompactionNeeded();
                    if (hRegion.checkSplit() != null) {
                        this.server.compactSplitThread.requestSplit(hRegion);
                    } else if (isCompactionNeeded) {
                        this.server.compactSplitThread.requestCompaction(hRegion, getName());
                    }
                    this.server.getMetrics().addFlush(hRegion.getRecentFlushInfo());
                    this.flushOccurred.signalAll();
                    this.lock.unlock();
                    return true;
                } catch (DroppedSnapshotException e) {
                    this.server.abort("Replay of HLog required. Forcing server shutdown", e);
                    this.flushOccurred.signalAll();
                    this.lock.unlock();
                    return false;
                }
            } catch (IOException e2) {
                LOG.error("Cache flush failed" + (hRegion != null ? " for region " + Bytes.toStringBinary(hRegion.getRegionName()) : ""), RemoteExceptionHandler.checkIOException(e2));
                if (this.server.checkFileSystem()) {
                    this.flushOccurred.signalAll();
                    this.lock.unlock();
                    return true;
                }
                this.flushOccurred.signalAll();
                this.lock.unlock();
                return false;
            }
        } catch (Throwable th) {
            this.flushOccurred.signalAll();
            this.lock.unlock();
            throw th;
        }
    }

    private boolean isTooManyStoreFiles(HRegion hRegion) {
        Iterator<Store> it = hRegion.stores.values().iterator();
        while (it.hasNext()) {
            if (it.next().hasTooManyStoreFiles()) {
                return true;
            }
        }
        return false;
    }

    public void reclaimMemStoreMemory() {
        if (!isAboveHighWaterMark()) {
            if (isAboveLowWaterMark()) {
                wakeupFlushThread();
                return;
            }
            return;
        }
        this.lock.lock();
        boolean z = false;
        long j = 0;
        while (isAboveHighWaterMark() && !this.server.isStopped()) {
            try {
                if (!z) {
                    j = EnvironmentEdgeManager.currentTimeMillis();
                    LOG.info("Blocking updates on " + this.server.toString() + ": the global memstore size " + StringUtils.humanReadableInt(this.server.getRegionServerAccounting().getGlobalMemstoreSize()) + " is >= than blocking " + StringUtils.humanReadableInt(this.globalMemStoreLimit) + " size");
                }
                z = true;
                wakeupFlushThread();
                try {
                    this.flushOccurred.await(5L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } finally {
                this.lock.unlock();
            }
        }
        if (z) {
            long currentTimeMillis = EnvironmentEdgeManager.currentTimeMillis() - j;
            if (currentTimeMillis > 0) {
                this.updatesBlockedMsHighWater.add(currentTimeMillis);
            }
            LOG.info("Unblocking updates for server " + this.server.toString());
        }
    }

    public String toString() {
        return "flush_queue=" + this.flushQueue.size();
    }

    public String dumpQueue() {
        StringBuilder sb = new StringBuilder();
        sb.append("Flush Queue Queue dump:\n");
        sb.append("  Flush Queue:\n");
        Iterator it = this.flushQueue.iterator();
        while (it.hasNext()) {
            sb.append("    " + ((FlushQueueEntry) it.next()).toString());
            sb.append("\n");
        }
        return sb.toString();
    }
}
