package org.apache.hedwig.server.proxy;

import java.io.File;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.client.HedwigClient;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
import org.apache.hedwig.server.handlers.Handler;
import org.apache.hedwig.server.netty.PubSubServer;
import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
import org.apache.hedwig.server.netty.UmbrellaHandler;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Log4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/proxy/HedwigProxy.class */
public class HedwigProxy {
    static final Logger logger = LoggerFactory.getLogger(HedwigProxy.class);
    HedwigClient client;
    ServerSocketChannelFactory serverSocketChannelFactory;
    ChannelGroup allChannels;
    Map<PubSubProtocol.OperationType, Handler> handlers;
    ProxyConfiguration cfg;
    ChannelTracker tracker;
    ThreadGroup tg;

    public HedwigProxy(ProxyConfiguration proxyConfiguration, final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        this.cfg = proxyConfiguration;
        this.tg = new ThreadGroup("hedwigproxy") { // from class: org.apache.hedwig.server.proxy.HedwigProxy.1
            @Override // java.lang.ThreadGroup, java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                uncaughtExceptionHandler.uncaughtException(thread, th);
            }
        };
    }

    public HedwigProxy(ProxyConfiguration proxyConfiguration) throws InterruptedException {
        this(proxyConfiguration, new TerminateJVMExceptionHandler());
    }

    public void start() throws InterruptedException {
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        new Thread(this.tg, new Runnable() { // from class: org.apache.hedwig.server.proxy.HedwigProxy.2
            @Override // java.lang.Runnable
            public void run() {
                HedwigProxy.this.client = new HedwigClient(HedwigProxy.this.cfg);
                HedwigProxy.this.serverSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                HedwigProxy.this.initializeHandlers();
                HedwigProxy.this.initializeNetty();
                linkedBlockingQueue.offer(true);
            }
        }).start();
        linkedBlockingQueue.take();
    }

    public ChannelTracker getChannelTracker() {
        return this.tracker;
    }

    protected void initializeHandlers() {
        this.handlers = new HashMap();
        this.tracker = new ChannelTracker(this.client.getSubscriber());
        this.handlers.put(PubSubProtocol.OperationType.PUBLISH, new ProxyPublishHander(this.client.getPublisher()));
        this.handlers.put(PubSubProtocol.OperationType.SUBSCRIBE, new ProxySubscribeHandler(this.client.getSubscriber(), this.tracker));
        this.handlers.put(PubSubProtocol.OperationType.UNSUBSCRIBE, new ProxyUnsubscribeHandler(this.client.getSubscriber(), this.tracker));
        this.handlers.put(PubSubProtocol.OperationType.CONSUME, new ProxyConsumeHandler(this.client.getSubscriber()));
        this.handlers.put(PubSubProtocol.OperationType.STOP_DELIVERY, new ProxyStopDeliveryHandler(this.client.getSubscriber(), this.tracker));
        this.handlers.put(PubSubProtocol.OperationType.START_DELIVERY, new ProxyStartDeliveryHandler(this.client.getSubscriber(), this.tracker));
        this.handlers.put(PubSubProtocol.OperationType.CLOSESUBSCRIPTION, new ProxyCloseSubscriptionHandler(this.client.getSubscriber(), this.tracker));
    }

    protected void initializeNetty() {
        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
        this.allChannels = new DefaultChannelGroup("hedwigproxy");
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.serverSocketChannelFactory);
        serverBootstrap.setPipelineFactory(new PubSubServerPipelineFactory(new UmbrellaHandler(this.allChannels, this.handlers, (ChannelDisconnectListener) this.handlers.get(PubSubProtocol.OperationType.SUBSCRIBE), false), null, this.cfg.getMaximumMessageSize()));
        serverBootstrap.setOption("child.tcpNoDelay", true);
        serverBootstrap.setOption("child.keepAlive", true);
        serverBootstrap.setOption("reuseAddress", true);
        this.allChannels.add(serverBootstrap.bind(new InetSocketAddress(this.cfg.getProxyPort())));
        logger.info("Going into receive loop");
    }

    public void shutdown() {
        this.allChannels.close().awaitUninterruptibly();
        this.client.close();
        this.serverSocketChannelFactory.releaseExternalResources();
    }

    public Handler getStartDeliveryHandler() {
        return this.handlers.get(PubSubProtocol.OperationType.START_DELIVERY);
    }

    public Handler getStopDeliveryHandler() {
        return this.handlers.get(PubSubProtocol.OperationType.STOP_DELIVERY);
    }

    public static void main(String[] strArr) {
        logger.info("Attempting to start Hedwig Proxy");
        ProxyConfiguration proxyConfiguration = new ProxyConfiguration();
        if (strArr.length > 0) {
            String str = strArr[0];
            try {
                proxyConfiguration.loadConf(new File(str).toURI().toURL());
            } catch (ConfigurationException e) {
                PubSubServer.errorMsgAndExit("Malformed configuration file: " + str, e, 2);
            } catch (MalformedURLException e2) {
                PubSubServer.errorMsgAndExit("Could not open configuration file: " + str, e2, 1);
            }
            logger.info("Using configuration file " + str);
        }
        try {
            new HedwigProxy(proxyConfiguration).start();
        } catch (Throwable th) {
            PubSubServer.errorMsgAndExit("Error during startup", th, 3);
        }
    }
}
