package org.apache.hadoop.hive.llap;

import com.google.common.base.Preconditions;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
import org.apache.hadoop.hive.llap.security.SecretManager;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.util.StringUtils;
import org.apache.hudi.org.apache.hadoop.hive.conf.HiveConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/LlapOutputFormatService.class */
public class LlapOutputFormatService {
    private static final Logger LOG = LoggerFactory.getLogger(LlapOutputFormat.class);
    private static final AtomicBoolean started = new AtomicBoolean(false);
    private static final AtomicBoolean initing = new AtomicBoolean(false);
    private static LlapOutputFormatService INSTANCE;
    private final Object lock = new Object();
    private final Map<String, RecordWriter<?, ?>> writers = new HashMap();
    private final Map<String, String> errors = new HashMap();
    private final Configuration conf;
    private static final int WAIT_TIME = 5;
    private EventLoopGroup eventLoopGroup;
    private ServerBootstrap serverBootstrap;
    private ChannelFuture listeningChannelFuture;
    private int port;
    private final SecretManager sm;
    private final long writerTimeoutMs;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/LlapOutputFormatService$LlapOutputFormatChannelCloseListener.class */
    public class LlapOutputFormatChannelCloseListener implements ChannelFutureListener {
        private String id;

        LlapOutputFormatChannelCloseListener(String str) {
            this.id = str;
        }

        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            RecordWriter recordWriter;
            synchronized (LlapOutputFormatService.INSTANCE) {
                recordWriter = (RecordWriter) LlapOutputFormatService.this.writers.remove(this.id);
            }
            if (recordWriter == null) {
                LlapOutputFormatService.LOG.warn("Did not find a writer for ID " + this.id);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/LlapOutputFormatService$LlapOutputFormatServiceChannelHandler.class */
    public class LlapOutputFormatServiceChannelHandler extends ChannelInitializer<SocketChannel> {
        private final int sendBufferSize;

        public LlapOutputFormatServiceChannelHandler(int i) {
            this.sendBufferSize = i;
        }

        public void initChannel(SocketChannel socketChannel) throws Exception {
            socketChannel.pipeline().addLast(new ChannelHandler[]{new ProtobufVarint32FrameDecoder(), new ProtobufDecoder(LlapDaemonProtocolProtos.LlapOutputSocketInitMessage.getDefaultInstance()), new StringEncoder(), new LlapOutputFormatServiceHandler(this.sendBufferSize)});
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/LlapOutputFormatService$LlapOutputFormatServiceHandler.class */
    public class LlapOutputFormatServiceHandler extends SimpleChannelInboundHandler<LlapDaemonProtocolProtos.LlapOutputSocketInitMessage> {
        private final int sendBufferSize;

        public LlapOutputFormatServiceHandler(int i) {
            this.sendBufferSize = i;
        }

        public void channelRead0(ChannelHandlerContext channelHandlerContext, LlapDaemonProtocolProtos.LlapOutputSocketInitMessage llapOutputSocketInitMessage) {
            String fragmentId = llapOutputSocketInitMessage.getFragmentId();
            try {
                registerReader(channelHandlerContext, fragmentId, llapOutputSocketInitMessage.hasToken() ? llapOutputSocketInitMessage.getToken().toByteArray() : null);
            } catch (Throwable th) {
                failChannel(channelHandlerContext, fragmentId, StringUtils.stringifyException(th));
            }
        }

        private void registerReader(ChannelHandlerContext channelHandlerContext, String str, byte[] bArr) {
            if (LlapOutputFormatService.this.sm != null) {
                try {
                    LlapOutputFormatService.this.sm.verifyToken(bArr);
                } catch (IOException | SecurityException e) {
                    failChannel(channelHandlerContext, str, e.getMessage());
                    return;
                }
            }
            LlapOutputFormatService.LOG.debug("registering socket for: " + str);
            LlapRecordWriter llapRecordWriter = new LlapRecordWriter(new ChannelOutputStream(channelHandlerContext, str, this.sendBufferSize, HiveConf.getIntVar(LlapOutputFormatService.this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_MAX_PENDING_WRITES)));
            boolean z = true;
            synchronized (LlapOutputFormatService.this.lock) {
                if (!LlapOutputFormatService.this.writers.containsKey(str)) {
                    z = false;
                    LlapOutputFormatService.this.writers.put(str, llapRecordWriter);
                    channelHandlerContext.channel().closeFuture().addListener(new LlapOutputFormatChannelCloseListener(str));
                    LlapOutputFormatService.this.lock.notifyAll();
                }
            }
            if (z) {
                failChannel(channelHandlerContext, str, "Writer already registered for " + str);
            }
        }

        private void failChannel(ChannelHandlerContext channelHandlerContext, String str, String str2) {
            channelHandlerContext.close();
            synchronized (LlapOutputFormatService.this.lock) {
                LlapOutputFormatService.this.errors.put(str, str2);
                LlapOutputFormatService.this.lock.notifyAll();
            }
            LlapOutputFormatService.LOG.error(str2);
        }
    }

    private LlapOutputFormatService(Configuration configuration, SecretManager secretManager) throws IOException {
        this.sm = secretManager;
        this.conf = configuration;
        this.writerTimeoutMs = HiveConf.getTimeVar(configuration, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_STREAM_TIMEOUT, TimeUnit.MILLISECONDS);
    }

    public static void initializeAndStart(Configuration configuration, SecretManager secretManager) throws Exception {
        if (initing.getAndSet(true)) {
            return;
        }
        INSTANCE = new LlapOutputFormatService(configuration, secretManager);
        INSTANCE.start();
        started.set(true);
    }

    public static LlapOutputFormatService get() throws IOException {
        Preconditions.checkState(started.get(), "LlapOutputFormatService must be started before invoking get");
        return INSTANCE;
    }

    public void start() throws IOException {
        LOG.info("Starting LlapOutputFormatService");
        int intVar = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_PORT);
        int intVar2 = HiveConf.getIntVar(this.conf, HiveConf.ConfVars.LLAP_DAEMON_OUTPUT_SERVICE_SEND_BUFFER_SIZE);
        this.eventLoopGroup = new NioEventLoopGroup(1);
        this.serverBootstrap = new ServerBootstrap();
        this.serverBootstrap.group(this.eventLoopGroup);
        this.serverBootstrap.channel(NioServerSocketChannel.class);
        this.serverBootstrap.childHandler(new LlapOutputFormatServiceChannelHandler(intVar2));
        try {
            this.listeningChannelFuture = this.serverBootstrap.bind(intVar).sync();
            this.port = ((InetSocketAddress) this.listeningChannelFuture.channel().localAddress()).getPort();
            LOG.info("LlapOutputFormatService: Binding to port: {} with send buffer size: {} ", Integer.valueOf(this.port), Integer.valueOf(intVar2));
        } catch (InterruptedException e) {
            throw new IOException("LlapOutputFormatService: Error binding to port " + intVar, e);
        }
    }

    public void stop() throws IOException, InterruptedException {
        LOG.info("Stopping LlapOutputFormatService");
        if (this.listeningChannelFuture != null) {
            this.listeningChannelFuture.channel().close().sync();
            this.listeningChannelFuture = null;
        } else {
            LOG.warn("LlapOutputFormatService does not appear to have a listening port to close.");
        }
        this.eventLoopGroup.shutdownGracefully(1L, 5L, TimeUnit.SECONDS).sync();
    }

    /* JADX WARN: Multi-variable type inference failed */
    public <K, V> RecordWriter<K, V> getWriter(String str) throws IOException, InterruptedException {
        RecordWriter<?, ?> recordWriter;
        synchronized (this.lock) {
            long nanoTime = System.nanoTime();
            boolean z = true;
            while (true) {
                recordWriter = this.writers.get(str);
                if (recordWriter == null) {
                    String remove = this.errors.remove(str);
                    if (remove != null) {
                        throw new IOException(remove);
                    }
                    if (z) {
                        LOG.info("Waiting for writer for " + str);
                        z = false;
                    }
                    if ((System.nanoTime() - nanoTime) / 1000000 > this.writerTimeoutMs) {
                        throw new IOException("The writer for " + str + " has timed out after " + this.writerTimeoutMs + "ms");
                    }
                    this.lock.wait(this.writerTimeoutMs);
                }
            }
        }
        LOG.info("Returning writer for: " + str);
        return recordWriter;
    }

    public int getPort() {
        return this.port;
    }
}
