package org.apache.hedwig.server.netty;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.lang.Thread;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.delivery.FIFODeliveryManager;
import org.apache.hedwig.server.handlers.CloseSubscriptionHandler;
import org.apache.hedwig.server.handlers.ConsumeHandler;
import org.apache.hedwig.server.handlers.Handler;
import org.apache.hedwig.server.handlers.NettyHandlerBean;
import org.apache.hedwig.server.handlers.PublishHandler;
import org.apache.hedwig.server.handlers.SubscribeHandler;
import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
import org.apache.hedwig.server.handlers.UnsubscribeHandler;
import org.apache.hedwig.server.jmx.HedwigMBeanRegistry;
import org.apache.hedwig.server.meta.MetadataManagerFactory;
import org.apache.hedwig.server.meta.ZkMetadataManagerFactory;
import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
import org.apache.hedwig.server.persistence.ReadAheadCache;
import org.apache.hedwig.server.regions.HedwigHubClientFactory;
import org.apache.hedwig.server.regions.RegionManager;
import org.apache.hedwig.server.ssl.SslServerContextFactory;
import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager;
import org.apache.hedwig.server.subscriptions.MMSubscriptionManager;
import org.apache.hedwig.server.subscriptions.SubscriptionManager;
import org.apache.hedwig.server.topics.MMTopicManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
import org.apache.hedwig.server.topics.ZkTopicManager;
import org.apache.hedwig.util.ConcurrencyUtils;
import org.apache.hedwig.util.Either;
import org.apache.hedwig.zookeeper.SafeAsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Log4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/netty/PubSubServer.class */
public class PubSubServer {
    static Logger logger = LoggerFactory.getLogger(PubSubServer.class);
    private static final String JMXNAME_PREFIX = "PubSubServer_";
    ServerSocketChannelFactory serverChannelFactory;
    ClientSocketChannelFactory clientChannelFactory;
    ServerConfiguration conf;
    ClientConfiguration clientConfiguration;
    ChannelGroup allChannels;
    PersistenceManager pm;
    DeliveryManager dm;
    TopicManager tm;
    SubscriptionManager sm;
    RegionManager rm;
    MetadataManagerFactory mm;
    ZooKeeper zk;
    BookKeeper bk;
    ScheduledExecutorService scheduler;
    NettyHandlerBean jmxNettyBean;
    PubSubServerBean jmxServerBean;
    final ThreadGroup tg;
    public static final int RC_INVALID_CONF_FILE = 1;
    public static final int RC_MISCONFIGURED = 2;
    public static final int RC_OTHER = 3;

    protected PersistenceManager instantiatePersistenceManager(TopicManager topicManager) throws IOException, InterruptedException {
        PersistenceManagerWithRangeScan bookkeeperPersistenceManager;
        if (this.conf.isStandalone()) {
            bookkeeperPersistenceManager = LocalDBPersistenceManager.instance();
        } else {
            try {
                org.apache.bookkeeper.conf.ClientConfiguration clientConfiguration = new org.apache.bookkeeper.conf.ClientConfiguration();
                clientConfiguration.addConfiguration(this.conf.getConf());
                this.bk = new BookKeeper(clientConfiguration, this.zk, this.clientChannelFactory);
                bookkeeperPersistenceManager = new BookkeeperPersistenceManager(this.bk, this.mm, topicManager, this.conf, this.scheduler);
            } catch (KeeperException e) {
                logger.error("Could not instantiate bookkeeper client", e);
                throw new IOException((Throwable) e);
            }
        }
        PersistenceManager persistenceManager = bookkeeperPersistenceManager;
        if (this.conf.getReadAheadEnabled()) {
            persistenceManager = new ReadAheadCache(bookkeeperPersistenceManager, this.conf).start();
        }
        return persistenceManager;
    }

    protected SubscriptionManager instantiateSubscriptionManager(TopicManager topicManager, PersistenceManager persistenceManager, DeliveryManager deliveryManager) {
        return this.conf.isStandalone() ? new InMemorySubscriptionManager(this.conf, topicManager, persistenceManager, deliveryManager, this.scheduler) : new MMSubscriptionManager(this.conf, this.mm, topicManager, persistenceManager, deliveryManager, this.scheduler);
    }

    protected RegionManager instantiateRegionManager(PersistenceManager persistenceManager, ScheduledExecutorService scheduledExecutorService) {
        return new RegionManager(persistenceManager, this.conf, this.zk, scheduledExecutorService, new HedwigHubClientFactory(this.conf, this.clientConfiguration, this.clientChannelFactory));
    }

    protected void instantiateZookeeperClient() throws Exception {
        if (this.conf.isStandalone()) {
            return;
        }
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.zk = new ZooKeeper(this.conf.getZkHost(), this.conf.getZkTimeout(), new Watcher() { // from class: org.apache.hedwig.server.netty.PubSubServer.1
            public void process(WatchedEvent watchedEvent) {
                if (Watcher.Event.KeeperState.SyncConnected.equals(watchedEvent.getState())) {
                    countDownLatch.countDown();
                }
            }
        });
        if (countDownLatch.await(this.conf.getZkTimeout() * 2, TimeUnit.MILLISECONDS)) {
            return;
        }
        logger.error("Could not establish connection with ZooKeeper after zk_timeout*2 = " + (this.conf.getZkTimeout() * 2) + " ms. (Default value for zk_timeout is 2000).");
        throw new Exception("Could not establish connection with ZooKeeper after zk_timeout*2 = " + (this.conf.getZkTimeout() * 2) + " ms. (Default value for zk_timeout is 2000).");
    }

    protected void instantiateMetadataManagerFactory() throws Exception {
        if (this.conf.isStandalone()) {
            return;
        }
        this.mm = MetadataManagerFactory.newMetadataManagerFactory(this.conf, this.zk);
    }

    protected TopicManager instantiateTopicManager() throws IOException {
        TopicManager zkTopicManager;
        if (this.conf.isStandalone()) {
            zkTopicManager = new TrivialOwnAllTopicManager(this.conf, this.scheduler);
        } else {
            try {
                if (this.conf.isMetadataManagerBasedTopicManagerEnabled()) {
                    zkTopicManager = new MMTopicManager(this.conf, this.zk, this.mm, this.scheduler);
                } else {
                    if (!(this.mm instanceof ZkMetadataManagerFactory)) {
                        throw new IOException("Uses " + this.mm.getClass().getName() + " to store hedwig metadata, but uses zookeeper ephemeral znodes to store topic ownership. Check your configuration as this could lead to scalability issues.");
                    }
                    zkTopicManager = new ZkTopicManager(this.zk, this.conf, this.scheduler);
                }
            } catch (PubSubException e) {
                logger.error("Could not instantiate TopicOwnershipManager based topic manager", e);
                throw new IOException((Throwable) e);
            }
        }
        return zkTopicManager;
    }

    protected Map<PubSubProtocol.OperationType, Handler> initializeNettyHandlers(TopicManager topicManager, DeliveryManager deliveryManager, PersistenceManager persistenceManager, SubscriptionManager subscriptionManager, SubscriptionChannelManager subscriptionChannelManager) {
        HashMap hashMap = new HashMap();
        hashMap.put(PubSubProtocol.OperationType.PUBLISH, new PublishHandler(topicManager, persistenceManager, this.conf));
        hashMap.put(PubSubProtocol.OperationType.SUBSCRIBE, new SubscribeHandler(this.conf, topicManager, deliveryManager, persistenceManager, subscriptionManager, subscriptionChannelManager));
        hashMap.put(PubSubProtocol.OperationType.UNSUBSCRIBE, new UnsubscribeHandler(this.conf, topicManager, subscriptionManager, deliveryManager, subscriptionChannelManager));
        hashMap.put(PubSubProtocol.OperationType.CONSUME, new ConsumeHandler(topicManager, subscriptionManager, this.conf));
        hashMap.put(PubSubProtocol.OperationType.CLOSESUBSCRIPTION, new CloseSubscriptionHandler(this.conf, topicManager, subscriptionManager, deliveryManager, subscriptionChannelManager));
        return Collections.unmodifiableMap(hashMap);
    }

    protected void initializeNetty(SslServerContextFactory sslServerContextFactory, Map<PubSubProtocol.OperationType, Handler> map, SubscriptionChannelManager subscriptionChannelManager) {
        boolean z = sslServerContextFactory != null;
        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
        ServerBootstrap serverBootstrap = new ServerBootstrap(this.serverChannelFactory);
        serverBootstrap.setPipelineFactory(new PubSubServerPipelineFactory(new UmbrellaHandler(this.allChannels, map, subscriptionChannelManager, z), sslServerContextFactory, this.conf.getMaximumMessageSize()));
        serverBootstrap.setOption("child.tcpNoDelay", true);
        serverBootstrap.setOption("child.keepAlive", true);
        serverBootstrap.setOption("reuseAddress", true);
        this.allChannels.add(serverBootstrap.bind(z ? new InetSocketAddress(this.conf.getSSLServerPort()) : new InetSocketAddress(this.conf.getServerPort())));
        logger.info("Going into receive loop");
    }

    public void shutdown() {
        this.tm.stop();
        this.rm.stop();
        this.dm.stop();
        this.pm.stop();
        this.sm.stop();
        if (null != this.mm) {
            try {
                this.mm.shutdown();
            } catch (IOException e) {
                logger.error("Error while shutdown metadata manager factory!", e);
            }
        }
        try {
            if (this.bk != null) {
                this.bk.close();
            }
            if (this.zk != null) {
                this.zk.close();
            }
        } catch (BKException e2) {
            logger.error("Error while closing BookKeeper client : ", e2);
        } catch (InterruptedException e3) {
            logger.error("Error while closing ZooKeeper client : ", e3);
        }
        this.allChannels.close().awaitUninterruptibly();
        this.serverChannelFactory.releaseExternalResources();
        this.clientChannelFactory.releaseExternalResources();
        this.scheduler.shutdown();
        unregisterJMX();
    }

    protected void registerJMX(SubscriptionChannelManager subscriptionChannelManager) {
        try {
            this.jmxServerBean = new PubSubServerBean(JMXNAME_PREFIX + this.conf.getServerPort() + "_" + this.conf.getSSLServerPort());
            HedwigMBeanRegistry.getInstance().register(this.jmxServerBean, null);
            try {
                this.jmxNettyBean = new NettyHandlerBean(subscriptionChannelManager);
                HedwigMBeanRegistry.getInstance().register(this.jmxNettyBean, this.jmxServerBean);
            } catch (Exception e) {
                logger.warn("Failed to register with JMX", e);
                this.jmxNettyBean = null;
            }
        } catch (Exception e2) {
            logger.warn("Failed to register with JMX", e2);
            this.jmxServerBean = null;
        }
        if (this.pm instanceof ReadAheadCache) {
            ((ReadAheadCache) this.pm).registerJMX(this.jmxServerBean);
        }
    }

    protected void unregisterJMX() {
        if (this.pm != null && (this.pm instanceof ReadAheadCache)) {
            ((ReadAheadCache) this.pm).unregisterJMX();
        }
        try {
            if (this.jmxNettyBean != null) {
                HedwigMBeanRegistry.getInstance().unregister(this.jmxNettyBean);
            }
        } catch (Exception e) {
            logger.warn("Failed to unregister with JMX", e);
        }
        try {
            if (this.jmxServerBean != null) {
                HedwigMBeanRegistry.getInstance().unregister(this.jmxServerBean);
            }
        } catch (Exception e2) {
            logger.warn("Failed to unregister with JMX", e2);
        }
        this.jmxNettyBean = null;
        this.jmxServerBean = null;
    }

    public PubSubServer(ServerConfiguration serverConfiguration, ClientConfiguration clientConfiguration, final Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws ConfigurationException {
        this.conf = serverConfiguration;
        serverConfiguration.validate();
        this.clientConfiguration = clientConfiguration;
        clientConfiguration.validate();
        this.tg = new ThreadGroup("hedwig") { // from class: org.apache.hedwig.server.netty.PubSubServer.2
            @Override // java.lang.ThreadGroup, java.lang.Thread.UncaughtExceptionHandler
            public void uncaughtException(Thread thread, Throwable th) {
                uncaughtExceptionHandler.uncaughtException(thread, th);
            }
        };
        SafeAsyncCallback.setUncaughtExceptionHandler(uncaughtExceptionHandler);
    }

    public void start() throws Exception {
        final SynchronousQueue synchronousQueue = new SynchronousQueue();
        new Thread(this.tg, new Runnable() { // from class: org.apache.hedwig.server.netty.PubSubServer.3
            @Override // java.lang.Runnable
            public void run() {
                try {
                    PubSubServer.this.scheduler = Executors.newSingleThreadScheduledExecutor();
                    PubSubServer.this.serverChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                    PubSubServer.this.clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
                    PubSubServer.this.instantiateZookeeperClient();
                    PubSubServer.this.instantiateMetadataManagerFactory();
                    PubSubServer.this.tm = PubSubServer.this.instantiateTopicManager();
                    PubSubServer.this.pm = PubSubServer.this.instantiatePersistenceManager(PubSubServer.this.tm);
                    PubSubServer.this.dm = new FIFODeliveryManager(PubSubServer.this.pm, PubSubServer.this.conf);
                    PubSubServer.this.dm.start();
                    PubSubServer.this.sm = PubSubServer.this.instantiateSubscriptionManager(PubSubServer.this.tm, PubSubServer.this.pm, PubSubServer.this.dm);
                    PubSubServer.this.rm = PubSubServer.this.instantiateRegionManager(PubSubServer.this.pm, PubSubServer.this.scheduler);
                    PubSubServer.this.sm.addListener(PubSubServer.this.rm);
                    PubSubServer.this.allChannels = new DefaultChannelGroup("hedwig");
                    SubscriptionChannelManager subscriptionChannelManager = new SubscriptionChannelManager();
                    subscriptionChannelManager.addSubChannelDisconnectedListener((SubscriptionChannelManager.SubChannelDisconnectedListener) PubSubServer.this.dm);
                    Map<PubSubProtocol.OperationType, Handler> initializeNettyHandlers = PubSubServer.this.initializeNettyHandlers(PubSubServer.this.tm, PubSubServer.this.dm, PubSubServer.this.pm, PubSubServer.this.sm, subscriptionChannelManager);
                    PubSubServer.this.initializeNetty(null, initializeNettyHandlers, subscriptionChannelManager);
                    if (PubSubServer.this.conf.isSSLEnabled()) {
                        PubSubServer.this.initializeNetty(new SslServerContextFactory(PubSubServer.this.conf), initializeNettyHandlers, subscriptionChannelManager);
                    }
                    PubSubServer.this.registerJMX(subscriptionChannelManager);
                    ConcurrencyUtils.put(synchronousQueue, Either.of(new Object(), (Exception) null));
                } catch (Exception e) {
                    ConcurrencyUtils.put(synchronousQueue, Either.right(e));
                }
            }
        }).start();
        Either either = (Either) ConcurrencyUtils.take(synchronousQueue);
        if (either.left() == null) {
            throw ((Exception) either.right());
        }
    }

    public PubSubServer(ServerConfiguration serverConfiguration, ClientConfiguration clientConfiguration) throws Exception {
        this(serverConfiguration, clientConfiguration, new TerminateJVMExceptionHandler());
    }

    public PubSubServer(ServerConfiguration serverConfiguration) throws Exception {
        this(serverConfiguration, new ClientConfiguration());
    }

    @VisibleForTesting
    public DeliveryManager getDeliveryManager() {
        return this.dm;
    }

    public static void errorMsgAndExit(String str, Throwable th, int i) {
        logger.error(str, th);
        System.err.println(str);
        System.exit(i);
    }

    public static void main(String[] strArr) {
        logger.info("Attempting to start Hedwig");
        ServerConfiguration serverConfiguration = new ServerConfiguration();
        ClientConfiguration clientConfiguration = new ClientConfiguration();
        if (strArr.length > 0) {
            String str = strArr[0];
            try {
                serverConfiguration.loadConf(new File(str).toURI().toURL());
            } catch (MalformedURLException e) {
                errorMsgAndExit("Could not open server configuration file: " + str, e, 1);
            } catch (ConfigurationException e2) {
                errorMsgAndExit("Malformed server configuration file: " + str, e2, 2);
            }
            logger.info("Using configuration file " + str);
        }
        if (strArr.length > 1) {
            String str2 = strArr[1];
            try {
                clientConfiguration.loadConf(new File(str2).toURI().toURL());
            } catch (MalformedURLException e3) {
                errorMsgAndExit("Could not open client configuration file: " + str2, e3, 1);
            } catch (ConfigurationException e4) {
                errorMsgAndExit("Malformed client configuration file: " + str2, e4, 2);
            }
        }
        try {
            new PubSubServer(serverConfiguration, clientConfiguration).start();
        } catch (Throwable th) {
            errorMsgAndExit("Error during startup", th, 3);
        }
    }
}
