package org.apache.rocketmq.store.ha;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.remoting.common.RemotingUtil;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/*  JADX ERROR: NullPointerException in pass: ClassModifier
    java.lang.NullPointerException
    */
/* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.0.0-incubating.jar:org/apache/rocketmq/store/ha/HAConnection.class */
public class HAConnection {
    private static final Logger log = LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private final HAService haService;
    private final SocketChannel socketChannel;
    private final String clientAddr;
    private WriteSocketService writeSocketService;
    private ReadSocketService readSocketService;
    private volatile long slaveRequestOffset = -1;
    private volatile long slaveAckOffset = -1;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.0.0-incubating.jar:org/apache/rocketmq/store/ha/HAConnection$ReadSocketService.class */
    public class ReadSocketService extends ServiceThread {
        private static final int READ_MAX_BUFFER_SIZE = 1048576;
        private final SocketChannel socketChannel;
        private final ByteBuffer byteBufferRead = ByteBuffer.allocate(1048576);
        private int processPostion = 0;
        private volatile long lastReadTimestamp = System.currentTimeMillis();
        private final Selector selector = RemotingUtil.openSelector();

        public ReadSocketService(SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, 1);
            this.thread.setDaemon(true);
        }

        @Override // java.lang.Runnable
        public void run() {
            HAConnection.log.info(getServiceName() + " service started");
            while (!isStopped()) {
                try {
                    this.selector.select(1000L);
                    if (processReadEvent()) {
                        long now = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now() - this.lastReadTimestamp;
                        if (now > HAConnection.this.haService.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                            HAConnection.log.warn("ha housekeeping, found this connection[" + HAConnection.this.clientAddr + "] expired, " + now);
                        }
                    } else {
                        HAConnection.log.error("processReadEvent error");
                    }
                    break;
                } catch (Exception e) {
                    HAConnection.log.error(getServiceName() + " service has exception.", (Throwable) e);
                }
            }
            makeStop();
            HAConnection.this.writeSocketService.makeStop();
            HAConnection.this.haService.removeConnection(HAConnection.this);
            HAConnection.this.haService.getConnectionCount().decrementAndGet();
            SelectionKey keyFor = this.socketChannel.keyFor(this.selector);
            if (keyFor != null) {
                keyFor.cancel();
            }
            try {
                this.selector.close();
                this.socketChannel.close();
            } catch (IOException e2) {
                HAConnection.log.error("", (Throwable) e2);
            }
            HAConnection.log.info(getServiceName() + " service end");
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public String getServiceName() {
            return ReadSocketService.class.getSimpleName();
        }

        /*  JADX ERROR: JadxRuntimeException in pass: InlineMethods
            jadx.core.utils.exceptions.JadxRuntimeException: Failed to process method for inline: org.apache.rocketmq.store.ha.HAConnection.access$402(org.apache.rocketmq.store.ha.HAConnection, long):long
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:74)
            	at jadx.core.dex.visitors.InlineMethods.visit(InlineMethods.java:49)
            Caused by: jadx.core.utils.exceptions.JadxRuntimeException: Class not yet loaded at codegen stage: org.apache.rocketmq.store.ha.HAConnection
            	at jadx.core.dex.nodes.ClassNode.reloadAtCodegenStage(ClassNode.java:883)
            	at jadx.core.dex.visitors.InlineMethods.processInvokeInsn(InlineMethods.java:66)
            	... 1 more
            */
        private boolean processReadEvent() {
            /*
                Method dump skipped, instructions count: 296
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.ha.HAConnection.ReadSocketService.processReadEvent():boolean");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:BOOT-INF/lib/rocketmq-store-4.0.0-incubating.jar:org/apache/rocketmq/store/ha/HAConnection$WriteSocketService.class */
    public class WriteSocketService extends ServiceThread {
        private final SocketChannel socketChannel;
        private SelectMappedBufferResult selectMappedBufferResult;
        private final int headerSize = 12;
        private final ByteBuffer byteBufferHeader = ByteBuffer.allocate(12);
        private long nextTransferFromWhere = -1;
        private boolean lastWriteOver = true;
        private long lastWriteTimestamp = System.currentTimeMillis();
        private final Selector selector = RemotingUtil.openSelector();

        public WriteSocketService(SocketChannel socketChannel) throws IOException {
            this.socketChannel = socketChannel;
            this.socketChannel.register(this.selector, 4);
            this.thread.setDaemon(true);
        }

        /* JADX WARN: Removed duplicated region for block: B:30:0x017c A[Catch: Exception -> 0x020f, TryCatch #0 {Exception -> 0x020f, blocks: (B:5:0x0025, B:49:0x003e, B:8:0x0047, B:10:0x0052, B:12:0x005e, B:15:0x008d, B:16:0x00a0, B:17:0x0095, B:18:0x00dc, B:38:0x00e3, B:40:0x010f, B:28:0x0166, B:30:0x017c, B:32:0x0195, B:33:0x01a6, B:36:0x01fc, B:21:0x0154), top: B:4:0x0025 }] */
        /* JADX WARN: Removed duplicated region for block: B:36:0x01fc A[Catch: Exception -> 0x020f, TryCatch #0 {Exception -> 0x020f, blocks: (B:5:0x0025, B:49:0x003e, B:8:0x0047, B:10:0x0052, B:12:0x005e, B:15:0x008d, B:16:0x00a0, B:17:0x0095, B:18:0x00dc, B:38:0x00e3, B:40:0x010f, B:28:0x0166, B:30:0x017c, B:32:0x0195, B:33:0x01a6, B:36:0x01fc, B:21:0x0154), top: B:4:0x0025 }] */
        @Override // java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 684
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.ha.HAConnection.WriteSocketService.run():void");
        }

        private boolean transferData() throws Exception {
            int i = 0;
            while (this.byteBufferHeader.hasRemaining()) {
                int write = this.socketChannel.write(this.byteBufferHeader);
                if (write > 0) {
                    i = 0;
                    this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                } else {
                    if (write != 0) {
                        throw new Exception("ha master write header error < 0");
                    }
                    i++;
                    if (i >= 3) {
                        break;
                    }
                }
            }
            if (null == this.selectMappedBufferResult) {
                return !this.byteBufferHeader.hasRemaining();
            }
            int i2 = 0;
            if (!this.byteBufferHeader.hasRemaining()) {
                while (this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                    int write2 = this.socketChannel.write(this.selectMappedBufferResult.getByteBuffer());
                    if (write2 > 0) {
                        i2 = 0;
                        this.lastWriteTimestamp = HAConnection.this.haService.getDefaultMessageStore().getSystemClock().now();
                    } else {
                        if (write2 != 0) {
                            throw new Exception("ha master write body error < 0");
                        }
                        i2++;
                        if (i2 >= 3) {
                            break;
                        }
                    }
                }
            }
            boolean z = (this.byteBufferHeader.hasRemaining() || this.selectMappedBufferResult.getByteBuffer().hasRemaining()) ? false : true;
            if (!this.selectMappedBufferResult.getByteBuffer().hasRemaining()) {
                this.selectMappedBufferResult.release();
                this.selectMappedBufferResult = null;
            }
            return z;
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public String getServiceName() {
            return WriteSocketService.class.getSimpleName();
        }

        @Override // org.apache.rocketmq.common.ServiceThread
        public void shutdown() {
            super.shutdown();
        }
    }

    public HAConnection(HAService hAService, SocketChannel socketChannel) throws IOException {
        this.haService = hAService;
        this.socketChannel = socketChannel;
        this.clientAddr = this.socketChannel.socket().getRemoteSocketAddress().toString();
        this.socketChannel.configureBlocking(false);
        this.socketChannel.socket().setSoLinger(false, -1);
        this.socketChannel.socket().setTcpNoDelay(true);
        this.socketChannel.socket().setReceiveBufferSize(65536);
        this.socketChannel.socket().setSendBufferSize(65536);
        this.writeSocketService = new WriteSocketService(this.socketChannel);
        this.readSocketService = new ReadSocketService(this.socketChannel);
        this.haService.getConnectionCount().incrementAndGet();
    }

    public void start() {
        this.readSocketService.start();
        this.writeSocketService.start();
    }

    public void shutdown() {
        this.writeSocketService.shutdown(true);
        this.readSocketService.shutdown(true);
        close();
    }

    public void close() {
        if (this.socketChannel != null) {
            try {
                this.socketChannel.close();
            } catch (IOException e) {
                log.error("", (Throwable) e);
            }
        }
    }

    public SocketChannel getSocketChannel() {
        return this.socketChannel;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.rocketmq.store.ha.HAConnection.access$402(org.apache.rocketmq.store.ha.HAConnection, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$402(org.apache.rocketmq.store.ha.HAConnection r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.slaveAckOffset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.ha.HAConnection.access$402(org.apache.rocketmq.store.ha.HAConnection, long):long");
    }

    static /* synthetic */ long access$500(HAConnection hAConnection) {
        return hAConnection.slaveRequestOffset;
    }

    /*  JADX ERROR: Failed to decode insn: 0x0002: MOVE_MULTI, method: org.apache.rocketmq.store.ha.HAConnection.access$502(org.apache.rocketmq.store.ha.HAConnection, long):long
        java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[6]
        	at java.base/java.lang.System.arraycopy(Native Method)
        	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
        	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
        	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
        	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
        	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
        	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
        	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
        	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
        	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
        	at jadx.core.ProcessClass.process(ProcessClass.java:70)
        	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
        	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
        	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
        	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
        */
    static /* synthetic */ long access$502(org.apache.rocketmq.store.ha.HAConnection r6, long r7) {
        /*
            r0 = r6
            r1 = r7
            // decode failed: arraycopy: source index -1 out of bounds for object array[6]
            r0.slaveRequestOffset = r1
            return r-1
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.rocketmq.store.ha.HAConnection.access$502(org.apache.rocketmq.store.ha.HAConnection, long):long");
    }

    static /* synthetic */ long access$400(HAConnection hAConnection) {
        return hAConnection.slaveAckOffset;
    }

    static {
    }
}
