package org.apache.bookkeeper.bookie;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.shaded.com.google.common.base.Stopwatch;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.DaemonThreadFactory;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ZeroBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/bookie/Journal.class */
public class Journal extends BookieCriticalThread implements CheckpointSource {
    private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
    static final int PADDING_MASK = -256;
    static final long MB = 1048576;
    static final int KB = 1024;
    final long maxJournalSize;
    final long journalPreAllocSize;
    final int journalWriteBufferSize;
    final int maxBackupJournals;
    final File journalDirectory;
    final ServerConfiguration conf;
    final ForceWriteThread forceWriteThread;
    private final long maxGroupWaitInNanos;
    private final long bufferedEntriesThreshold;
    private final long bufferedWritesThreshold;
    private final boolean flushWhenQueueEmpty;
    private final boolean removePagesFromCache;
    private final boolean syncData;
    private final LastLogMark lastLogMark;
    private final ExecutorService cbThreadPool;
    final LinkedBlockingQueue<QueueEntry> queue;
    final LinkedBlockingQueue<ForceWriteRequest> forceWriteRequests;
    volatile boolean running;
    private final LedgerDirsManager ledgerDirsManager;
    private final OpStatsLogger journalAddEntryStats;
    private final OpStatsLogger journalSyncStats;
    private final OpStatsLogger journalCreationStats;
    private final OpStatsLogger journalFlushStats;
    private final OpStatsLogger journalProcessTimeStats;
    private final OpStatsLogger journalQueueStats;
    private final OpStatsLogger forceWriteGroupingCountStats;
    private final OpStatsLogger forceWriteBatchEntriesStats;
    private final OpStatsLogger forceWriteBatchBytesStats;
    private final Counter journalQueueSize;
    private final Counter forceWriteQueueSize;
    private final Counter flushMaxWaitCounter;
    private final Counter flushMaxOutstandingBytesCounter;
    private final Counter flushEmptyQueueCounter;
    private final Counter journalWriteBytes;

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$ForceWriteRequest.class */
    private class ForceWriteRequest {
        private final JournalChannel logFile;
        private final LinkedList<QueueEntry> forceWriteWaiters;
        private boolean shouldClose;
        private final boolean isMarker;
        private final long lastFlushedPosition;
        private final long logId;

        private ForceWriteRequest(JournalChannel journalChannel, long j, long j2, LinkedList<QueueEntry> linkedList, boolean z, boolean z2) {
            this.forceWriteWaiters = linkedList;
            this.logFile = journalChannel;
            this.logId = j;
            this.lastFlushedPosition = j2;
            this.shouldClose = z;
            this.isMarker = z2;
            Journal.this.forceWriteQueueSize.inc();
        }

        public int process(boolean z) throws IOException {
            Journal.this.forceWriteQueueSize.dec();
            if (this.isMarker) {
                return 0;
            }
            if (z) {
                try {
                    long nowInNano = MathUtils.nowInNano();
                    this.logFile.forceWrite(false);
                    Journal.this.journalSyncStats.registerSuccessfulEvent(MathUtils.elapsedNanos(nowInNano), TimeUnit.NANOSECONDS);
                } catch (Throwable th) {
                    closeFileIfNecessary();
                    throw th;
                }
            }
            Journal.this.lastLogMark.setCurLogMark(this.logId, this.lastFlushedPosition);
            Iterator<QueueEntry> it = this.forceWriteWaiters.iterator();
            while (it.hasNext()) {
                Journal.this.cbThreadPool.execute(it.next());
            }
            int size = this.forceWriteWaiters.size();
            closeFileIfNecessary();
            return size;
        }

        public void closeFileIfNecessary() {
            if (this.shouldClose) {
                try {
                    this.logFile.close();
                    this.shouldClose = false;
                } catch (IOException e) {
                    Journal.LOG.error("I/O exception while closing file", e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$ForceWriteThread.class */
    public class ForceWriteThread extends BookieCriticalThread {
        volatile boolean running;
        Thread threadToNotifyOnEx;
        private final boolean enableGroupForceWrites;

        public ForceWriteThread(Thread thread, boolean z) {
            super("ForceWriteThread");
            this.running = true;
            this.threadToNotifyOnEx = thread;
            this.enableGroupForceWrites = z;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Journal.LOG.info("ForceWrite Thread started");
            boolean z = true;
            int i = 0;
            while (this.running) {
                ForceWriteRequest forceWriteRequest = null;
                try {
                    forceWriteRequest = Journal.this.forceWriteRequests.take();
                    if (!forceWriteRequest.isMarker && z) {
                        if (this.enableGroupForceWrites) {
                            Journal.this.forceWriteRequests.put(new ForceWriteRequest(forceWriteRequest.logFile, 0L, 0L, null, false, true));
                        }
                        if (i > 0) {
                            Journal.this.forceWriteGroupingCountStats.registerSuccessfulValue(i);
                            i = 0;
                        }
                    }
                    i += forceWriteRequest.process(z);
                    z = !this.enableGroupForceWrites || forceWriteRequest.isMarker || forceWriteRequest.shouldClose;
                } catch (IOException e) {
                    Journal.LOG.error("I/O exception in ForceWrite thread", e);
                    this.running = false;
                } catch (InterruptedException e2) {
                    Journal.LOG.error("ForceWrite thread interrupted", e2);
                    if (null != forceWriteRequest) {
                        forceWriteRequest.shouldClose = true;
                        forceWriteRequest.closeFileIfNecessary();
                    }
                    this.running = false;
                }
            }
            this.threadToNotifyOnEx.interrupt();
        }

        void shutdown() throws InterruptedException {
            this.running = false;
            interrupt();
            join();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$JournalIdFilter.class */
    public interface JournalIdFilter {
        boolean accept(long j);
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$JournalRollingFilter.class */
    private static class JournalRollingFilter implements JournalIdFilter {
        final LastLogMark lastMark;

        JournalRollingFilter(LastLogMark lastLogMark) {
            this.lastMark = lastLogMark;
        }

        @Override // org.apache.bookkeeper.bookie.Journal.JournalIdFilter
        public boolean accept(long j) {
            return j < this.lastMark.getCurMark().getLogFileId();
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$JournalScanner.class */
    public interface JournalScanner {
        void process(int i, long j, ByteBuffer byteBuffer) throws IOException;
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$LastLogMark.class */
    public class LastLogMark {
        private final LogMark curMark;

        LastLogMark(long j, long j2) {
            this.curMark = new LogMark(j, j2);
        }

        void setCurLogMark(long j, long j2) {
            this.curMark.setLogMark(j, j2);
        }

        LastLogMark markLog() {
            return new LastLogMark(this.curMark.getLogFileId(), this.curMark.getLogFileOffset());
        }

        public LogMark getCurMark() {
            return this.curMark;
        }

        /* JADX WARN: Finally extract failed */
        void rollLog(LastLogMark lastLogMark) throws LedgerDirsManager.NoWritableLedgerDirException {
            byte[] bArr = new byte[16];
            lastLogMark.getCurMark().writeLogMark(ByteBuffer.wrap(bArr));
            if (Journal.LOG.isDebugEnabled()) {
                Journal.LOG.debug("RollLog to persist last marked log : {}", lastLogMark.getCurMark());
            }
            Iterator<File> it = Journal.this.ledgerDirsManager.getWritableLedgerDirs().iterator();
            while (it.hasNext()) {
                File file = new File(it.next(), "lastMark");
                Closeable closeable = null;
                try {
                    try {
                        FileOutputStream fileOutputStream = new FileOutputStream(file);
                        fileOutputStream.write(bArr);
                        fileOutputStream.getChannel().force(true);
                        fileOutputStream.close();
                        closeable = null;
                        IOUtils.close(Journal.LOG, null);
                    } catch (IOException e) {
                        Journal.LOG.error("Problems writing to " + file, e);
                        IOUtils.close(Journal.LOG, closeable);
                    }
                } catch (Throwable th) {
                    IOUtils.close(Journal.LOG, closeable);
                    throw th;
                }
            }
        }

        void readLog() {
            FileInputStream fileInputStream;
            int read;
            byte[] bArr = new byte[16];
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            LogMark logMark = new LogMark();
            Iterator<File> it = Journal.this.ledgerDirsManager.getAllLedgerDirs().iterator();
            while (it.hasNext()) {
                File file = new File(it.next(), "lastMark");
                try {
                    fileInputStream = new FileInputStream(file);
                    try {
                        read = fileInputStream.read(bArr);
                    } catch (Throwable th) {
                        fileInputStream.close();
                        throw th;
                        break;
                    }
                } catch (IOException e) {
                    Journal.LOG.error("Problems reading from " + file + " (this is okay if it is the first time starting this bookie");
                }
                if (read != 16) {
                    throw new IOException("Couldn't read enough bytes from lastMark. Wanted 16, got " + read);
                    break;
                }
                fileInputStream.close();
                wrap.clear();
                logMark.readLogMark(wrap);
                if (this.curMark.compare(logMark) < 0) {
                    this.curMark.setLogMark(logMark.getLogFileId(), logMark.getLogFileOffset());
                }
            }
        }

        public String toString() {
            return this.curMark.toString();
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$LogMarkCheckpoint.class */
    private static class LogMarkCheckpoint implements CheckpointSource.Checkpoint {
        final LastLogMark mark;

        public LogMarkCheckpoint(LastLogMark lastLogMark) {
            this.mark = lastLogMark;
        }

        @Override // java.lang.Comparable
        public int compareTo(CheckpointSource.Checkpoint checkpoint) {
            if (checkpoint == CheckpointSource.Checkpoint.MAX) {
                return -1;
            }
            if (checkpoint == CheckpointSource.Checkpoint.MIN) {
                return 1;
            }
            return this.mark.getCurMark().compare(((LogMarkCheckpoint) checkpoint).mark.getCurMark());
        }

        public boolean equals(Object obj) {
            return (obj instanceof LogMarkCheckpoint) && 0 == compareTo((CheckpointSource.Checkpoint) obj);
        }

        public int hashCode() {
            return this.mark.hashCode();
        }

        public String toString() {
            return this.mark.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/bookkeeper/bookie/Journal$QueueEntry.class */
    public class QueueEntry implements Runnable {
        ByteBuf entry;
        long ledgerId;
        long entryId;
        BookkeeperInternalCallbacks.WriteCallback cb;
        Object ctx;
        long enqueueTime;

        QueueEntry(ByteBuf byteBuf, long j, long j2, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj, long j3) {
            this.entry = byteBuf.duplicate();
            this.cb = writeCallback;
            this.ctx = obj;
            this.ledgerId = j;
            this.entryId = j2;
            this.enqueueTime = j3;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (Journal.LOG.isDebugEnabled()) {
                Journal.LOG.debug("Acknowledge Ledger: {}, Entry: {}", Long.valueOf(this.ledgerId), Long.valueOf(this.entryId));
            }
            Journal.this.journalAddEntryStats.registerSuccessfulEvent(MathUtils.elapsedNanos(this.enqueueTime), TimeUnit.NANOSECONDS);
            this.cb.writeComplete(0, this.ledgerId, this.entryId, null, this.ctx);
        }
    }

    private static List<Long> listJournalIds(File file, JournalIdFilter journalIdFilter) {
        File[] listFiles = file.listFiles();
        if (listFiles == null || listFiles.length == 0) {
            return Collections.emptyList();
        }
        ArrayList arrayList = new ArrayList();
        for (File file2 : listFiles) {
            String name = file2.getName();
            if (name.endsWith(".txn")) {
                long parseLong = Long.parseLong(name.split("\\.")[0], 16);
                if (journalIdFilter == null) {
                    arrayList.add(Long.valueOf(parseLong));
                } else if (journalIdFilter.accept(parseLong)) {
                    arrayList.add(Long.valueOf(parseLong));
                }
            }
        }
        Collections.sort(arrayList);
        return arrayList;
    }

    static void writePaddingBytes(JournalChannel journalChannel, ByteBuffer byteBuffer, int i) throws IOException {
        int position = (int) (journalChannel.bc.position() % i);
        if (0 != position) {
            int i2 = i - position;
            int i3 = i2 < 8 ? i - (8 - i2) : i2 - 8;
            byteBuffer.clear();
            byteBuffer.putInt(PADDING_MASK);
            byteBuffer.putInt(i3);
            byteBuffer.position(8 + i3);
            byteBuffer.flip();
            journalChannel.preAllocIfNeeded(byteBuffer.limit());
            journalChannel.bc.write(byteBuffer);
        }
    }

    public Journal(File file, ServerConfiguration serverConfiguration, LedgerDirsManager ledgerDirsManager) {
        this(file, serverConfiguration, ledgerDirsManager, NullStatsLogger.INSTANCE);
    }

    public Journal(File file, ServerConfiguration serverConfiguration, LedgerDirsManager ledgerDirsManager, StatsLogger statsLogger) {
        super("BookieJournal-" + serverConfiguration.getBookiePort());
        this.lastLogMark = new LastLogMark(0L, 0L);
        this.queue = new LinkedBlockingQueue<>();
        this.forceWriteRequests = new LinkedBlockingQueue<>();
        this.running = true;
        this.ledgerDirsManager = ledgerDirsManager;
        this.conf = serverConfiguration;
        this.journalDirectory = file;
        this.maxJournalSize = serverConfiguration.getMaxJournalSizeMB() * MB;
        this.journalPreAllocSize = serverConfiguration.getJournalPreAllocSizeMB() * MB;
        this.journalWriteBufferSize = serverConfiguration.getJournalWriteBufferSizeKB() * KB;
        this.syncData = serverConfiguration.getJournalSyncData();
        this.maxBackupJournals = serverConfiguration.getMaxBackupJournals();
        this.forceWriteThread = new ForceWriteThread(this, serverConfiguration.getJournalAdaptiveGroupWrites());
        this.maxGroupWaitInNanos = TimeUnit.MILLISECONDS.toNanos(serverConfiguration.getJournalMaxGroupWaitMSec());
        this.bufferedWritesThreshold = serverConfiguration.getJournalBufferedWritesThreshold();
        this.bufferedEntriesThreshold = serverConfiguration.getJournalBufferedEntriesThreshold();
        this.cbThreadPool = Executors.newFixedThreadPool(serverConfiguration.getNumJournalCallbackThreads(), new DaemonThreadFactory());
        this.flushWhenQueueEmpty = this.maxGroupWaitInNanos <= 0 || serverConfiguration.getJournalFlushWhenQueueEmpty();
        this.removePagesFromCache = serverConfiguration.getJournalRemovePagesFromCache();
        this.lastLogMark.readLog();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Last Log Mark : {}", this.lastLogMark.getCurMark());
        }
        this.journalAddEntryStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_ADD_ENTRY);
        this.journalSyncStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_SYNC);
        this.journalCreationStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_CREATION_LATENCY);
        this.journalFlushStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FLUSH_LATENCY);
        this.journalQueueStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_QUEUE_LATENCY);
        this.journalProcessTimeStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_PROCESS_TIME_LATENCY);
        this.forceWriteGroupingCountStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_GROUPING_COUNT);
        this.forceWriteBatchEntriesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_ENTRIES);
        this.forceWriteBatchBytesStats = statsLogger.getOpStatsLogger(BookKeeperServerStats.JOURNAL_FORCE_WRITE_BATCH_BYTES);
        this.journalQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_QUEUE_SIZE);
        this.forceWriteQueueSize = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_FORCE_WRITE_QUEUE_SIZE);
        this.flushMaxWaitCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_WAIT);
        this.flushMaxOutstandingBytesCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_MAX_OUTSTANDING_BYTES);
        this.flushEmptyQueueCounter = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_NUM_FLUSH_EMPTY_QUEUE);
        this.journalWriteBytes = statsLogger.getCounter(BookKeeperServerStats.JOURNAL_WRITE_BYTES);
    }

    public File getJournalDirectory() {
        return this.journalDirectory;
    }

    public LastLogMark getLastLogMark() {
        return this.lastLogMark;
    }

    @Override // org.apache.bookkeeper.bookie.CheckpointSource
    public CheckpointSource.Checkpoint newCheckpoint() {
        return new LogMarkCheckpoint(this.lastLogMark.markLog());
    }

    @Override // org.apache.bookkeeper.bookie.CheckpointSource
    public void checkpointComplete(CheckpointSource.Checkpoint checkpoint, boolean z) throws IOException {
        if (checkpoint instanceof LogMarkCheckpoint) {
            LastLogMark lastLogMark = ((LogMarkCheckpoint) checkpoint).mark;
            lastLogMark.rollLog(lastLogMark);
            if (z) {
                List<Long> listJournalIds = listJournalIds(this.journalDirectory, new JournalRollingFilter(lastLogMark));
                if (listJournalIds.size() >= this.maxBackupJournals) {
                    int size = listJournalIds.size() - this.maxBackupJournals;
                    for (int i = 0; i < size; i++) {
                        long longValue = listJournalIds.get(i).longValue();
                        if (longValue < lastLogMark.getCurMark().getLogFileId()) {
                            File file = new File(this.journalDirectory, Long.toHexString(longValue) + ".txn");
                            if (!file.delete()) {
                                LOG.warn("Could not delete old journal file {}", file);
                            }
                            LOG.info("garbage collected journal " + file.getName());
                        }
                    }
                }
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:45:0x00e4, code lost:
    
        throw new java.io.IOException("Invalid record found with negative length : " + r23);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void scanJournal(long r12, long r14, org.apache.bookkeeper.bookie.Journal.JournalScanner r16) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 321
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.bookkeeper.bookie.Journal.scanJournal(long, long, org.apache.bookkeeper.bookie.Journal$JournalScanner):void");
    }

    public void replay(JournalScanner journalScanner) throws IOException {
        final LogMark curMark = this.lastLogMark.getCurMark();
        List<Long> listJournalIds = listJournalIds(this.journalDirectory, new JournalIdFilter() { // from class: org.apache.bookkeeper.bookie.Journal.1
            @Override // org.apache.bookkeeper.bookie.Journal.JournalIdFilter
            public boolean accept(long j) {
                return j >= curMark.getLogFileId();
            }
        });
        if (curMark.getLogFileId() > 0 && (listJournalIds.size() == 0 || listJournalIds.get(0).longValue() != curMark.getLogFileId())) {
            throw new IOException("Recovery log " + curMark.getLogFileId() + " is missing");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Try to relay journal logs : {}", listJournalIds);
        }
        for (Long l : listJournalIds) {
            long j = 0;
            if (l.longValue() == curMark.getLogFileId()) {
                j = curMark.getLogFileOffset();
            }
            LOG.info("Replaying journal {} from position {}", l, Long.valueOf(j));
            scanJournal(l.longValue(), j, journalScanner);
        }
    }

    public void logAddEntry(ByteBuffer byteBuffer, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj) {
        logAddEntry(Unpooled.wrappedBuffer(byteBuffer), writeCallback, obj);
    }

    public void logAddEntry(ByteBuf byteBuf, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj) {
        long j = byteBuf.getLong(byteBuf.readerIndex() + 0);
        long j2 = byteBuf.getLong(byteBuf.readerIndex() + 8);
        this.journalQueueSize.inc();
        byteBuf.retain();
        this.queue.add(new QueueEntry(byteBuf, j, j2, writeCallback, obj, MathUtils.nowInNano()));
    }

    public int getJournalQueueLength() {
        return this.queue.size();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LOG.info("Starting journal on {}", this.journalDirectory);
        LinkedList linkedList = new LinkedList();
        ByteBuffer allocate = ByteBuffer.allocate(4);
        ByteBuffer allocate2 = ByteBuffer.allocate(2 * this.conf.getJournalAlignmentSize());
        ZeroBuffer.put(allocate2);
        int journalFormatVersionToWrite = this.conf.getJournalFormatVersionToWrite();
        int journalAlignmentSize = this.conf.getJournalAlignmentSize();
        JournalChannel journalChannel = null;
        this.forceWriteThread.start();
        Stopwatch createUnstarted = Stopwatch.createUnstarted();
        Stopwatch createUnstarted2 = Stopwatch.createUnstarted();
        long j = 0;
        try {
            try {
                List<Long> listJournalIds = listJournalIds(this.journalDirectory, null);
                long currentTimeMillis = listJournalIds.isEmpty() ? System.currentTimeMillis() : listJournalIds.get(listJournalIds.size() - 1).longValue();
                BufferedChannel bufferedChannel = null;
                long j2 = 0;
                boolean z = false;
                long j3 = 0;
                QueueEntry queueEntry = null;
                while (true) {
                    if (null == journalChannel) {
                        currentTimeMillis++;
                        createUnstarted.reset().start();
                        journalChannel = new JournalChannel(this.journalDirectory, currentTimeMillis, this.journalPreAllocSize, this.journalWriteBufferSize, journalAlignmentSize, this.removePagesFromCache, journalFormatVersionToWrite);
                        this.journalCreationStats.registerSuccessfulEvent(createUnstarted.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                        bufferedChannel = journalChannel.getBufferedChannel();
                        j2 = bufferedChannel.position();
                    }
                    if (queueEntry == null) {
                        if (j3 != 0) {
                            this.journalProcessTimeStats.registerSuccessfulEvent(MathUtils.elapsedNanos(j3), TimeUnit.NANOSECONDS);
                        }
                        if (linkedList.isEmpty()) {
                            queueEntry = this.queue.take();
                            j3 = MathUtils.nowInNano();
                            this.journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(queueEntry.enqueueTime), TimeUnit.NANOSECONDS);
                        } else {
                            long elapsedNanos = this.maxGroupWaitInNanos - MathUtils.elapsedNanos(((QueueEntry) linkedList.get(0)).enqueueTime);
                            if (this.flushWhenQueueEmpty || elapsedNanos < 0) {
                                elapsedNanos = 0;
                            }
                            queueEntry = this.queue.poll(elapsedNanos, TimeUnit.NANOSECONDS);
                            j3 = MathUtils.nowInNano();
                            if (queueEntry != null) {
                                this.journalQueueStats.registerSuccessfulEvent(MathUtils.elapsedNanos(queueEntry.enqueueTime), TimeUnit.NANOSECONDS);
                            }
                            boolean z2 = false;
                            if (this.maxGroupWaitInNanos > 0 && !z && MathUtils.elapsedNanos(((QueueEntry) linkedList.get(0)).enqueueTime) > this.maxGroupWaitInNanos) {
                                z = true;
                            } else if (this.maxGroupWaitInNanos > 0 && z && queueEntry != null && MathUtils.elapsedNanos(queueEntry.enqueueTime) < this.maxGroupWaitInNanos) {
                                z = false;
                                z2 = true;
                                this.flushMaxWaitCounter.inc();
                            } else if (queueEntry != null && ((this.bufferedEntriesThreshold > 0 && linkedList.size() > this.bufferedEntriesThreshold) || bufferedChannel.position() > j2 + this.bufferedWritesThreshold)) {
                                z2 = true;
                                this.flushMaxOutstandingBytesCounter.inc();
                            } else if (queueEntry == null) {
                                z2 = true;
                                this.flushEmptyQueueCounter.inc();
                            }
                            if (z2) {
                                if (journalFormatVersionToWrite >= 5) {
                                    writePaddingBytes(journalChannel, allocate2, journalAlignmentSize);
                                }
                                createUnstarted2.reset().start();
                                bufferedChannel.flush(false);
                                j2 = bufferedChannel.position();
                                this.journalFlushStats.registerSuccessfulEvent(createUnstarted2.stop().elapsed(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                                if (LOG.isDebugEnabled()) {
                                    Iterator it = linkedList.iterator();
                                    while (it.hasNext()) {
                                        QueueEntry queueEntry2 = (QueueEntry) it.next();
                                        LOG.debug("Written and queuing for flush Ledger:" + queueEntry2.ledgerId + " Entry:" + queueEntry2.entryId);
                                    }
                                }
                                this.forceWriteBatchEntriesStats.registerSuccessfulValue(linkedList.size());
                                this.forceWriteBatchBytesStats.registerSuccessfulValue(j);
                                boolean z3 = j2 > this.maxJournalSize;
                                if (this.syncData) {
                                    this.forceWriteRequests.put(new ForceWriteRequest(journalChannel, currentTimeMillis, j2, linkedList, z3, false));
                                    linkedList = new LinkedList();
                                } else {
                                    this.lastLogMark.setCurLogMark(currentTimeMillis, j2);
                                    for (int i = 0; i < linkedList.size(); i++) {
                                        this.cbThreadPool.execute((QueueEntry) linkedList.get(i));
                                    }
                                    linkedList.clear();
                                    if (z3) {
                                        this.forceWriteRequests.put(new ForceWriteRequest(journalChannel, currentTimeMillis, j2, new LinkedList(), z3, false));
                                    }
                                }
                                j = 0;
                                if (bufferedChannel.position() > this.maxJournalSize) {
                                    journalChannel = null;
                                }
                            }
                        }
                    }
                    if (!this.running) {
                        break;
                    }
                    if (queueEntry != null) {
                        this.journalWriteBytes.add(queueEntry.entry.readableBytes());
                        this.journalQueueSize.dec();
                        j += 4 + queueEntry.entry.readableBytes();
                        allocate.clear();
                        allocate.putInt(queueEntry.entry.readableBytes());
                        allocate.flip();
                        journalChannel.preAllocIfNeeded(4 + queueEntry.entry.readableBytes());
                        bufferedChannel.write(allocate);
                        bufferedChannel.write(queueEntry.entry.nioBuffer());
                        queueEntry.entry.release();
                        linkedList.add(queueEntry);
                        queueEntry = null;
                    }
                }
                LOG.info("Journal Manager is asked to shut down, quit.");
                journalChannel.close();
                journalChannel = null;
                IOUtils.close(LOG, null);
            } catch (IOException e) {
                LOG.error("I/O exception in Journal thread!", e);
                IOUtils.close(LOG, journalChannel);
            } catch (InterruptedException e2) {
                LOG.warn("Journal exits when shutting down", e2);
                IOUtils.close(LOG, journalChannel);
            }
            LOG.info("Journal exited loop!");
        } catch (Throwable th) {
            IOUtils.close(LOG, journalChannel);
            throw th;
        }
    }

    public synchronized void shutdown() {
        try {
            if (this.running) {
                LOG.info("Shutting down Journal");
                this.forceWriteThread.shutdown();
                this.cbThreadPool.shutdown();
                if (!this.cbThreadPool.awaitTermination(5L, TimeUnit.SECONDS)) {
                    LOG.warn("Couldn't shutdown journal callback thread gracefully. Forcing");
                }
                this.cbThreadPool.shutdownNow();
                this.running = false;
                interrupt();
                join();
                LOG.info("Finished Shutting down Journal thread");
            }
        } catch (InterruptedException e) {
            LOG.warn("Interrupted during shutting down journal : ", e);
        }
    }

    private static int fullRead(JournalChannel journalChannel, ByteBuffer byteBuffer) throws IOException {
        int i;
        int read;
        while (true) {
            int i2 = i;
            i = (byteBuffer.remaining() > 0 && (read = journalChannel.read(byteBuffer)) > 0) ? i2 + read : 0;
            return i2;
        }
    }
}
