package com.bigdata.ha.pipeline;

import com.bigdata.ha.pipeline.HAWriteMessageBase;
import com.bigdata.util.ChecksumError;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.Adler32;
import org.apache.log4j.Logger;

/* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HAReceiveService.class */
public class HAReceiveService<M extends HAWriteMessageBase> extends Thread {
    protected static final Logger log;
    private final InetSocketAddress addrSelf;
    private final IHAReceiveCallback<M> callback;
    private final HASendService downstream;
    private final ExecutorService executor;
    private final Lock lock;
    private final Condition futureReady;
    private final Condition messageReady;
    private RunState runState;
    private HAWriteMessageBase message;
    private ByteBuffer localBuffer;
    private FutureTask<Void> readFuture;
    private FutureTask<Void> waitFuture;
    private volatile InetSocketAddress addrNext;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HAReceiveService$Client.class */
    /**
     * State for a single accepted upstream connection: the accepted socket
     * channel, a private selector on which that channel is registered for
     * reads, and the downstream send service associated with the connection.
     */
    public static class Client {
        final SocketChannel client;
        final Selector clientSelector;
        final SelectionKey clientKey;
        final HASendService downstream;

        /**
         * Accepts the pending connection on the server channel, switches it to
         * non-blocking mode, registers it for reads on a new selector and, when
         * a downstream address is given, starts the downstream send service.
         *
         * @param serverSocketChannel the listening channel to accept from
         * @param hASendService the downstream send service
         * @param inetSocketAddress the downstream address, or {@code null} when
         *            there is no downstream service to start
         * @throws IOException if the connection cannot be accepted or set up;
         *             anything opened so far is closed before rethrowing
         */
        public Client(ServerSocketChannel serverSocketChannel, HASendService hASendService, InetSocketAddress inetSocketAddress) throws IOException {
            try {
                this.client = serverSocketChannel.accept();
                if (this.client == null) {
                    // A non-blocking server channel returns null when no
                    // connection is pending; report that instead of an NPE.
                    throw new IOException("No pending connection to accept");
                }
                this.client.configureBlocking(false);
                this.clientSelector = Selector.open();
                this.clientKey = this.client.register(this.clientSelector, SelectionKey.OP_READ);
                this.downstream = hASendService;
                if (inetSocketAddress != null) {
                    hASendService.start(inetSocketAddress);
                }
            } catch (IOException e) {
                // close() tolerates the fields that were not yet assigned.
                close();
                throw e;
            }
        }

        /**
         * Releases the selection key, the socket channel, the selector and the
         * downstream service. Each step runs even if an earlier one fails, and
         * members that were never initialised are skipped.
         *
         * @throws IOException if closing the channel or the selector fails
         */
        public void close() throws IOException {
            if (this.clientKey != null) {
                this.clientKey.cancel();
            }
            try {
                if (this.client != null) {
                    this.client.close();
                }
            } finally {
                try {
                    if (this.clientSelector != null) {
                        this.clientSelector.close();
                    }
                } finally {
                    if (this.downstream != null) {
                        this.downstream.terminate();
                    }
                }
            }
        }
    }

    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HAReceiveService$IHAReceiveCallback.class */
    /**
     * Hook invoked by {@link ReadTask} after a message payload has been read
     * into the local buffer and its checksum has been verified.
     *
     * @param <M> the message type
     */
    public interface IHAReceiveCallback<M extends HAWriteMessageBase> {
        /**
         * Handles a received payload.
         *
         * @param m the message describing the payload
         * @param byteBuffer the buffer holding the payload; {@link ReadTask}
         *            flips it before making this call
         * @throws Exception if the payload cannot be handled; the exception
         *             propagates out of {@link ReadTask#call()}
         */
        void callback(M m, ByteBuffer byteBuffer) throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HAReceiveService$ReadTask.class */
    /**
     * Task that receives the payload of one message from the upstream client
     * into a local buffer. While reading it maintains an Adler32 checksum and,
     * when a downstream address is configured, relays every chunk through the
     * downstream send service. Once {@code message.getSize()} bytes have
     * arrived the buffer is flipped, the checksum is compared against
     * {@code message.getChk()} and the optional callback is invoked.
     *
     * @param <M> the message type
     */
    public static class ReadTask<M extends HAWriteMessageBase> implements Callable<Void> {
        private final ServerSocketChannel server;
        private final AtomicReference<Client> clientRef;
        private final M message;
        private final ByteBuffer localBuffer;
        private final HASendService downstream;
        private final InetSocketAddress addrNext;
        private final IHAReceiveCallback<M> callback;
        /** Running checksum over the bytes received so far. */
        private final Adler32 chk = new Adler32();
        /** Scratch array used to feed buffer contents to the checksum. */
        private final byte[] scratch = new byte[512];

        /**
         * @param serverSocketChannel the listening channel (required)
         * @param atomicReference holder for the current client connection (required)
         * @param m the message describing the expected payload (required)
         * @param byteBuffer the buffer that receives the payload (required)
         * @param hASendService the downstream send service (required)
         * @param inetSocketAddress the downstream address, or {@code null}
         * @param iHAReceiveCallback optional callback, may be {@code null}
         * @throws IllegalArgumentException if a required argument is {@code null}
         */
        public ReadTask(ServerSocketChannel serverSocketChannel, AtomicReference<Client> atomicReference, M m, ByteBuffer byteBuffer, HASendService hASendService, InetSocketAddress inetSocketAddress, IHAReceiveCallback<M> iHAReceiveCallback) {
            if (serverSocketChannel == null) {
                throw new IllegalArgumentException();
            }
            if (atomicReference == null) {
                throw new IllegalArgumentException();
            }
            if (m == null) {
                throw new IllegalArgumentException();
            }
            if (byteBuffer == null) {
                throw new IllegalArgumentException();
            }
            if (hASendService == null) {
                throw new IllegalArgumentException();
            }
            this.server = serverSocketChannel;
            this.clientRef = atomicReference;
            this.message = m;
            this.localBuffer = byteBuffer;
            this.downstream = hASendService;
            this.addrNext = inetSocketAddress;
            this.callback = iHAReceiveCallback;
        }

        /**
         * Blocks on a temporary selector until the server channel is selected
         * for an accept (or the select call otherwise returns). The temporary
         * key and selector are always released.
         *
         * @throws IOException if the selector cannot be opened or used
         */
        protected void awaitAccept() throws IOException {
            Selector selector = Selector.open();
            try {
                SelectionKey serverKey = this.server.register(selector, SelectionKey.OP_ACCEPT);
                try {
                    selector.select();
                    Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
                    if (keys.hasNext()) {
                        SelectionKey selected = keys.next();
                        keys.remove();
                        if (selected != serverKey) {
                            throw new AssertionError();
                        }
                    }
                } finally {
                    serverKey.cancel();
                }
            } finally {
                selector.close();
            }
        }

        /**
         * Folds the last {@code i} bytes written into the local buffer (those
         * immediately before its current position) into the checksum. A
         * read-only view is used so the buffer's own position is untouched.
         */
        private void updateChk(int i) {
            ByteBuffer view = this.localBuffer.asReadOnlyBuffer();
            int end = view.position();
            int start = end - i;
            view.position(start);
            for (int offset = start; offset < end; offset += this.scratch.length) {
                int length = Math.min(end - offset, this.scratch.length);
                view.get(this.scratch, 0, length);
                this.chk.update(this.scratch, 0, length);
            }
        }

        /**
         * Receives the payload, verifies its checksum and notifies the callback.
         *
         * @return always {@code null}
         * @throws IOException if the upstream closes the connection before the
         *             whole payload has arrived
         * @throws InterruptedException if the thread is interrupted while
         *             waiting for data
         * @throws ChecksumError if the received data does not match the
         *             checksum carried by the message
         * @throws Exception anything thrown by the downstream send or the callback
         */
        @Override
        public Void call() throws Exception {
            Client client = this.clientRef.get();
            if (client == null) {
                awaitAccept();
                client = new Client(this.server, this.downstream, this.addrNext);
                this.clientRef.set(client);
            }
            int remaining = this.message.getSize();
            while (remaining > 0) {
                client.clientSelector.select();
                // select() also returns when the thread is interrupted; without
                // this check a cancelled task would spin instead of stopping.
                if (Thread.interrupted()) {
                    throw new InterruptedException();
                }
                Iterator<SelectionKey> keys = client.clientSelector.selectedKeys().iterator();
                while (keys.hasNext()) {
                    keys.next();
                    keys.remove();
                    int read = client.client.read(this.localBuffer);
                    if (HAReceiveService.log.isTraceEnabled()) {
                        HAReceiveService.log.trace("Read " + read + " bytes");
                    }
                    if (read == -1) {
                        // End of stream: no more data can arrive, so waiting
                        // for the remainder would loop forever.
                        throw new IOException("Upstream closed the connection with " + remaining + " of " + this.message.getSize() + " bytes outstanding");
                    }
                    if (read > 0) {
                        updateChk(read);
                    }
                    remaining -= read;
                    if (this.addrNext != null) {
                        if (HAReceiveService.log.isTraceEnabled()) {
                            HAReceiveService.log.trace("Incremental send of " + read + " bytes");
                        }
                        // Relay exactly the bytes just read, via an independent view.
                        ByteBuffer chunk = this.localBuffer.asReadOnlyBuffer();
                        chunk.position(this.localBuffer.position() - read);
                        chunk.limit(this.localBuffer.position());
                        this.downstream.send(chunk).get();
                    }
                }
            }
            assert this.localBuffer.position() == this.message.getSize()
                    : "localBuffer.pos=" + this.localBuffer.position() + ", message.size=" + this.message.getSize();
            this.localBuffer.flip();
            if (this.message.getChk() != ((int) this.chk.getValue())) {
                throw new ChecksumError("msg=" + this.message.toString() + ", actual=" + this.chk.getValue());
            }
            if (this.callback != null) {
                this.callback.callback(this.message, this.localBuffer);
            }
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HAReceiveService$RunState.class */
    /**
     * Lifecycle states of the service, declared in the order they are entered.
     * Each constant carries a numeric level equal to its position.
     */
    public enum RunState {
        /** Initial state, before the service thread has begun running. */
        Start(0),
        /** The service thread is running. */
        Running(1),
        /** Termination has been requested. */
        ShuttingDown(2),
        /** The service thread has finished. */
        Shutdown(3);

        /** Numeric level assigned to this state. */
        private final int level;

        RunState(int level) {
            this.level = level;
        }
    }

    /**
     * Returns the thread description followed by this service's own address
     * and the current downstream address.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder(super.toString());
        text.append("{addrSelf=").append(this.addrSelf);
        text.append(", addrNext=").append(this.addrNext);
        text.append("}");
        return text.toString();
    }

    /**
     * Creates a service without a receive callback; equivalent to calling the
     * three-argument constructor with a {@code null} callback.
     *
     * @param inetSocketAddress the address this service listens on
     * @param inetSocketAddress2 the downstream address, or {@code null}
     */
    public HAReceiveService(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2) {
        this(inetSocketAddress, inetSocketAddress2, null);
    }

    /**
     * Creates the service as a daemon thread in the {@code Start} state. The
     * thread is not started here.
     *
     * @param inetSocketAddress the address this service listens on (required)
     * @param inetSocketAddress2 the downstream address, or {@code null}
     * @param iHAReceiveCallback optional callback, may be {@code null}
     * @throws IllegalArgumentException if {@code inetSocketAddress} is {@code null}
     */
    public HAReceiveService(InetSocketAddress inetSocketAddress, InetSocketAddress inetSocketAddress2, IHAReceiveCallback<M> iHAReceiveCallback) {
        // Validate first so an invalid call does not allocate an executor.
        if (inetSocketAddress == null) {
            throw new IllegalArgumentException("addrSelf must not be null");
        }
        this.executor = Executors.newSingleThreadExecutor();
        this.lock = new ReentrantLock();
        this.futureReady = this.lock.newCondition();
        this.messageReady = this.lock.newCondition();
        this.runState = RunState.Start;
        this.addrSelf = inetSocketAddress;
        this.addrNext = inetSocketAddress2;
        this.callback = iHAReceiveCallback;
        this.downstream = new HASendService();
        setDaemon(true);
        if (log.isInfoEnabled()) {
            log.info("Created: " + this);
        }
    }

    /**
     * Safety net that terminates the service when the object is finalized.
     * The superclass finalizer runs even if termination fails.
     *
     * @throws Throwable anything thrown by {@link #terminate()} or the
     *             superclass finalizer
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            terminate();
        } finally {
            super.finalize();
        }
    }

    /**
     * Requests shutdown of the service. If the service is already shutting
     * down or shut down this is a no-op. Otherwise the state becomes
     * {@code ShuttingDown}, the service thread is interrupted and, after the
     * lock has been released, the downstream service is terminated and the
     * executor is shut down.
     */
    public void terminate() {
        this.lock.lock();
        try {
            switch (this.runState) {
                case ShuttingDown:
                case Shutdown:
                    return;
                default:
                    this.runState = RunState.ShuttingDown;
                    interrupt();
                    break;
            }
        } finally {
            // Single release point: the lock is unlocked exactly once.
            this.lock.unlock();
        }
        if (this.downstream != null) {
            this.downstream.terminate();
        }
        this.executor.shutdownNow();
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:4:0x0014. Please report as an issue. */
    /**
     * Blocks until the service has reached the {@code Shutdown} state.
     *
     * @throws InterruptedException if interrupted while acquiring the lock or
     *             while waiting
     */
    public void awaitShutdown() throws InterruptedException {
        this.lock.lockInterruptibly();
        try {
            while (true) {
                switch (this.runState) {
                    case Start:
                    case Running:
                    case ShuttingDown:
                        this.futureReady.await();
                        // Re-examine the state: a wake-up does not by itself
                        // mean the service has shut down.
                        break;
                    case Shutdown:
                        return;
                    default:
                        throw new AssertionError();
                }
            }
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Starts the service thread and blocks until it has left the
     * {@code Start} state. The wait is not abandoned on interrupt; the
     * caller's interrupt status is preserved.
     */
    @Override
    public void start() {
        super.start();
        this.lock.lock();
        try {
            while (this.runState == RunState.Start) {
                this.futureReady.awaitUninterruptibly();
            }
        } finally {
            // Released exactly once, including when the thread had already
            // left the Start state before the lock was acquired.
            this.lock.unlock();
        }
    }

    /**
     * Service thread body. Marks the service {@code Running}, opens and binds
     * a non-blocking server channel on {@code addrSelf} and hands it to
     * {@code runNoBlock}. On exit, for whatever reason, the server channel is
     * closed, the state becomes {@code Shutdown} and all waiters are
     * signalled. An interrupt is treated as a normal shutdown; any other
     * failure is logged and rethrown wrapped in a {@link RuntimeException}.
     */
    @Override
    public void run() {
        this.lock.lock();
        try {
            this.runState = RunState.Running;
            this.futureReady.signalAll();
            this.messageReady.signalAll();
        } finally {
            this.lock.unlock();
        }
        ServerSocketChannel serverSocketChannel = null;
        try {
            try {
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.socket().bind(this.addrSelf);
                serverSocketChannel.configureBlocking(false);
                if (log.isInfoEnabled()) {
                    log.info("Listening on" + this.addrSelf);
                }
                runNoBlock(serverSocketChannel);
            } catch (InterruptedException e) {
                log.info("Shutdown");
            } finally {
                if (serverSocketChannel != null) {
                    try {
                        serverSocketChannel.close();
                    } catch (IOException e) {
                        log.error(e, e);
                    }
                }
                // Publish the terminal state and wake everyone waiting on it.
                this.lock.lock();
                try {
                    this.runState = RunState.Shutdown;
                    this.messageReady.signalAll();
                    this.futureReady.signalAll();
                } finally {
                    this.lock.unlock();
                }
            }
        } catch (Throwable th) {
            log.error(th, th);
            throw new RuntimeException(th);
        }
    }

    /* JADX WARN: Failed to find 'out' block for switch in B:8:0x0023. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:55:0x013f  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    /**
     * Service loop, invoked by {@link #run()} with the bound, non-blocking
     * server channel.
     * <p>
     * NOTE(review): the decompiler could not reconstruct this method. The body
     * below is a placeholder that unconditionally throws
     * {@link UnsupportedOperationException}; the real logic has to be
     * recovered from the original sources or bytecode.
     *
     * @param r14 the server channel passed in by {@link #run()}
     * @throws UnsupportedOperationException always, in this decompiled form
     */
    private void runNoBlock(java.nio.channels.ServerSocketChannel r14) throws java.io.IOException, java.lang.InterruptedException, java.util.concurrent.ExecutionException {
        /*
            Method dump skipped, instructions count: 327
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bigdata.ha.pipeline.HAReceiveService.runNoBlock(java.nio.channels.ServerSocketChannel):void");
    }

    /**
     * Registers the message and buffer for the next receive, signals that a
     * message is ready, and waits until a future for that receive has been
     * published in {@code waitFuture}.
     *
     * @param m the message describing the expected payload (required)
     * @param byteBuffer the buffer to receive into (required); its limit is
     *            set to {@code m.getSize()} and its position to zero
     * @return the future published for this receive
     * @throws IllegalArgumentException if either argument is {@code null}
     * @throws InterruptedException if interrupted while acquiring the lock or
     *             while waiting for the future
     * @throws RuntimeException if the service is shutting down or shut down
     */
    public Future<Void> receiveData(M m, ByteBuffer byteBuffer) throws InterruptedException {
        if (m == null) {
            throw new IllegalArgumentException();
        }
        if (byteBuffer == null) {
            throw new IllegalArgumentException();
        }
        this.lock.lockInterruptibly();
        try {
            this.message = m;
            this.localBuffer = byteBuffer;
            this.localBuffer.limit(this.message.getSize());
            this.localBuffer.position(0);
            this.messageReady.signal();
            if (log.isTraceEnabled()) {
                log.trace("Will accept data for message: msg=" + m);
            }
            while (this.waitFuture == null) {
                switch (this.runState) {
                    case ShuttingDown:
                    case Shutdown:
                        throw new RuntimeException("Service closed.");
                    case Start:
                    case Running:
                        this.futureReady.await();
                        // Loop to re-check waitFuture and the run state.
                        break;
                    default:
                        throw new AssertionError();
                }
            }
            // The return value is captured before the finally block clears the field.
            return this.waitFuture;
        } finally {
            this.waitFuture = null;
            this.lock.unlock();
        }
    }

    /**
     * Switches the downstream address. While holding the lock, any read in
     * progress is cancelled (with interruption), the downstream send service
     * is terminated and the new address is recorded.
     *
     * @param inetSocketAddress the new downstream address, may be {@code null}
     */
    public void changeDownStream(InetSocketAddress inetSocketAddress) {
        this.lock.lock();
        try {
            FutureTask<Void> pendingRead = this.readFuture;
            if (pendingRead != null) {
                pendingRead.cancel(true);
            }
            this.downstream.terminate();
            this.addrNext = inetSocketAddress;
        } finally {
            this.lock.unlock();
        }
    }

    static {
        // Decompiler rendering of the compiler-generated assertion flag:
        // true when assertions are NOT enabled for this class.
        $assertionsDisabled = !HAReceiveService.class.desiredAssertionStatus();
        // Class-wide log4j logger, also used by the nested ReadTask.
        log = Logger.getLogger(HAReceiveService.class);
    }
}
