package org.apache.flume.source;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.net.InetSocketAddress;
import java.nio.CharBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flume.Context;
import org.apache.flume.CounterGroup;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.Source;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.event.EventBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flume/source/NetcatSource.class */
public class NetcatSource extends AbstractSource implements Configurable, EventDrivenSource {
    /** Logger shared by this source and its nested handler classes. */
    private static final Logger logger = LoggerFactory.getLogger(NetcatSource.class);
    /** Host name or address to bind to; read from the "bind" key in configure(). */
    private String hostName;
    /** Listening channel; opened and bound in start(), closed in stop(). */
    private ServerSocketChannel serverSocket;
    /** Thread that runs the AcceptHandler; created in start(), interrupted and joined in stop(). */
    private Thread acceptThread;
    /** Thread pool that runs one NetcatSocketHandler per accepted connection. */
    private ExecutorService handlerService;
    /** TCP port to bind to; parsed from the "port" key in configure(). */
    private int port = 0;
    /** Named counters (open/accept/event statistics); logged when the source stops. */
    private CounterGroup counterGroup = new CounterGroup();
    /** Flag polled by the accept loop; stop() sets it to true. */
    private AtomicBoolean acceptThreadShouldStop = new AtomicBoolean();

    /* loaded from: input_file:org/apache/flume/source/NetcatSource$AcceptHandler.class */
    /**
     * Accept loop for the netcat source.
     *
     * <p>Blocks on {@link ServerSocketChannel#accept()} and hands every accepted
     * connection to a new {@link NetcatSocketHandler} submitted to the handler
     * executor. The loop ends when the shared stop flag becomes {@code true} or
     * when the server channel is no longer open.
     *
     * <p>All fields are assigned by the creator before the handler is run.
     */
    public static class AcceptHandler implements Runnable {
        /** Listening channel connections are accepted from. */
        private ServerSocketChannel serverSocket;
        /** Counters updated with "accept.succeeded" / "accept.failed". */
        private CounterGroup counterGroup;
        /** Executor that runs the per-connection handlers. */
        private ExecutorService handlerService;
        /** Source passed on to each connection handler. */
        private EventDrivenSource source;
        /** Shared flag; the loop stops once it reads {@code true}. */
        private AtomicBoolean shouldStop;

        /**
         * Runs the accept loop until asked to stop or until the server channel
         * has been closed.
         */
        @Override // java.lang.Runnable
        public void run() {
            NetcatSource.logger.debug("Starting accept handler");
            while (!this.shouldStop.get()) {
                try {
                    SocketChannel clientChannel = this.serverSocket.accept();
                    NetcatSocketHandler connectionHandler = new NetcatSocketHandler();
                    connectionHandler.socketChannel = clientChannel;
                    connectionHandler.counterGroup = this.counterGroup;
                    connectionHandler.source = this.source;
                    this.handlerService.submit(connectionHandler);
                    this.counterGroup.incrementAndGet("accept.succeeded");
                } catch (ClosedByInterruptException e) {
                    // Being interrupted while blocked in accept() closes the channel,
                    // so no further connection can ever be accepted: leave the loop
                    // instead of retrying on a dead channel.
                    NetcatSource.logger.debug("Accept handler interrupted");
                    break;
                } catch (IOException e2) {
                    NetcatSource.logger.error("Unable to accept connection. Exception follows.", e2);
                    this.counterGroup.incrementAndGet("accept.failed");
                    if (!this.serverSocket.isOpen()) {
                        // A closed channel fails every accept() immediately; without
                        // this exit the loop would spin and flood the log until the
                        // stop flag is set.
                        break;
                    }
                }
            }
            NetcatSource.logger.debug("Accept handler exiting");
        }
    }

    /* loaded from: input_file:org/apache/flume/source/NetcatSource$NetcatSocketHandler.class */
    public static class NetcatSocketHandler implements Runnable {
        private Source source;
        private CounterGroup counterGroup;
        private SocketChannel socketChannel;

        @Override // java.lang.Runnable
        public void run() {
            try {
                Reader newReader = Channels.newReader(this.socketChannel, "utf-8");
                Writer newWriter = Channels.newWriter(this.socketChannel, "utf-8");
                CharBuffer allocate = CharBuffer.allocate(512);
                StringBuilder sb = new StringBuilder();
                while (newReader.read(allocate) != -1) {
                    allocate.flip();
                    NetcatSource.logger.debug("read {} characters", Integer.valueOf(allocate.remaining()));
                    this.counterGroup.addAndGet("characters.received", Long.valueOf(allocate.limit()));
                    sb.append(allocate.array(), allocate.position(), allocate.length());
                }
                if (sb.charAt(sb.length() - 1) == '\n') {
                    sb.deleteCharAt(sb.length() - 1);
                }
                Event withBody = EventBuilder.withBody(sb.toString().getBytes());
                Exception exc = null;
                Transaction transaction = this.source.getChannel().getTransaction();
                try {
                    try {
                        transaction.begin();
                        this.source.getChannel().put(withBody);
                        transaction.commit();
                        transaction.close();
                    } catch (Exception e) {
                        exc = e;
                        transaction.rollback();
                        transaction.close();
                    }
                    if (exc == null) {
                        newWriter.append((CharSequence) "OK\n");
                    } else {
                        newWriter.append((CharSequence) ("FAILED: " + exc.getMessage() + "\n"));
                    }
                    this.socketChannel.close();
                    this.counterGroup.incrementAndGet("events.success");
                } catch (Throwable th) {
                    transaction.close();
                    throw th;
                }
            } catch (IOException e2) {
                this.counterGroup.incrementAndGet("events.failed");
            }
        }
    }

    /**
     * Reads the listen address from the supplied context.
     *
     * <p>The "bind" value is stored as the host name and the "port" value is
     * parsed as a decimal integer. Both keys are first passed to
     * {@code Configurables.ensureRequiredNonNull} (presumably rejecting a
     * context that lacks either one; see that helper for the exact contract).
     *
     * @param context configuration holding the "bind" and "port" entries
     * @throws NumberFormatException if the "port" value is not a valid integer
     */
    @Override // org.apache.flume.conf.Configurable
    public void configure(Context context) {
        Configurables.ensureRequiredNonNull(context, "bind", "port");

        this.hostName = (String) context.get("bind", String.class);

        String portText = (String) context.get("port", String.class);
        this.port = Integer.parseInt(portText);
    }

    /**
     * Starts the source: creates the handler thread pool, opens and binds the
     * server channel to the configured host and port, and launches the accept
     * thread.
     *
     * <p>A bind failure is logged and counted under "open.errors"; it is not
     * rethrown, and no accept thread is started in that case.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void start() {
        logger.info("Source starting");
        super.start();
        this.counterGroup.incrementAndGet("open.attempts");
        this.handlerService = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("netcat-handler-%d").build());
        try {
            InetSocketAddress bindAddress = new InetSocketAddress(this.hostName, this.port);
            this.serverSocket = ServerSocketChannel.open();
            this.serverSocket.socket().setReuseAddress(true);
            this.serverSocket.socket().bind(bindAddress);
            logger.info("Created serverSocket:{}", this.serverSocket);
            // stop() leaves the flag set to true; clear it so that a source which
            // was stopped earlier can be started again. Otherwise the new accept
            // loop would see the stale flag and exit immediately.
            this.acceptThreadShouldStop.set(false);
            AcceptHandler acceptHandler = new AcceptHandler();
            acceptHandler.counterGroup = this.counterGroup;
            acceptHandler.handlerService = this.handlerService;
            acceptHandler.shouldStop = this.acceptThreadShouldStop;
            acceptHandler.source = this;
            acceptHandler.serverSocket = this.serverSocket;
            this.acceptThread = new Thread(acceptHandler);
            this.acceptThread.start();
            logger.debug("Source started");
        } catch (IOException e) {
            this.counterGroup.incrementAndGet("open.errors");
            logger.error("Unable to bind to socket. Exception follows.", e);
        }
    }

    /**
     * Stops the source: signals and joins the accept thread, closes the server
     * channel, then shuts the handler pool down and waits for it to terminate.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption
     * is remembered and the thread's interrupt status is restored just before
     * returning, after shutdown has completed.
     *
     * <p>NOTE(review): the handler pool is only interrupted ({@code shutdownNow})
     * when the caller is interrupted; otherwise this method waits for in-flight
     * connection handlers to finish on their own.
     */
    @Override // org.apache.flume.source.AbstractSource, org.apache.flume.lifecycle.LifecycleAware
    public void stop() {
        logger.info("Source stopping");
        super.stop();
        // Restoring the interrupt status inside the wait loops would make the next
        // join()/awaitTermination() throw immediately and turn the loops into busy
        // spins, so the status is recorded here and restored once at the end.
        boolean interrupted = false;
        this.acceptThreadShouldStop.set(true);
        if (this.acceptThread != null) {
            logger.debug("Stopping accept handler thread");
            while (this.acceptThread.isAlive()) {
                try {
                    logger.debug("Waiting for accept handler to finish");
                    // Interrupt unblocks a thread parked in accept().
                    this.acceptThread.interrupt();
                    this.acceptThread.join(500L);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for accept handler to finish");
                    interrupted = true;
                }
            }
            logger.debug("Stopped accept handler thread");
        }
        if (this.serverSocket != null) {
            try {
                this.serverSocket.close();
            } catch (IOException e2) {
                // Keep going: returning here would skip the handler pool shutdown
                // below and leave its threads running.
                logger.error("Unable to close socket. Exception follows.", e2);
            }
        }
        if (this.handlerService != null) {
            this.handlerService.shutdown();
            while (!this.handlerService.isTerminated()) {
                logger.debug("Waiting for handler service to stop");
                try {
                    this.handlerService.awaitTermination(500L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e3) {
                    logger.debug("Interrupted while waiting for netcat handler service to stop");
                    this.handlerService.shutdownNow();
                    interrupted = true;
                }
            }
            logger.debug("Handler service stopped");
        }
        logger.debug("Source stopped. Event metrics:{}", this.counterGroup);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}
