package org.apache.bk_v4_0_0.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.bk_v4_0_0.bookkeeper.conf.ServerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bk_v4_0_0/bookkeeper/proto/NIOServerFactory.class */
public class NIOServerFactory extends Thread {
    ServerStats stats;
    Logger LOG;
    ServerSocketChannel ss;
    Selector selector;
    ByteBuffer directBuffer;
    HashSet<Cnxn> cnxns;
    int outstandingLimit;
    PacketProcessor processor;
    long minLatency;
    ServerConfiguration conf;
    static final ByteBuffer closeConn = ByteBuffer.allocate(0);

    /* loaded from: input_file:org/apache/bk_v4_0_0/bookkeeper/proto/NIOServerFactory$Cnxn.class */
    public class Cnxn {
        private SocketChannel sock;
        private SelectionKey sk;
        boolean initialized;
        ByteBuffer incomingBuffer;
        int sessionTimeout;
        int packetsSent;
        int packetsReceived;
        int outstandingRequests;
        String peerName;
        boolean closed;
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<>();
        private CnxnStats cnxnStats = new CnxnStats();

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/bk_v4_0_0/bookkeeper/proto/NIOServerFactory$Cnxn$CnxnStats.class */
        /**
         * Per-connection counters for the enclosing {@link Cnxn}.
         *
         * <p>The two packet counters are plain fields that the enclosing connection increments
         * directly; the outstanding-request figure is read from the connection on demand.
         */
        public class CnxnStats {
            /** Incremented by the connection each time a complete request packet has been read. */
            long packetsReceived;
            /** Incremented by the connection each time a queued outgoing buffer has been fully written. */
            long packetsSent;

            private CnxnStats() {
            }

            /** Returns the enclosing connection's current {@code outstandingRequests} value. */
            public long getOutstandingRequests() {
                final long outstanding = Cnxn.this.outstandingRequests;
                return outstanding;
            }

            /** Returns the number of request packets counted so far. */
            public long getPacketsReceived() {
                return packetsReceived;
            }

            /** Returns the number of outgoing buffers counted as sent so far. */
            public long getPacketsSent() {
                return packetsSent;
            }

            /**
             * Renders one line with the remote address, the key's interest ops in hex and the
             * three counters. Yields an empty string when the key's channel is not a
             * {@link SocketChannel}.
             */
            @Override
            public String toString() {
                StringBuilder text = new StringBuilder();
                SelectableChannel channel = Cnxn.this.sk.channel();
                if (channel instanceof SocketChannel) {
                    SocketChannel socketChannel = (SocketChannel) channel;
                    text.append(" ");
                    text.append(socketChannel.socket().getRemoteSocketAddress());
                    text.append("[");
                    text.append(Integer.toHexString(Cnxn.this.sk.interestOps()));
                    text.append("](queued=");
                    text.append(getOutstandingRequests());
                    text.append(",recved=");
                    text.append(getPacketsReceived());
                    text.append(",sent=");
                    text.append(getPacketsSent());
                    text.append(")\n");
                }
                return text.toString();
            }
        }

        /**
         * Performs the read and/or write work the selector reported for this connection.
         *
         * <p>Read side: bytes are read into {@code incomingBuffer}. When that buffer is full it is
         * either the 4-byte length prefix (handled by {@code readLength}) or a complete packet
         * (handed to {@code readRequest}); after a packet, the connection goes back to reading a
         * length prefix, whether or not processing threw.
         *
         * <p>Write side: queued buffers are copied into the factory's shared direct buffer, written
         * with a single {@code write} call, and the queue is then advanced by the number of bytes
         * actually written. Finally the key's write interest is set or cleared depending on
         * whether anything is still queued.
         *
         * <p>An {@link IOException} or {@link CancelledKeyException} raised anywhere in here closes
         * the connection instead of propagating.
         *
         * @param selectionKey the key whose readiness is being serviced
         * @throws InterruptedException declared by the signature; nothing in this body throws it directly
         */
        void doIO(SelectionKey selectionKey) throws InterruptedException {
            try {
                if (this.sock == null) {
                    // Already closed; nothing to service.
                    return;
                }
                if (selectionKey.isReadable()) {
                    int bytesRead = this.sock.read(this.incomingBuffer);
                    if (bytesRead < 0) {
                        throw new IOException("Read error");
                    }
                    if (!this.incomingBuffer.hasRemaining()) {
                        this.incomingBuffer.flip();
                        if (this.incomingBuffer == this.lenBuffer) {
                            // Length prefix complete: allocate the buffer for the packet body.
                            readLength(selectionKey);
                        } else {
                            this.cnxnStats.packetsReceived++;
                            NIOServerFactory.this.stats.incrementPacketsReceived();
                            try {
                                readRequest();
                            } finally {
                                // Always return to "expecting a length prefix".
                                this.lenBuffer.clear();
                                this.incomingBuffer = this.lenBuffer;
                            }
                        }
                    }
                }
                if (selectionKey.isWritable()) {
                    if (!this.outgoingBuffers.isEmpty()) {
                        ByteBuffer staging = NIOServerFactory.this.directBuffer;
                        staging.clear();
                        for (ByteBuffer queued : this.outgoingBuffers) {
                            ByteBuffer chunk = queued;
                            if (staging.remaining() < chunk.remaining()) {
                                // Only part of this buffer fits; copy from a limited slice.
                                chunk = (ByteBuffer) chunk.slice().limit(staging.remaining());
                            }
                            // Copy without consuming: the position is restored so the queue
                            // bookkeeping below can advance by the bytes actually written.
                            int savedPosition = chunk.position();
                            staging.put(chunk);
                            chunk.position(savedPosition);
                            if (!staging.hasRemaining()) {
                                break;
                            }
                        }
                        staging.flip();
                        int unaccounted = this.sock.write(staging);
                        while (!this.outgoingBuffers.isEmpty()) {
                            ByteBuffer head = this.outgoingBuffers.peek();
                            if (head == NIOServerFactory.closeConn) {
                                // Sentinel buffer reached the head of the queue: close the connection.
                                throw new IOException("closing");
                            }
                            int left = head.remaining() - unaccounted;
                            if (left > 0) {
                                // Head buffer was only partially written; remember how far we got.
                                head.position(head.position() + unaccounted);
                                break;
                            }
                            this.cnxnStats.packetsSent++;
                            unaccounted -= head.remaining();
                            ServerStats.getInstance().incrementPacketsSent();
                            this.outgoingBuffers.remove();
                        }
                    }
                    synchronized (this) {
                        if (this.outgoingBuffers.size() != 0) {
                            this.sk.interestOps(this.sk.interestOps() | SelectionKey.OP_WRITE);
                        } else {
                            // NOTE(review): nothing in this file ever sets `initialized`, so an empty
                            // queue with read interest disabled closes the connection here - confirm
                            // that this is intended.
                            boolean readDisabled = (this.sk.interestOps() & SelectionKey.OP_READ) == 0;
                            if (!this.initialized && readDisabled) {
                                throw new IOException("Responded to info probe");
                            }
                            this.sk.interestOps(this.sk.interestOps() & ~SelectionKey.OP_WRITE);
                        }
                    }
                }
            } catch (IOException | CancelledKeyException e) {
                close();
            }
        }

        /**
         * Hands the packet that has just been read to the factory's {@code PacketProcessor}.
         *
         * <p>{@code incomingBuffer} is replaced by a slice of itself before the hand-off, so the
         * processor receives a buffer with its own position and limit.
         *
         * @throws IOException declared for the caller; not thrown directly by this body
         */
        private void readRequest() throws IOException {
            ByteBuffer packet = this.incomingBuffer.slice();
            this.incomingBuffer = packet;
            NIOServerFactory.this.processor.processPacket(packet, this);
        }

        /**
         * Stops the selector from reporting read readiness for this connection by clearing
         * {@link SelectionKey#OP_READ} from the key's interest set.
         *
         * <p>Does nothing when the key is no longer valid (for example after {@code close()} has
         * cancelled it), matching the guard used by {@code enableRecv()}; without the guard
         * {@code interestOps()} would throw {@code CancelledKeyException}.
         */
        public void disableRecv() {
            if (!this.sk.isValid()) {
                return;
            }
            this.sk.interestOps(this.sk.interestOps() & ~SelectionKey.OP_READ);
        }

        /**
         * Re-enables read readiness reporting for this connection.
         *
         * <p>Adds {@link SelectionKey#OP_READ} to the key's interest set if it is not already
         * there. An invalid key is left untouched.
         */
        public void enableRecv() {
            if (!this.sk.isValid()) {
                return;
            }
            int ops = this.sk.interestOps();
            boolean readAlreadyRequested = (ops & SelectionKey.OP_READ) != 0;
            if (!readAlreadyRequested) {
                this.sk.interestOps(ops | SelectionKey.OP_READ);
            }
        }

        /**
         * Decodes the 4-byte length prefix held in {@code lenBuffer} and allocates
         * {@code incomingBuffer} to receive a packet of that size.
         *
         * @param selectionKey unused by this method
         * @throws IOException if the announced length is negative or greater than 0xfffff
         */
        private void readLength(SelectionKey selectionKey) throws IOException {
            final int maxPacketLength = 0xfffff;
            int announced = this.lenBuffer.getInt();
            boolean acceptable = announced >= 0 && announced <= maxPacketLength;
            if (!acceptable) {
                throw new IOException("Len error " + announced);
            }
            this.incomingBuffer = ByteBuffer.allocate(announced);
        }

        /** Returns the value currently stored in this connection's {@code sessionTimeout} field. */
        public int getSessionTimeout() {
            return sessionTimeout;
        }

        public Cnxn(SocketChannel socketChannel, SelectionKey selectionKey) throws IOException {
            this.incomingBuffer = this.lenBuffer;
            this.peerName = null;
            this.sock = socketChannel;
            this.sk = selectionKey;
            socketChannel.socket().setTcpNoDelay(NIOServerFactory.this.conf.getServerTcpNoDelay());
            socketChannel.socket().setSoLinger(true, 2);
            selectionKey.interestOps(1);
            if (NIOServerFactory.this.LOG.isTraceEnabled()) {
                this.peerName = socketChannel.socket().toString();
            }
            this.lenBuffer.clear();
            this.incomingBuffer = this.lenBuffer;
        }

        /** Describes this connection by its channel and its selection key. */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("NIOServerCnxn object with sock = ");
            text.append(this.sock);
            text.append(" and sk = ");
            text.append(this.sk);
            return text.toString();
        }

        /**
         * Returns a printable name for the peer, computing it from the socket on first use and
         * caching it afterwards.
         *
         * <p>{@code close()} sets {@code sock} to {@code null}; if the name was never cached before
         * that happened there is nothing left to derive it from, so {@code null} is returned
         * rather than failing with a {@code NullPointerException}.
         *
         * @return the cached or freshly computed peer name, or {@code null} if the connection was
         *         closed before a name was ever recorded
         */
        public String getPeerName() {
            if (this.peerName == null) {
                // Read the field once so a concurrent close() cannot null it between check and use.
                SocketChannel channel = this.sock;
                if (channel != null) {
                    this.peerName = channel.socket().toString();
                }
            }
            return this.peerName;
        }

        /**
         * Closes this connection; calls after the first are no-ops.
         *
         * <p>Removes the connection from the factory's connection set, shuts down both
         * directions of the socket, closes the socket and the channel, clears the {@code sock}
         * field and cancels the selection key. Every step is best-effort: a failure in one step
         * does not prevent the following steps from running.
         */
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            synchronized (NIOServerFactory.this.cnxns) {
                NIOServerFactory.this.cnxns.remove(this);
            }
            NIOServerFactory.this.LOG.debug("close  NIOServerCnxn: " + this.sock);
            try {
                this.sock.socket().shutdownOutput();
            } catch (IOException ignored) {
                // Best-effort: the close below still runs (presumably the peer may already
                // have torn the connection down - TODO confirm).
            }
            try {
                this.sock.socket().shutdownInput();
            } catch (IOException ignored) {
                // Best-effort, same as for the output side.
            }
            try {
                this.sock.socket().close();
            } catch (IOException e) {
                NIOServerFactory.this.LOG.error("Failed to close socket of " + this.sock, e);
            }
            try {
                this.sock.close();
            } catch (IOException e) {
                NIOServerFactory.this.LOG.error("Failed to close channel " + this.sock, e);
            }
            this.sock = null;
            if (this.sk != null) {
                try {
                    this.sk.cancel();
                } catch (Exception ignored) {
                    // Cancelling is best-effort; the channel has already been closed above.
                }
            }
        }

        /**
         * Wakes the factory's selector and, if the key is still valid, adds
         * {@link SelectionKey#OP_WRITE} to its interest set so queued buffers get flushed.
         *
         * @param selectionKey the key to mark as interested in writes
         * @throws RuntimeException any runtime failure is logged and then rethrown unchanged
         */
        private void makeWritable(SelectionKey selectionKey) {
            try {
                NIOServerFactory.this.selector.wakeup();
                if (!selectionKey.isValid()) {
                    return;
                }
                int ops = selectionKey.interestOps();
                selectionKey.interestOps(ops | SelectionKey.OP_WRITE);
            } catch (RuntimeException failure) {
                NIOServerFactory.this.LOG.error("Problem setting writable", failure);
                throw failure;
            }
        }

        /**
         * Queues a length-prefixed response for writing and asks the selector to flush it.
         *
         * <p>A 4-byte header holding the total number of remaining bytes across all non-null
         * buffers is queued first, followed by the non-null buffers in order. {@code null}
         * entries are skipped.
         *
         * <p>The size message is logged at TRACE level: the guard is {@code isTraceEnabled()},
         * and {@code peerName} is only filled in eagerly by the constructor when trace logging
         * is on, so logging it at DEBUG did not match the guard.
         *
         * @param byteBufferArr the response payload pieces; entries may be {@code null}
         */
        private void sendBuffers(ByteBuffer[] byteBufferArr) {
            int payloadLength = 0;
            for (ByteBuffer buffer : byteBufferArr) {
                if (buffer != null) {
                    payloadLength += buffer.remaining();
                }
            }
            if (NIOServerFactory.this.LOG.isTraceEnabled()) {
                NIOServerFactory.this.LOG.trace("Sending response of size " + payloadLength + " to " + this.peerName);
            }
            ByteBuffer header = ByteBuffer.allocate(4);
            header.putInt(payloadLength);
            header.flip();
            this.outgoingBuffers.add(header);
            for (ByteBuffer buffer : byteBufferArr) {
                if (buffer != null) {
                    this.outgoingBuffers.add(buffer);
                }
            }
            makeWritable(this.sk);
        }

        /**
         * Queues a response on this connection and accounts for one completed request.
         *
         * <p>Ignored once the connection is closed. After queuing, the outstanding-request count
         * is decremented while holding the factory's monitor; if it is then below the factory's
         * {@code outstandingLimit}, the selector is woken and reads are re-enabled.
         *
         * @param byteBufferArr the response payload pieces; {@code null} entries are skipped
         */
        public synchronized void sendResponse(ByteBuffer... byteBufferArr) {
            if (!this.closed) {
                sendBuffers(byteBufferArr);
                final NIOServerFactory factory = NIOServerFactory.this;
                synchronized (factory) {
                    this.outstandingRequests -= 1;
                    boolean belowLimit = this.outstandingRequests < factory.outstandingLimit;
                    if (belowLimit) {
                        this.sk.selector().wakeup();
                        enableRecv();
                    }
                }
            }
        }

        /**
         * Returns the address of the remote end of this connection.
         *
         * <p>{@code close()} sets {@code sock} to {@code null}; in that case {@code null} is
         * returned instead of failing with a {@code NullPointerException}. The underlying
         * {@code Socket.getRemoteSocketAddress()} can itself return {@code null}, so callers
         * already have to be prepared for that result.
         *
         * @return the remote socket address, or {@code null} if it is not available
         */
        public InetSocketAddress getRemoteAddress() {
            // Read the field once so a concurrent close() cannot null it between check and use.
            SocketChannel channel = this.sock;
            if (channel == null) {
                return null;
            }
            return (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        }

        /** Returns the statistics object that belongs to this connection. */
        public CnxnStats getStats() {
            return cnxnStats;
        }
    }

    /* loaded from: input_file:org/apache/bk_v4_0_0/bookkeeper/proto/NIOServerFactory$PacketProcessor.class */
    /**
     * Callback that receives each complete request packet read from a connection.
     */
    public interface PacketProcessor {
        /**
         * Handles one request packet.
         *
         * @param byteBuffer the packet contents, without the 4-byte length prefix
         * @param cnxn the connection the packet was read from
         */
        void processPacket(ByteBuffer byteBuffer, Cnxn cnxn);
    }

    /**
     * Opens the selector and the listening socket on the configured bookie port, then starts
     * this factory as a daemon thread.
     *
     * <p>If opening, binding, configuring or registering the server channel fails, the
     * resources already opened (server channel and selector) are closed before the exception
     * is rethrown, so a failed construction does not leak them.
     *
     * @param serverConfiguration supplies the bookie port (and the TCP no-delay setting read later by connections)
     * @param packetProcessor receives every complete request packet
     * @throws IOException if the selector or the listening socket cannot be set up
     */
    public NIOServerFactory(ServerConfiguration serverConfiguration, PacketProcessor packetProcessor) throws IOException {
        super("NIOServerFactory");
        this.stats = new ServerStats();
        this.LOG = LoggerFactory.getLogger(NIOServerFactory.class);
        this.selector = Selector.open();
        this.directBuffer = ByteBuffer.allocateDirect(65536);
        this.cnxns = new HashSet<>();
        this.outstandingLimit = 2000;
        this.minLatency = 99999999L;
        setDaemon(true);
        this.processor = packetProcessor;
        this.conf = serverConfiguration;
        try {
            this.ss = ServerSocketChannel.open();
            this.ss.socket().bind(new InetSocketAddress(serverConfiguration.getBookiePort()));
            this.ss.configureBlocking(false);
            this.ss.register(this.selector, SelectionKey.OP_ACCEPT);
        } catch (IOException | RuntimeException e) {
            // Release whatever was opened before reporting the failure to the caller.
            if (this.ss != null) {
                try {
                    this.ss.close();
                } catch (IOException ignored) {
                    // The original failure is the one worth reporting.
                }
            }
            try {
                this.selector.close();
            } catch (IOException ignored) {
                // The original failure is the one worth reporting.
            }
            throw e;
        }
        start();
    }

    /** Returns the local address the listening socket reports, cast to {@link InetSocketAddress}. */
    public InetSocketAddress getLocalAddress() {
        InetSocketAddress bound = (InetSocketAddress) this.ss.socket().getLocalSocketAddress();
        return bound;
    }

    /**
     * Records a newly accepted connection in the connection set, holding the set's monitor
     * while it is modified.
     *
     * @param cnxn the connection to track
     */
    private void addCnxn(Cnxn cnxn) {
        final HashSet<Cnxn> tracked = this.cnxns;
        synchronized (tracked) {
            tracked.add(cnxn);
        }
    }

    /** Reports whether the listening socket is still open. */
    public boolean isRunning() {
        boolean listenerClosed = this.ss.socket().isClosed();
        return !listenerClosed;
    }

    /**
     * Selector loop: runs until the listening socket is closed.
     *
     * <p>Each pass waits up to one second for ready keys, visits them in random order, accepts
     * new connections on the server key and delegates read/write readiness to the attached
     * {@code Cnxn}. An exception aborts the current pass and is logged; the loop then continues.
     *
     * <p>The selected-key set is cleared after every pass, including aborted ones, so a key
     * that was already handled is not revisited with stale readiness on the next pass. When
     * the loop ends, all connections are closed and the selector itself is closed.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        while (!this.ss.socket().isClosed()) {
            Set<SelectionKey> selectedKeys = null;
            try {
                this.selector.select(1000L);
                synchronized (this) {
                    selectedKeys = this.selector.selectedKeys();
                }
                ArrayList<SelectionKey> shuffled = new ArrayList<>(selectedKeys);
                Collections.shuffle(shuffled);
                for (SelectionKey key : shuffled) {
                    if (!key.isValid()) {
                        // Cancelled earlier in this pass or by another thread; readyOps() would throw.
                        continue;
                    }
                    int ready = key.readyOps();
                    if ((ready & SelectionKey.OP_ACCEPT) != 0) {
                        acceptConnection((ServerSocketChannel) key.channel());
                    } else if ((ready & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        ((Cnxn) key.attachment()).doIO(key);
                    }
                }
            } catch (Exception e) {
                this.LOG.warn("Exception in server socket loop: " + this.ss.socket().getInetAddress(), e);
            } finally {
                if (selectedKeys != null) {
                    selectedKeys.clear();
                }
            }
        }
        this.LOG.info("NIOServerCnxn factory exitedloop.");
        clear();
        try {
            // The loop is the only user of the selector; release it now that the loop is over.
            this.selector.close();
        } catch (IOException e) {
            this.LOG.warn("Failed to close selector", e);
        }
    }

    /**
     * Accepts one pending connection, registers it for reads and starts tracking it.
     *
     * <p>A non-blocking {@code accept()} returns {@code null} when nothing is pending; that case
     * is ignored. If setting up the accepted channel fails, the channel is closed before the
     * exception is rethrown so it is not left registered without an attached {@code Cnxn}.
     */
    private void acceptConnection(ServerSocketChannel server) throws IOException {
        SocketChannel accepted = server.accept();
        if (accepted == null) {
            return;
        }
        try {
            accepted.configureBlocking(false);
            SelectionKey clientKey = accepted.register(this.selector, SelectionKey.OP_READ);
            Cnxn cnxn = new Cnxn(accepted, clientKey);
            clientKey.attach(cnxn);
            addCnxn(cnxn);
        } catch (IOException | RuntimeException e) {
            try {
                accepted.close();
            } catch (IOException ignored) {
                // The setup failure is the one worth reporting.
            }
            throw e;
        }
    }

    /**
     * Wakes the selector and closes every tracked connection.
     *
     * <p>Each connection is removed from the set through the iterator before it is closed, and
     * any exception thrown while closing one connection is ignored so the rest are still closed.
     */
    public synchronized void clear() {
        this.selector.wakeup();
        synchronized (this.cnxns) {
            for (Iterator<Cnxn> remaining = this.cnxns.iterator(); remaining.hasNext();) {
                Cnxn connection = remaining.next();
                remaining.remove();
                try {
                    connection.close();
                } catch (Exception ignored) {
                }
            }
        }
    }

    /**
     * Stops the factory: closes the listening socket, closes all connections, interrupts the
     * selector thread and waits for it to finish.
     *
     * <p>Failures are logged rather than thrown. If the calling thread is interrupted while
     * waiting, its interrupt status is restored after logging so the caller can still observe
     * the interruption.
     */
    public void shutdown() {
        try {
            this.ss.close();
            clear();
            interrupt();
            join();
        } catch (InterruptedException e) {
            this.LOG.warn("Interrupted", e);
            // Catching InterruptedException clears the flag; set it again for the caller.
            Thread.currentThread().interrupt();
        } catch (Exception e2) {
            this.LOG.error("Unexpected exception", e2);
        }
    }
}
