package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie.class */
public class Bookie extends Thread {
    /** Directory in which {@code openChannel} creates the {@code <hex id>.txn} journal files. */
    final File journalDirectory;
    /** Ledger directories; {@code LastLogMark} reads and writes a {@code lastMark} file in each of them. */
    final File[] ledgerDirectories;
    /** Path prefix of the ephemeral znode created by {@code instantiateZookeeperClient}. */
    static final String BOOKIE_REGISTRATION_PATH = "/ledgers/available/";
    /** Not referenced by the code visible in this file. NOTE(review): presumably the root znode for ledger data - confirm. */
    static final String LEDGERS_PATH = "/ledgers";
    /** ZooKeeper client; set to {@code null} by {@code instantiateZookeeperClient} when no server list is supplied. */
    ZooKeeper zk;
    /** Entry log; handed to every {@code LedgerDescriptor} and flushed by {@code SyncThread}. */
    EntryLogger entryLogger;
    /** Ledger cache; handed to every {@code LedgerDescriptor} and flushed by {@code SyncThread}. */
    LedgerCache ledgerCache;
    /** Step (4194304 bytes) by which {@code run()} extends the journal file ahead of the write position. */
    public static final long preAllocSize = 4194304;
    static Logger LOG = Logger.getLogger(Bookie.class);
    /** 512-byte buffer that {@code run()} writes at the pre-allocation offset to extend the journal file. */
    public static final ByteBuffer zeros = ByteBuffer.allocate(512);
    /** Ledger id to descriptor map; {@code getHandle} and {@code putHandle} synchronize on this object. */
    HashMap<Long, LedgerDescriptor> ledgers = new HashMap<>();
    /** Set to {@code true} by {@code run()} once the journal is open and back to {@code false} when the loop exits. */
    private volatile boolean running = false;
    /** Background flusher; {@code shutdown()} clears its {@code running} flag and joins it. Where it is started is not visible here. */
    SyncThread syncThread = new SyncThread();
    /** Entries waiting to be journaled: {@code addEntry} enqueues, {@code run()} dequeues. */
    LinkedBlockingQueue<QueueEntry> queue = new LinkedBlockingQueue<>();
    /** Journal position tracker, updated by {@code run()} after each journal flush. */
    private LastLogMark lastLogMark = new LastLogMark(0, 0);

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$CounterCallback.class */
    /**
     * Write callback that counts outstanding writes so that a caller can block until every
     * registered write has completed.
     */
    static class CounterCallback implements BookkeeperInternalCallbacks.WriteCallback {
        /** Writes registered through {@link #incCount()} that have not yet completed. */
        int count;

        CounterCallback() {
        }

        /**
         * Accounts for one completed write and wakes every waiter once the counter hits zero.
         * None of the arguments are inspected.
         */
        @Override // org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback
        public synchronized void writeComplete(int rc, long ledgerId, long entryId, InetSocketAddress addr, Object ctx) {
            count -= 1;
            if (count != 0) {
                return;
            }
            notifyAll();
        }

        /** Registers one more outstanding write. */
        public synchronized void incCount() {
            count += 1;
        }

        /**
         * Blocks while the counter is positive.
         *
         * @throws InterruptedException if the waiting thread is interrupted
         */
        public synchronized void waitZero() throws InterruptedException {
            for (;;) {
                if (count <= 0) {
                    return;
                }
                wait();
            }
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$LastLogMark.class */
    class LastLogMark {
        long txnLogId;
        long txnLogPosition;
        LastLogMark lastMark;

        LastLogMark(long j, long j2) {
            this.txnLogId = j;
            this.txnLogPosition = j2;
        }

        synchronized void setLastLogMark(long j, long j2) {
            this.txnLogId = j;
            this.txnLogPosition = j2;
        }

        synchronized void markLog() {
            this.lastMark = new LastLogMark(this.txnLogId, this.txnLogPosition);
        }

        synchronized void rollLog() {
            byte[] bArr = new byte[16];
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            wrap.putLong(this.txnLogId);
            wrap.putLong(this.txnLogPosition);
            for (File file : Bookie.this.ledgerDirectories) {
                File file2 = new File(file, "lastMark");
                try {
                    FileOutputStream fileOutputStream = new FileOutputStream(file2);
                    fileOutputStream.write(bArr);
                    fileOutputStream.getChannel().force(true);
                    fileOutputStream.close();
                } catch (IOException e) {
                    Bookie.LOG.error("Problems writing to " + file2, e);
                }
            }
        }

        synchronized void readLog() {
            byte[] bArr = new byte[16];
            ByteBuffer wrap = ByteBuffer.wrap(bArr);
            for (File file : Bookie.this.ledgerDirectories) {
                File file2 = new File(file, "lastMark");
                try {
                    FileInputStream fileInputStream = new FileInputStream(file2);
                    fileInputStream.read(bArr);
                    fileInputStream.close();
                    wrap.clear();
                    long j = wrap.getLong();
                    long j2 = wrap.getLong();
                    if (j > this.txnLogId) {
                        this.txnLogId = j;
                    }
                    if (j2 > this.txnLogPosition) {
                        this.txnLogPosition = j2;
                    }
                } catch (IOException e) {
                    Bookie.LOG.error("Problems reading from " + file2 + " (this is okay if it is the first time starting this bookie");
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$NoEntryException.class */
    /** Thrown when a requested entry cannot be found in a ledger. */
    public static class NoEntryException extends IOException {
        private static final long serialVersionUID = 1;
        /** Ledger that was searched. */
        private long ledgerId;
        /** Entry that was not found. */
        private long entryId;

        /**
         * @param ledgerId id of the ledger that was searched
         * @param entryId id of the missing entry
         */
        public NoEntryException(long ledgerId, long entryId) {
            super(describe(ledgerId, entryId));
            this.entryId = entryId;
            this.ledgerId = ledgerId;
        }

        /** Builds the exception message. */
        private static String describe(long ledgerId, long entryId) {
            StringBuilder text = new StringBuilder("Entry ");
            text.append(entryId).append(" not found in ").append(ledgerId);
            return text.toString();
        }

        /** @return id of the ledger that was searched */
        public long getLedger() {
            return ledgerId;
        }

        /** @return id of the entry that was not found */
        public long getEntry() {
            return entryId;
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$NoLedgerException.class */
    /** Thrown when an operation refers to a ledger this bookie does not have. */
    public static class NoLedgerException extends IOException {
        private static final long serialVersionUID = 1;
        /** Id of the ledger that was not found. */
        private long ledgerId;

        /**
         * @param ledgerId id of the missing ledger
         */
        public NoLedgerException(long ledgerId) {
            // Supply a message so logs and stack traces identify the ledger instead of showing null.
            super("Ledger " + ledgerId + " not found");
            this.ledgerId = ledgerId;
        }

        /** @return id of the ledger that was not found */
        public long getLedgerId() {
            return this.ledgerId;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$QueueEntry.class */
    /** One pending journal write: the entry bytes plus the callback to invoke once it is flushed. */
    public static class QueueEntry {
        /** Independent view (own position/limit) over the caller's buffer. */
        ByteBuffer entry;
        long ledgerId;
        long entryId;
        /** Callback invoked by the journal loop after the flush that covers this entry. */
        BookkeeperInternalCallbacks.WriteCallback cb;
        /** Opaque value handed back to {@link #cb}. */
        Object ctx;

        QueueEntry(ByteBuffer data, long ledgerId, long entryId, BookkeeperInternalCallbacks.WriteCallback callback, Object context) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.cb = callback;
            this.ctx = context;
            // duplicate() shares the bytes but not the cursor, so later cursor changes made by
            // the caller do not affect what is written to the journal.
            this.entry = data.duplicate();
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/Bookie$SyncThread.class */
    class SyncThread extends Thread {
        volatile boolean running;

        public SyncThread() {
            super("SyncThread");
            this.running = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (this.running) {
                synchronized (this) {
                    try {
                        wait(100L);
                        if (Bookie.this.entryLogger.testAndClearSomethingWritten()) {
                            Bookie.this.lastLogMark.markLog();
                            try {
                                Bookie.this.ledgerCache.flushLedger(true);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            try {
                                Bookie.this.entryLogger.flush();
                            } catch (IOException e2) {
                                e2.printStackTrace();
                            }
                            Bookie.this.lastLogMark.rollLog();
                        }
                    } catch (InterruptedException e3) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:43:0x013c, code lost:
    
        continue;
     */
    /* JADX WARN: Code restructure failed: missing block: B:45:0x013c, code lost:
    
        continue;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Constructs a bookie.
     *
     * <p>NOTE(review): the decompiler could not reconstruct this constructor. The body below is
     * a placeholder that unconditionally throws {@link UnsupportedOperationException}, so the
     * real initialisation logic is not visible in this file and must be recovered from the
     * bytecode or the original source.
     *
     * <p>Parameter roles cannot be read from the placeholder. {@code main} calls it as
     * {@code (5000, null, new File("/tmp"), new File[]{new File("/tmp")})}; presumably these
     * are the port, the ZooKeeper server list, the journal directory and the ledger
     * directories - TODO confirm.
     *
     * @throws java.io.IOException declared by the original signature
     */
    public Bookie(int r10, java.lang.String r11, java.io.File r12, java.io.File[] r13) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 548
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.bookkeeper.bookie.Bookie.<init>(int, java.lang.String, java.io.File, java.io.File[]):void");
    }

    /**
     * Connects to ZooKeeper and registers this bookie by creating the ephemeral znode
     * {@code /ledgers/available/<local host address>:<port>}.
     *
     * <p>When {@code zkServers} is {@code null} no connection is made, a warning is logged and
     * {@link #zk} is left {@code null}.
     *
     * @param port value appended after the local host address in the znode name
     * @param zkServers connect string passed to the ZooKeeper client, or {@code null}
     * @throws IOException if the client cannot be created or the registration fails; the
     *         underlying exception is preserved as the cause
     */
    private void instantiateZookeeperClient(int port, String zkServers) throws IOException {
        if (zkServers == null) {
            LOG.warn("No ZK servers passed to Bookie constructor so BookKeeper clients won't know about this server!");
            this.zk = null;
            return;
        }
        this.zk = new ZooKeeper(zkServers, 10000, new Watcher() { // from class: org.apache.bookkeeper.bookie.Bookie.1
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (Bookie.LOG.isDebugEnabled()) {
                    Bookie.LOG.debug("Process: " + watchedEvent.getType() + " " + watchedEvent.getPath());
                }
            }
        });
        try {
            this.zk.create(BOOKIE_REGISTRATION_PATH + InetAddress.getLocalHost().getHostAddress() + ":" + port, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } catch (Exception e) {
            LOG.fatal("ZK exception registering ephemeral Znode for Bookie!", e);
            boolean interrupted = e instanceof InterruptedException;
            // Registration failed, so nobody will use (or later close) this session:
            // release it here instead of leaking the client and its threads.
            try {
                this.zk.close();
            } catch (InterruptedException closeInterrupted) {
                interrupted = true;
            }
            this.zk = null;
            if (interrupted) {
                // The broad catch above would otherwise swallow the interruption.
                Thread.currentThread().interrupt();
            }
            throw new IOException(e);
        }
    }

    /**
     * Reads from {@code fileChannel} into {@code byteBuffer} until the buffer has no remaining
     * space or the channel stops delivering data.
     *
     * @param fileChannel channel to read from, starting at its current position
     * @param byteBuffer destination; its position advances by the number of bytes read
     * @return total number of bytes read, which is less than the buffer's initial
     *         {@code remaining()} when a read returned zero or end-of-stream first
     * @throws IOException if a read on the channel fails
     */
    private static int fullRead(FileChannel fileChannel, ByteBuffer byteBuffer) throws IOException {
        int total = 0;
        while (byteBuffer.remaining() > 0) {
            int read = fileChannel.read(byteBuffer);
            if (read <= 0) {
                // -1 signals end-of-stream; 0 means no progress. Stop either way.
                break;
            }
            total += read;
        }
        return total;
    }

    /**
     * Releases a reference previously obtained from {@code getHandle}.
     *
     * @param handle descriptor whose reference count is decremented
     */
    private void putHandle(LedgerDescriptor handle) {
        // The count is decremented under the same monitor getHandle uses to increment it.
        synchronized (ledgers) {
            handle.decRef();
        }
    }

    /**
     * Returns the descriptor for a ledger with its reference count incremented, creating and
     * registering a new descriptor (and assigning it the given master key) when none exists.
     *
     * @param ledgerId id of the ledger
     * @param mustExist when {@code true}, a missing ledger is an error instead of being created
     * @param masterKey key stored on a newly created descriptor; not used for existing ones
     * @return the descriptor; release it with {@code putHandle}
     * @throws NoLedgerException if the ledger is unknown and {@code mustExist} is {@code true}
     * @throws IOException if creating the descriptor fails
     */
    private LedgerDescriptor getHandle(long ledgerId, boolean mustExist, byte[] masterKey) throws IOException {
        synchronized (this.ledgers) {
            final Long key = Long.valueOf(ledgerId);
            LedgerDescriptor handle = this.ledgers.get(key);
            if (handle == null && mustExist) {
                throw new NoLedgerException(ledgerId);
            }
            if (handle == null) {
                handle = createHandle(ledgerId, mustExist);
                this.ledgers.put(key, handle);
                handle.setMasterKey(ByteBuffer.wrap(masterKey));
            }
            handle.incRef();
            return handle;
        }
    }

    /**
     * Returns the descriptor for a ledger with its reference count incremented, creating and
     * registering a new descriptor (without a master key) when none exists.
     *
     * @param ledgerId id of the ledger
     * @param mustExist when {@code true}, a missing ledger is an error instead of being created
     * @return the descriptor; release it with {@code putHandle}
     * @throws NoLedgerException if the ledger is unknown and {@code mustExist} is {@code true}
     * @throws IOException if creating the descriptor fails
     */
    private LedgerDescriptor getHandle(long ledgerId, boolean mustExist) throws IOException {
        synchronized (this.ledgers) {
            final Long key = Long.valueOf(ledgerId);
            LedgerDescriptor handle = this.ledgers.get(key);
            if (handle == null && mustExist) {
                throw new NoLedgerException(ledgerId);
            }
            if (handle == null) {
                handle = createHandle(ledgerId, mustExist);
                this.ledgers.put(key, handle);
            }
            handle.incRef();
            return handle;
        }
    }

    /**
     * Builds a new descriptor backed by this bookie's entry logger and ledger cache.
     *
     * @param ledgerId id of the ledger the descriptor represents
     * @param unused accepted for signature compatibility; not read by this method
     * @return the freshly created descriptor (not yet registered in the ledger map)
     * @throws IOException declared for the descriptor constructor
     */
    private LedgerDescriptor createHandle(long ledgerId, boolean unused) throws IOException {
        LedgerDescriptor created = new LedgerDescriptor(ledgerId, entryLogger, ledgerCache);
        return created;
    }

    /**
     * Reports whether the journal loop is active.
     *
     * @return {@code true} from the moment {@code run()} has opened the journal until the loop exits
     */
    public boolean isRunning() {
        final boolean active = running;
        return active;
    }

    /**
     * Journal loop. Opens a new journal file named after the current time, then repeatedly
     * takes entries from {@link #queue}, writes each one as a 4-byte length followed by the
     * entry bytes, and acknowledges entries only after a flush that covers them.
     *
     * <p>A flush happens when the queue runs dry while entries are pending, or when more than
     * 524288 bytes were written since the previous flush. The file is extended ahead of the
     * write position in {@link #preAllocSize} steps by writing {@link #zeros} at the next
     * boundary.
     *
     * <p>The loop ends on any exception, including the {@code InterruptedException} raised by
     * {@code shutdown()}. Entries written but not yet flushed at that point are never
     * acknowledged.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LinkedList<QueueEntry> pending = new LinkedList<QueueEntry>();
        ByteBuffer lengthPrefix = ByteBuffer.allocate(4);
        FileChannel journal = null;
        try {
            long logId = System.currentTimeMillis();
            journal = openChannel(logId);
            BufferedChannel buffered = new BufferedChannel(journal, 65536);
            zeros.clear();
            long nextPreAlloc = preAllocSize;
            long lastFlushPosition = 0;
            journal.write(zeros, preAllocSize);
            this.running = true;
            while (true) {
                QueueEntry next;
                if (pending.isEmpty()) {
                    // Nothing awaits acknowledgement, so it is safe to block.
                    next = this.queue.take();
                } else {
                    next = this.queue.poll();
                    if (next == null || buffered.position() > lastFlushPosition + 524288) {
                        buffered.flush(true);
                        lastFlushPosition = buffered.position();
                        this.lastLogMark.setLastLogMark(logId, lastFlushPosition);
                        for (QueueEntry done : pending) {
                            done.cb.writeComplete(0, done.ledgerId, done.entryId, null, done.ctx);
                        }
                        pending.clear();
                    }
                }
                if (next != null) {
                    lengthPrefix.clear();
                    lengthPrefix.putInt(next.entry.remaining());
                    lengthPrefix.flip();
                    buffered.write(lengthPrefix);
                    buffered.write(next.entry);
                    if (buffered.position() > nextPreAlloc) {
                        nextPreAlloc = ((journal.size() / preAllocSize) + 1) * preAllocSize;
                        zeros.clear();
                        journal.write(zeros, nextPreAlloc);
                    }
                    pending.add(next);
                }
            }
        } catch (Exception e) {
            LOG.fatal("Bookie thread exiting", e);
        } finally {
            // Reset the flag on every exit path, including Errors the catch above does not see.
            this.running = false;
            // Release the journal file descriptor; previously it leaked whenever the thread
            // exited, for example when interrupted while blocked in take().
            if (journal != null) {
                try {
                    journal.close();
                } catch (IOException e) {
                    LOG.error("Problems closing journal file", e);
                }
            }
        }
    }

    /**
     * Opens, creating it if necessary, the journal file for the given id in read/write mode.
     * The file lives in {@link #journalDirectory} and is named {@code <id in hex>.txn}.
     *
     * @param logId journal id; its hexadecimal form is the file's base name
     * @return a channel on the journal file
     * @throws FileNotFoundException if the file cannot be opened or created
     */
    private FileChannel openChannel(long logId) throws FileNotFoundException {
        String fileName = Long.toHexString(logId) + ".txn";
        File journalFile = new File(this.journalDirectory, fileName);
        RandomAccessFile raf = new RandomAccessFile(journalFile, "rw");
        return raf.getChannel();
    }

    /**
     * Stops this bookie: closes the ZooKeeper client (if any), interrupts the journal thread
     * and waits for it, stops the sync thread and waits for it, then closes every known
     * ledger descriptor.
     *
     * @throws InterruptedException if interrupted while closing ZooKeeper or while waiting for
     *         either thread; the remaining steps are then not executed
     */
    public void shutdown() throws InterruptedException {
        if (this.zk != null) {
            this.zk.close();
        }
        interrupt();
        join();
        this.syncThread.running = false;
        this.syncThread.join();
        // Every other access to the ledger map holds its monitor; request threads may still be
        // adding descriptors, so copy the values under that monitor rather than iterating the
        // live HashMap. The descriptors are closed outside the lock.
        LinkedList<LedgerDescriptor> open;
        synchronized (this.ledgers) {
            open = new LinkedList<LedgerDescriptor>(this.ledgers.values());
        }
        for (LedgerDescriptor descriptor : open) {
            descriptor.close();
        }
    }

    /**
     * Adds an entry to a ledger and queues it for journaling. The ledger id is read from the
     * first eight bytes of {@code byteBuffer}; the ledger descriptor is created when it does
     * not exist yet.
     *
     * <p>{@code writeCallback} is invoked by the journal thread after the entry has been
     * flushed to the journal, not by this method.
     *
     * @param byteBuffer entry bytes, beginning with the ledger id; rewound before being handed
     *        to the ledger and again before being queued
     * @param writeCallback callback to invoke once the entry is journaled
     * @param obj opaque context passed back to the callback
     * @param bArr master key; assigned to a newly created ledger and compared against the
     *        ledger's key
     * @throws IOException if the ledger descriptor cannot be obtained or the entry cannot be added
     * @throws BookieException if the master key does not match the ledger's key
     */
    public void addEntry(ByteBuffer byteBuffer, BookkeeperInternalCallbacks.WriteCallback writeCallback, Object obj, byte[] bArr) throws IOException, BookieException {
        long ledgerId = byteBuffer.getLong();
        LedgerDescriptor handle = getHandle(ledgerId, false, bArr);
        try {
            // The key check sits inside the try so the reference taken by getHandle() is
            // released on a mismatch as well; it used to leak on that path.
            if (!handle.cmpMasterKey(ByteBuffer.wrap(bArr))) {
                throw BookieException.create(-1);
            }
            byteBuffer.rewind();
            long entryId = handle.addEntry(byteBuffer);
            byteBuffer.rewind();
            if (LOG.isTraceEnabled()) {
                LOG.trace("Adding " + entryId + "@" + ledgerId);
            }
            this.queue.add(new QueueEntry(byteBuffer, ledgerId, entryId, writeCallback, obj));
        } finally {
            putHandle(handle);
        }
    }

    /**
     * Reads one entry from an existing ledger.
     *
     * @param j id of the ledger
     * @param j2 id of the entry
     * @return the buffer returned by the ledger descriptor for that entry
     * @throws NoLedgerException if this bookie has no descriptor for the ledger
     * @throws IOException if the descriptor fails to read the entry
     */
    public ByteBuffer readEntry(long j, long j2) throws IOException {
        final LedgerDescriptor descriptor = getHandle(j, true);
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace("Reading " + j2 + "@" + j);
            }
            return descriptor.readEntry(j2);
        } finally {
            // Release the reference taken by getHandle() on success and on failure alike.
            putHandle(descriptor);
        }
    }

    /**
     * Ad-hoc benchmark: creates a bookie on {@code /tmp}, adds 100000 entries to ledger 1,
     * waits until every write callback has fired and prints the elapsed time.
     *
     * <p>NOTE(review): the entry buffer size is taken from
     * {@code LedgerEntryPage.ENTRIES_PER_PAGES}, which may be a decompiler constant-matching
     * artifact rather than the intended constant - confirm against the original source.
     */
    public static void main(String[] strArr) throws IOException, InterruptedException, BookieException {
        File journalDir = new File("/tmp");
        File[] ledgerDirs = new File[]{new File("/tmp")};
        Bookie bookie = new Bookie(5000, null, journalDir, ledgerDirs);
        CounterCallback outstanding = new CounterCallback();
        long startedAt = System.currentTimeMillis();
        int entryIndex = 0;
        while (entryIndex < 100000) {
            ByteBuffer entry = ByteBuffer.allocate(LedgerEntryPage.ENTRIES_PER_PAGES);
            // Header: ledger id 1 followed by the entry id.
            entry.putLong(1L).putLong(entryIndex);
            entry.limit(LedgerEntryPage.ENTRIES_PER_PAGES);
            entry.position(0);
            outstanding.incCount();
            bookie.addEntry(entry, outstanding, null, new byte[0]);
            entryIndex++;
        }
        outstanding.waitZero();
        long elapsed = System.currentTimeMillis() - startedAt;
        System.out.println("Took " + elapsed + "ms");
    }
}
