package com.bigdata.ha.pipeline;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;

/* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HASendService.class */
public class HASendService {
    protected static final Logger log = Logger.getLogger(HASendService.class);
    private final AtomicReference<InetSocketAddress> addr = new AtomicReference<>();
    private final AtomicReference<ExecutorService> executorRef = new AtomicReference<>();
    private final AtomicReference<SocketChannel> socketChannel = new AtomicReference<>();

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:WEB-INF/lib/bigdata-0.83.2.jar:com/bigdata/ha/pipeline/HASendService$IncSendTask.class */
    public static class IncSendTask implements Callable<Void> {
        private final SocketChannel socketChannel;
        private final ByteBuffer data;
        static final /* synthetic */ boolean $assertionsDisabled;

        public IncSendTask(SocketChannel socketChannel, ByteBuffer byteBuffer) {
            if (socketChannel == null) {
                throw new IllegalArgumentException();
            }
            if (byteBuffer == null) {
                throw new IllegalArgumentException();
            }
            this.socketChannel = socketChannel;
            this.data = byteBuffer;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws Exception {
            int remaining = this.data.remaining();
            this.socketChannel.write(this.data);
            if (HASendService.log.isTraceEnabled()) {
                HASendService.log.trace("Sent " + remaining + " bytes");
            }
            if ($assertionsDisabled || this.data.remaining() == 0) {
                return null;
            }
            throw new AssertionError("remaining=" + this.data.remaining());
        }

        static {
            $assertionsDisabled = !HASendService.class.desiredAssertionStatus();
        }
    }

    public String toString() {
        return super.toString() + "{addr=" + this.addr + "}";
    }

    public InetSocketAddress getAddr() {
        return this.addr.get();
    }

    protected void finalize() throws Throwable {
        terminate();
        super.finalize();
    }

    public synchronized void start(InetSocketAddress inetSocketAddress) {
        if (inetSocketAddress == null) {
            throw new IllegalArgumentException();
        }
        if (this.executorRef.get() != null) {
            throw new IllegalStateException();
        }
        if (log.isInfoEnabled()) {
            log.info(toString());
        }
        this.addr.set(inetSocketAddress);
        this.socketChannel.set(null);
        this.executorRef.set(Executors.newSingleThreadExecutor());
    }

    public synchronized void terminate() {
        if (log.isInfoEnabled()) {
            log.info(toString());
        }
        ExecutorService andSet = this.executorRef.getAndSet(null);
        if (andSet == null) {
            return;
        }
        try {
            SocketChannel socketChannel = this.socketChannel.get();
            try {
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                        this.socketChannel.set(null);
                    } catch (IOException e) {
                        log.error("Ignoring exception during close: " + e, e);
                        this.socketChannel.set(null);
                    }
                }
            } catch (Throwable th) {
                this.socketChannel.set(null);
                throw th;
            }
        } finally {
            andSet.shutdownNow();
            this.addr.set(null);
        }
    }

    public Future<Void> send(ByteBuffer byteBuffer) {
        if (byteBuffer == null) {
            throw new IllegalArgumentException();
        }
        if (byteBuffer.remaining() == 0) {
            throw new IllegalArgumentException();
        }
        ExecutorService executorService = this.executorRef.get();
        if (executorService == null) {
            throw new IllegalStateException();
        }
        if (log.isTraceEnabled()) {
            log.trace("Will send " + byteBuffer.remaining() + " bytes");
        }
        synchronized (this.socketChannel) {
            if (this.socketChannel.get() == null) {
                try {
                    this.socketChannel.set(openChannel(this.addr.get()));
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return executorService.submit(newIncSendTask(byteBuffer.asReadOnlyBuffer()));
    }

    protected Callable<Void> newIncSendTask(ByteBuffer byteBuffer) {
        return new IncSendTask(this.socketChannel.get(), byteBuffer);
    }

    protected static SocketChannel openChannel(InetSocketAddress inetSocketAddress) throws IOException {
        SocketChannel open = SocketChannel.open();
        try {
            open.configureBlocking(true);
            if (log.isTraceEnabled()) {
                log.trace("Connecting to " + inetSocketAddress);
            }
            open.connect(inetSocketAddress);
            open.finishConnect();
            return open;
        } catch (IOException e) {
            log.error(e);
            throw e;
        }
    }
}
