package com.cinchapi.concourse.server.plugin.io;

import com.cinchapi.common.base.CheckedExceptions;
import com.cinchapi.concourse.server.plugin.concurrent.FileLocks;
import com.cinchapi.concourse.util.FileOps;
import com.cinchapi.concourse.util.Strings;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/* loaded from: input_file:com/cinchapi/concourse/server/plugin/io/MessageQueue.class */
/**
 * An {@link InterProcessCommunication} channel that delivers messages over
 * loopback TCP sockets.
 * <p>
 * Every instance binds a server socket to an ephemeral port on
 * {@code localhost} and appends that port, as a 4-byte integer, to a shared
 * metadata file. {@link #write(ByteBuffer)} reads every port recorded in that
 * file and sends the message to each of them as a length-prefixed frame (a
 * 4-byte length followed by the payload). A background daemon thread accepts
 * connections, decodes incoming frames and places the payloads on an internal
 * queue that {@link #read()} drains.
 * </p>
 * <p>
 * NOTE(review): the map of outgoing connections is a plain {@code HashMap} and
 * is not synchronized; concurrent calls to {@link #write(ByteBuffer)} and
 * {@link #close()} appear to rely on external coordination - confirm.
 * </p>
 */
public class MessageQueue implements InterProcessCommunication, AutoCloseable {

    /** Host that every queue binds to and connects to. */
    private static final String SOCKET_HOST = "localhost";

    /** Size, in bytes, of both a port record and a frame's length prefix. */
    private static final int INT_BYTES = Integer.BYTES;

    /** Shared file holding the port of every registered queue. */
    private final FileChannel metadata;

    /** Ephemeral port that this queue's server socket is bound to. */
    private final int port;

    /** Server socket on which other writers connect to this queue. */
    private final ServerSocketChannel channel;

    /** Daemon thread that accepts connections and decodes frames. */
    private final Thread acceptor;

    /** Payloads that have been received but not yet {@link #read() read}. */
    private final BlockingQueue<ByteBuffer> messages;

    /** Outgoing connections, keyed by the destination port. */
    private final Map<Integer, SocketChannel> readers;

    /**
     * Construct a queue that is backed by a new temporary metadata file.
     */
    public MessageQueue() {
        this(FileOps.tempFile("con", ".mq"));
    }

    /**
     * Construct a queue that registers itself in the metadata file at
     * {@code str}, creating the file if necessary.
     *
     * @param str path to the metadata file shared by all participating queues
     */
    public MessageQueue(String str) {
        this.readers = Maps.newHashMap();
        this.messages = new LinkedBlockingQueue<>();
        try {
            this.metadata = FileChannel.open(Paths.get(str).toAbsolutePath(),
                    StandardOpenOption.CREATE, StandardOpenOption.READ,
                    StandardOpenOption.WRITE);
            this.channel = ServerSocketChannel.open();
            Selector selector = Selector.open();
            // Port 0 lets the operating system choose a free port.
            this.channel.bind(new InetSocketAddress(SOCKET_HOST, 0));
            this.port = ((InetSocketAddress) this.channel.getLocalAddress())
                    .getPort();
            this.channel.configureBlocking(false);
            this.channel.register(selector, this.channel.validOps());
            register();
            this.acceptor = new Thread(() -> serve(selector));
            this.acceptor.setDaemon(true);
            this.acceptor.setUncaughtExceptionHandler((thread, th) -> {
                new RuntimeException(Strings.format("Uncaught exception in Thread {}: {}", new Object[]{thread, th}), th).printStackTrace();
            });
            this.acceptor.start();
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                try {
                    close();
                } catch (Exception e) {
                    throw CheckedExceptions.throwAsRuntimeException(e);
                }
            }));
        } catch (IOException e) {
            throw CheckedExceptions.throwAsRuntimeException(e);
        }
    }

    /**
     * Interrupt the acceptor thread, close the server socket and close every
     * outgoing connection that {@link #write(ByteBuffer)} has opened.
     * <p>
     * NOTE(review): the metadata file channel is left open and this queue's
     * port is not removed from the metadata file - confirm that is intended.
     * </p>
     *
     * @throws Exception if closing one of the channels fails
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        this.acceptor.interrupt();
        this.channel.close();
        for (SocketChannel reader : this.readers.values()) {
            reader.close();
        }
    }

    /**
     * Do nothing; this implementation has nothing to compact.
     */
    @Override // com.cinchapi.concourse.server.plugin.io.InterProcessCommunication
    public void compact() {
    }

    /**
     * Return the next received message, blocking until one is available.
     *
     * @return the payload of the next message, positioned for reading
     */
    @Override // com.cinchapi.concourse.server.plugin.io.InterProcessCommunication
    public ByteBuffer read() {
        try {
            return this.messages.take();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe
            // the interruption after the exception has been wrapped.
            Thread.currentThread().interrupt();
            throw CheckedExceptions.throwAsRuntimeException(e);
        }
    }

    /**
     * Send the remaining content of {@code byteBuffer} to every queue whose
     * port is recorded in the metadata file.
     * <p>
     * The bytes between the buffer's position and limit are sent; the
     * buffer's own position and limit are left unchanged.
     * </p>
     *
     * @param byteBuffer the message to send
     * @return this
     */
    @Override // com.cinchapi.concourse.server.plugin.io.InterProcessCommunication
    public InterProcessCommunication write(ByteBuffer byteBuffer) {
        FileLock lock = lock();
        try {
            ByteBuffer ports = ByteBuffer.allocate((int) this.metadata.size());
            // A single read is not guaranteed to fill the buffer.
            while (ports.hasRemaining()) {
                if (this.metadata.read(ports, ports.position()) < 0) {
                    break;
                }
            }
            ports.flip();
            // Only consume whole records; a trailing partial one is ignored.
            while (ports.remaining() >= INT_BYTES) {
                int destination = ports.getInt();
                SocketChannel reader = this.readers.get(destination);
                if (reader == null) {
                    reader = SocketChannel.open(
                            new InetSocketAddress(SOCKET_HOST, destination));
                    this.readers.put(destination, reader);
                }
                // Each destination gets an independent view so that sending
                // never disturbs the caller's buffer and every destination
                // receives exactly the same bytes.
                ByteBuffer payload = byteBuffer.duplicate();
                // The prefix must match the number of bytes that follow,
                // which is the remaining count rather than the capacity.
                ByteBuffer header = ByteBuffer.allocate(INT_BYTES)
                        .putInt(payload.remaining());
                header.flip();
                writeFully(reader, header);
                writeFully(reader, payload);
            }
            return this;
        } catch (IOException e) {
            throw CheckedExceptions.throwAsRuntimeException(e);
        } finally {
            FileLocks.release(lock);
        }
    }

    /**
     * Acquire a non-shared lock over the entire metadata file.
     *
     * @return the lock, which the caller must release
     */
    private FileLock lock() {
        return FileLocks.lock(this.metadata, 0L, Long.MAX_VALUE, false);
    }

    /**
     * Append this queue's port to the end of the metadata file while holding
     * the file lock, and force the change to storage.
     */
    private void register() {
        FileLock lock = lock();
        try {
            ByteBuffer record = ByteBuffer.allocate(INT_BYTES);
            record.putInt(this.port);
            record.flip();
            this.metadata.write(record, this.metadata.size());
            this.metadata.force(true);
        } catch (IOException e) {
            throw CheckedExceptions.throwAsRuntimeException(e);
        } finally {
            FileLocks.release(lock);
        }
    }

    /**
     * Run the acceptor loop: accept new connections and decode frames until
     * the thread is interrupted or the server socket is closed, then release
     * the selector and every channel registered with it.
     *
     * @param selector the selector that the server socket is registered with
     */
    private void serve(Selector selector) {
        try {
            // select() returns immediately once the thread is interrupted, so
            // the loop must observe the flag or it would spin forever.
            while (!Thread.currentThread().isInterrupted()
                    && this.channel.isOpen()) {
                selector.select();
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isAcceptable()) {
                        accept(selector);
                    }
                    if (key.isValid() && key.isReadable()) {
                        receive(key);
                    }
                }
            }
        } catch (ClosedByInterruptException ignored) {
            // The thread was interrupted during I/O, which is how close()
            // asks this loop to stop.
        } catch (IOException e) {
            // Failures caused by the server socket having been closed are
            // part of shutting down; anything else is a genuine error.
            if (this.channel.isOpen()) {
                throw CheckedExceptions.throwAsRuntimeException(e);
            }
        } finally {
            release(selector);
        }
    }

    /**
     * Accept a pending connection, if there is one, and register it with
     * {@code selector} for reading.
     *
     * @param selector the selector to register the new connection with
     * @throws IOException if accepting or registering the connection fails
     */
    private void accept(Selector selector) throws IOException {
        SocketChannel peer = this.channel.accept();
        // A non-blocking accept returns null when nothing is pending.
        if (peer != null) {
            peer.configureBlocking(false);
            peer.register(selector, SelectionKey.OP_READ);
        }
    }

    /**
     * Read one frame from the connection behind {@code key} and enqueue its
     * payload. The connection is closed if the peer has disconnected or the
     * frame announces a negative length.
     *
     * @param key a readable key whose channel is a {@link SocketChannel}
     * @throws IOException if reading from the connection fails
     */
    private void receive(SelectionKey key) throws IOException {
        SocketChannel peer = (SocketChannel) key.channel();
        ByteBuffer header = ByteBuffer.allocate(INT_BYTES);
        int count = peer.read(header);
        if (count == 0) {
            // Nothing was actually available; wait for the next selection.
            return;
        }
        if (count < 0 || !readFully(peer, header)) {
            peer.close();
            return;
        }
        header.flip();
        int length = header.getInt();
        if (length < 0) {
            // The stream is corrupt and cannot be resynchronized.
            peer.close();
            return;
        }
        ByteBuffer payload = ByteBuffer.allocate(length);
        if (!readFully(peer, payload)) {
            // The peer went away mid-frame; drop the incomplete message.
            peer.close();
            return;
        }
        payload.flip();
        this.messages.add(payload);
    }

    /**
     * Read from {@code source} until {@code target} has no space remaining.
     * <p>
     * When {@code source} is non-blocking this polls repeatedly until the
     * rest of the data arrives.
     * </p>
     *
     * @param source the connection to read from
     * @param target the buffer to fill
     * @return {@code true} if {@code target} was filled, {@code false} if the
     *         end of the stream was reached first
     * @throws IOException if reading fails
     */
    private static boolean readFully(SocketChannel source, ByteBuffer target)
            throws IOException {
        while (target.hasRemaining()) {
            if (source.read(target) < 0) {
                return false;
            }
        }
        return true;
    }

    /**
     * Write to {@code destination} until {@code source} has nothing remaining.
     *
     * @param destination the connection to write to
     * @param source the buffer to drain
     * @throws IOException if writing fails
     */
    private static void writeFully(SocketChannel destination,
            ByteBuffer source) throws IOException {
        while (source.hasRemaining()) {
            destination.write(source);
        }
    }

    /**
     * Close every channel registered with {@code selector} and then the
     * selector itself, ignoring failures because this only runs while the
     * acceptor is stopping.
     *
     * @param selector the selector to release
     */
    private static void release(Selector selector) {
        for (SelectionKey key : selector.keys()) {
            try {
                key.channel().close();
            } catch (IOException ignored) {
                // Best effort: keep closing the remaining channels.
            }
        }
        try {
            selector.close();
        } catch (IOException ignored) {
            // Best effort: nothing further can be done with the selector.
        }
    }

    /**
     * Return a description of this queue that includes its pending messages.
     *
     * @return the description
     */
    @Override
    public String toString() {
        return Strings.format("MessageQueue[pending = {}]", new Object[]{this.messages});
    }
}
