package org.apache.hedwig.server.handlers;

import com.google.protobuf.ByteString;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.bookkeeper.util.ReflectionUtils;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.PipelineFilter;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.delivery.ChannelEndPoint;
import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.netty.ServerStats;
import org.apache.hedwig.server.netty.UmbrellaHandler;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
import org.apache.hedwig.server.subscriptions.SubscriptionManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/handlers/SubscribeHandler.class */
public class SubscribeHandler extends BaseHandler {
    static Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
    private final DeliveryManager deliveryMgr;
    private final PersistenceManager persistenceMgr;
    private final SubscriptionManager subMgr;
    private final SubscriptionChannelManager subChannelMgr;
    private final ServerStats.OpStats subStats;

    public SubscribeHandler(ServerConfiguration serverConfiguration, TopicManager topicManager, DeliveryManager deliveryManager, PersistenceManager persistenceManager, SubscriptionManager subscriptionManager, SubscriptionChannelManager subscriptionChannelManager) {
        super(topicManager, serverConfiguration);
        this.deliveryMgr = deliveryManager;
        this.persistenceMgr = persistenceManager;
        this.subMgr = subscriptionManager;
        this.subChannelMgr = subscriptionChannelManager;
        this.subStats = ServerStats.getInstance().getOpStats(PubSubProtocol.OperationType.SUBSCRIBE);
    }

    @Override // org.apache.hedwig.server.handlers.BaseHandler
    public void handleRequestAtOwner(final PubSubProtocol.PubSubRequest pubSubRequest, final Channel channel) {
        if (!pubSubRequest.hasSubscribeRequest()) {
            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, pubSubRequest.getTxnId(), "Missing subscribe request data");
            this.subStats.incrementFailedOps();
            return;
        }
        final ByteString topic = pubSubRequest.getTopic();
        try {
            PubSubProtocol.MessageSeqId currentSeqIdForTopic = this.persistenceMgr.getCurrentSeqIdForTopic(topic);
            final PubSubProtocol.SubscribeRequest subscribeRequest = pubSubRequest.getSubscribeRequest();
            final ByteString subscriberId = subscribeRequest.getSubscriberId();
            PubSubProtocol.MessageSeqId build = PubSubProtocol.MessageSeqId.newBuilder(currentSeqIdForTopic).setLocalComponent(currentSeqIdForTopic.getLocalComponent()).build();
            final long now = MathUtils.now();
            this.subMgr.serveSubscribeRequest(topic, subscribeRequest, build, new Callback<PubSubProtocol.SubscriptionData>() { // from class: org.apache.hedwig.server.handlers.SubscribeHandler.1
                public void operationFailed(Object obj, PubSubException pubSubException) {
                    channel.write(PubSubResponseUtils.getResponseForException(pubSubException, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
                    SubscribeHandler.logger.error("Error serving subscribe request (" + pubSubRequest.getTxnId() + ") for (topic: " + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", pubSubException);
                    SubscribeHandler.this.subStats.incrementFailedOps();
                }

                public void operationFinished(Object obj, final PubSubProtocol.SubscriptionData subscriptionData) {
                    TopicSubscriber topicSubscriber = new TopicSubscriber(topic, subscriberId);
                    synchronized (channel) {
                        if (!channel.isConnected()) {
                            SubscribeHandler.this.subStats.incrementFailedOps();
                            return;
                        }
                        ServerMessageFilter pipelineFilter = new PipelineFilter();
                        try {
                            pipelineFilter.addLast(new AllToAllTopologyFilter());
                            if (subscriptionData.hasPreferences() && subscriptionData.getPreferences().hasMessageFilter()) {
                                pipelineFilter.addLast(ReflectionUtils.newInstance(subscriptionData.getPreferences().getMessageFilter(), ServerMessageFilter.class));
                            }
                            pipelineFilter.initialize(SubscribeHandler.this.cfg.getConf());
                            pipelineFilter.setSubscriptionPreferences(topic, subscriberId, subscriptionData.getPreferences());
                            boolean z = false;
                            if (subscribeRequest.hasForceAttach()) {
                                z = subscribeRequest.getForceAttach();
                            }
                            Channel put = SubscribeHandler.this.subChannelMgr.put(topicSubscriber, channel, z);
                            if (null != put) {
                                PubSubException.TopicBusyException topicBusyException = new PubSubException.TopicBusyException("Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8() + " is already being served on a different channel " + put + ".");
                                SubscribeHandler.this.subStats.incrementFailedOps();
                                channel.write(PubSubResponseUtils.getResponseForException(topicBusyException, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
                            } else {
                                PubSubProtocol.MessageSeqId msgId = subscriptionData.getState().getMsgId();
                                SubscribeHandler.this.deliveryMgr.startServingSubscription(topic, subscriberId, subscriptionData.getPreferences(), PubSubProtocol.MessageSeqId.newBuilder(msgId).setLocalComponent(msgId.getLocalComponent() + 1).build(), new ChannelEndPoint(channel), pipelineFilter, new Callback<Void>() { // from class: org.apache.hedwig.server.handlers.SubscribeHandler.1.1
                                    public void operationFinished(Object obj2, Void r8) {
                                        channel.write(PubSubResponseUtils.getSuccessResponse(pubSubRequest.getTxnId(), PubSubProtocol.ResponseBody.newBuilder().setSubscribeResponse(PubSubProtocol.SubscribeResponse.newBuilder().setPreferences(subscriptionData.getPreferences())).build()));
                                        SubscribeHandler.logger.info("Subscribe request (" + pubSubRequest.getTxnId() + ") for (topic:" + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ") from channel " + channel.getRemoteAddress() + " succeed - its subscription data is " + SubscriptionStateUtils.toString(subscriptionData));
                                        SubscribeHandler.this.subStats.updateLatency(MathUtils.now() - now);
                                    }

                                    public void operationFailed(Object obj2, PubSubException pubSubException) {
                                    }
                                }, null);
                            }
                        } catch (RuntimeException e) {
                            String str = "RuntimeException caught when instantiating message filter for (topic:" + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ").It might be introduced by programming error in message filter.";
                            SubscribeHandler.logger.error(str, e);
                            PubSubException.InvalidMessageFilterException invalidMessageFilterException = new PubSubException.InvalidMessageFilterException(str, e);
                            SubscribeHandler.this.subStats.incrementFailedOps();
                            channel.write(PubSubResponseUtils.getResponseForException(invalidMessageFilterException, pubSubRequest.getTxnId()));
                        } catch (Throwable th) {
                            String str2 = "Failed to instantiate message filter for (topic:" + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ").";
                            SubscribeHandler.logger.error(str2, th);
                            PubSubException.InvalidMessageFilterException invalidMessageFilterException2 = new PubSubException.InvalidMessageFilterException(str2, th);
                            SubscribeHandler.this.subStats.incrementFailedOps();
                            channel.write(PubSubResponseUtils.getResponseForException(invalidMessageFilterException2, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
                        }
                    }
                }
            }, null);
        } catch (PubSubException.ServerNotResponsibleForTopicException e) {
            channel.write(PubSubResponseUtils.getResponseForException(e, pubSubRequest.getTxnId())).addListener(ChannelFutureListener.CLOSE);
            logger.error("Error getting current seq id for topic " + topic.toStringUtf8() + " when processing subscribe request (txnid:" + pubSubRequest.getTxnId() + ") :", e);
            this.subStats.incrementFailedOps();
            ServerStats.getInstance().incrementRequestsRedirect();
        }
    }
}
