package org.apache.inlong.dataproxy.source;

import com.google.common.base.Preconditions;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.concurrent.Executors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.FlumeException;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.Configurables;
import org.apache.flume.source.AbstractSource;
import org.apache.inlong.dataproxy.base.NamedThreadFactory;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.util.ThreadNameDeterminer;
import org.jboss.netty.util.ThreadRenamingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource.class */
public class SimpleTcpSource extends AbstractSource implements Configurable, EventDrivenSource {
    private static final Logger logger = LoggerFactory.getLogger(SimpleTcpSource.class);
    public static ArrayList<String> blacklist = new ArrayList<>();
    private static final String blacklistFilePath = "blacklist.properties";
    private static long propsLastModified;
    private static final String CONNECTIONS = "connections";
    protected int port;
    protected String msgFactoryName;
    protected String serviceDecoderName;
    protected String messageHandlerName;
    protected int maxMsgLength;
    protected boolean isCompressed;
    private CheckBlackListThread checkBlackListThread;
    private int receiveBufferSize;
    private int highWaterMark;
    private int sendBufferSize;
    private int trafficClass;
    protected String topic;
    protected String attr;
    protected boolean filterEmptyMsg;
    protected int maxConnections = Integer.MAX_VALUE;
    private ServerBootstrap serverBootstrap = null;
    protected String host = null;
    private int maxThreads = 32;
    private boolean tcpNoDelay = true;
    private boolean keepAlive = true;
    private Channel nettyChannel = null;
    protected ChannelGroup allChannels = new DefaultChannelGroup();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/dataproxy/source/SimpleTcpSource$CheckBlackListThread.class */
    public class CheckBlackListThread extends Thread {
        private boolean shutdown;

        private CheckBlackListThread() {
            this.shutdown = false;
        }

        public void shutdouwn() {
            this.shutdown = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            SimpleTcpSource.logger.info("CheckBlackListThread thread {} start.", Thread.currentThread().getName());
            while (!this.shutdown) {
                try {
                    File file = new File("conf/blacklist.properties");
                    if (file.lastModified() > SimpleTcpSource.propsLastModified) {
                        SimpleTcpSource.blacklist = SimpleTcpSource.this.load(SimpleTcpSource.blacklistFilePath);
                        long unused = SimpleTcpSource.propsLastModified = file.lastModified();
                        SimpleTcpSource.logger.info("blacklist.properties:{}\n{}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(file.lastModified())), SimpleTcpSource.blacklist);
                    }
                    Thread.sleep(5000L);
                    SimpleTcpSource.checkBlackList(SimpleTcpSource.blacklist, SimpleTcpSource.this.allChannels);
                } catch (InterruptedException e) {
                    SimpleTcpSource.logger.info("ConfigReloader thread exit!");
                    return;
                } catch (Throwable th) {
                    SimpleTcpSource.logger.error("ConfigReloader thread error!", th);
                }
            }
        }
    }

    public static void checkBlackList(ArrayList arrayList, ChannelGroup channelGroup) {
        if (arrayList != null) {
            Iterator it = channelGroup.iterator();
            while (it.hasNext()) {
                Channel channel = (Channel) it.next();
                String str = null;
                SocketAddress remoteAddress = channel.getRemoteAddress();
                if (null != remoteAddress) {
                    str = remoteAddress.toString();
                    try {
                        str = str.substring(1, str.indexOf(58));
                    } catch (Exception e) {
                        logger.warn("fail to get the remote IP, and strIP={},remoteSocketAddress={}", str, remoteAddress);
                    }
                }
                if (str != null && arrayList.contains(str)) {
                    logger.error(str + " is in blacklist, so disconnect it !");
                    channel.disconnect();
                    channel.close();
                    channelGroup.remove(channel);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ArrayList<String> load(String str) {
        ArrayList<String> arrayList = new ArrayList<>();
        if (str == null) {
            logger.error("fail to loadProperties, filename is null");
            return arrayList;
        }
        FileReader fileReader = null;
        BufferedReader bufferedReader = null;
        try {
            try {
                fileReader = new FileReader("conf/" + str);
                bufferedReader = new BufferedReader(fileReader);
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    arrayList.add(readLine);
                }
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            } catch (UnsupportedEncodingException e) {
                logger.error("fail to loadPropery, file ={}, and e= {}", str, e);
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            } catch (Exception e2) {
                logger.error("fail to loadProperty, file ={}, and e= {}", str, e2);
                IOUtils.closeQuietly(fileReader);
                IOUtils.closeQuietly(bufferedReader);
            }
            return arrayList;
        } catch (Throwable th) {
            IOUtils.closeQuietly(fileReader);
            IOUtils.closeQuietly(bufferedReader);
            throw th;
        }
    }

    public synchronized void start() {
        logger.info("start " + getName());
        this.checkBlackListThread = new CheckBlackListThread();
        this.checkBlackListThread.start();
        super.start();
        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
        NioServerSocketChannelFactory nioServerSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(new NamedThreadFactory("tcpSource-nettyBoss-threadGroup")), 1, Executors.newCachedThreadPool(new NamedThreadFactory("tcpSource-nettyWorker-threadGroup")), this.maxThreads);
        logger.info("Set max workers : {} ;", Integer.valueOf(this.maxThreads));
        this.serverBootstrap = new ServerBootstrap(nioServerSocketChannelFactory);
        this.serverBootstrap.setOption("child.tcpNoDelay", Boolean.valueOf(this.tcpNoDelay));
        this.serverBootstrap.setOption("child.keepAlive", Boolean.valueOf(this.keepAlive));
        this.serverBootstrap.setOption("child.receiveBufferSize", Integer.valueOf(this.receiveBufferSize));
        this.serverBootstrap.setOption("child.sendBufferSize", Integer.valueOf(this.sendBufferSize));
        this.serverBootstrap.setOption("child.trafficClass", Integer.valueOf(this.trafficClass));
        this.serverBootstrap.setOption("child.writeBufferHighWaterMark", Integer.valueOf(this.highWaterMark));
        logger.info("load msgFactory=" + this.msgFactoryName + " and serviceDecoderName=" + this.serviceDecoderName);
        try {
            ServiceDecoder serviceDecoder = (ServiceDecoder) Class.forName(this.serviceDecoderName).newInstance();
            Constructor<?> constructor = Class.forName(this.msgFactoryName).getConstructor(ChannelProcessor.class, ChannelGroup.class, String.class, ServiceDecoder.class, String.class, Integer.class, String.class, String.class, Boolean.class, Integer.class, Boolean.class, String.class);
            logger.info("Using channel processor:{}", getChannelProcessor().getClass().getName());
            this.serverBootstrap.setPipelineFactory((ChannelPipelineFactory) constructor.newInstance(getChannelProcessor(), this.allChannels, ConfigConstants.TCP_PROTOCOL, serviceDecoder, this.messageHandlerName, Integer.valueOf(this.maxMsgLength), this.topic, this.attr, Boolean.valueOf(this.filterEmptyMsg), Integer.valueOf(this.maxConnections), Boolean.valueOf(this.isCompressed), getName()));
            try {
                if (this.host == null) {
                    this.nettyChannel = this.serverBootstrap.bind(new InetSocketAddress(this.port));
                } else {
                    this.nettyChannel = this.serverBootstrap.bind(new InetSocketAddress(this.host, this.port));
                }
            } catch (Exception e) {
                logger.error("Simple TCP Source error bind host {} port {},program will exit!", this.host, Integer.valueOf(this.port));
                System.exit(-1);
            }
            this.allChannels.add(this.nettyChannel);
            logger.info("Simple TCP Source started at host {}, port {}", this.host, Integer.valueOf(this.port));
        } catch (Exception e2) {
            logger.error("Simple Tcp Source start error, fail to construct ChannelPipelineFactory with name {}, ex {}", this.msgFactoryName, e2);
            stop();
            throw new FlumeException(e2.getMessage());
        }
    }

    public synchronized void stop() {
        logger.info("[STOP SOURCE]{} stopping...", super.getName());
        this.checkBlackListThread.shutdouwn();
        if (this.allChannels != null) {
            try {
            } catch (Exception e) {
                logger.warn("Simple TCP Source netty server stop ex", e);
            } finally {
                this.allChannels.clear();
            }
            if (!this.allChannels.isEmpty()) {
                this.allChannels.unbind().awaitUninterruptibly();
                this.allChannels.close().awaitUninterruptibly();
            }
        }
        try {
        } catch (Exception e2) {
            logger.warn("Simple TCP Source serverBootstrap stop ex ", e2);
        } finally {
            this.serverBootstrap = null;
        }
        if (this.serverBootstrap != null) {
            this.serverBootstrap.releaseExternalResources();
        }
        super.stop();
        logger.info("[STOP SOURCE]{} stopped", super.getName());
    }

    public void configure(Context context) {
        logger.info("context is {}", context);
        this.port = context.getInteger(ConfigConstants.CONFIG_PORT).intValue();
        this.host = context.getString(ConfigConstants.CONFIG_HOST, "0.0.0.0");
        this.tcpNoDelay = context.getBoolean(ConfigConstants.TCP_NO_DELAY, true).booleanValue();
        this.keepAlive = context.getBoolean(ConfigConstants.KEEP_ALIVE, true).booleanValue();
        this.highWaterMark = context.getInteger(ConfigConstants.HIGH_WATER_MARK, 65536).intValue();
        this.receiveBufferSize = context.getInteger(ConfigConstants.RECEIVE_BUFFER_SIZE, 65536).intValue();
        if (this.receiveBufferSize > 16777216) {
            this.receiveBufferSize = 16777216;
        }
        Preconditions.checkArgument(this.receiveBufferSize > 0, "receiveBufferSize must be > 0");
        this.sendBufferSize = context.getInteger(ConfigConstants.SEND_BUFFER_SIZE, 65536).intValue();
        if (this.sendBufferSize > 16777216) {
            this.sendBufferSize = 16777216;
        }
        Preconditions.checkArgument(this.sendBufferSize > 0, "sendBufferSize must be > 0");
        this.trafficClass = context.getInteger(ConfigConstants.TRAFFIC_CLASS, 0).intValue();
        Preconditions.checkArgument(this.trafficClass == 0 || this.trafficClass == 96, "trafficClass must be == 0 or == 96");
        try {
            this.maxThreads = context.getInteger(ConfigConstants.MAX_THREADS, 32).intValue();
        } catch (NumberFormatException e) {
            logger.warn("Simple TCP Source max-threads property must specify an integer value. {}", context.getString(ConfigConstants.MAX_THREADS));
        }
        try {
            this.maxConnections = context.getInteger(CONNECTIONS, 5000).intValue();
        } catch (NumberFormatException e2) {
            logger.warn("BaseSource's \"connections\" property must specify an integer value.", context.getString(CONNECTIONS));
        }
        this.topic = context.getString("topic");
        this.attr = context.getString(ConfigConstants.ATTR);
        Configurables.ensureRequiredNonNull(context, new String[]{"topic", ConfigConstants.ATTR});
        this.topic = this.topic.trim();
        Preconditions.checkArgument(!this.topic.isEmpty(), "topic is empty");
        this.attr = this.attr.trim();
        Preconditions.checkArgument(!this.attr.isEmpty(), "attr is empty");
        this.filterEmptyMsg = context.getBoolean(ConfigConstants.FILTER_EMPTY_MSG, false).booleanValue();
        this.msgFactoryName = context.getString(ConfigConstants.MSG_FACTORY_NAME, "org.apache.inlong.dataproxy.source.ServerMessageFactory");
        this.msgFactoryName = this.msgFactoryName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.msgFactoryName), "msgFactoryName is empty");
        this.serviceDecoderName = context.getString(ConfigConstants.SERVICE_PROCESSOR_NAME, "org.apache.inlong.dataproxy.source.DefaultServiceDecoder");
        this.serviceDecoderName = this.serviceDecoderName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.serviceDecoderName), "serviceProcessorName is empty");
        this.messageHandlerName = context.getString(ConfigConstants.MESSAGE_HANDLER_NAME, "org.apache.inlong.dataproxy.source.ServerMessageHandler");
        this.messageHandlerName = this.messageHandlerName.trim();
        Preconditions.checkArgument(StringUtils.isNotBlank(this.messageHandlerName), "messageHandlerName is empty");
        this.maxMsgLength = context.getInteger(ConfigConstants.MAX_MSG_LENGTH, 65536).intValue();
        Preconditions.checkArgument(this.maxMsgLength >= 4 && this.maxMsgLength <= 20971520, "maxMsgLength must be >= 4 and <= 20971520");
        this.isCompressed = context.getBoolean(ConfigConstants.MSG_COMPRESSED, true).booleanValue();
    }
}
