package org.apache.hedwig.server.proxy;

import com.google.protobuf.ByteString;
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Subscriber;
import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.server.handlers.Handler;
import org.apache.hedwig.server.netty.UmbrellaHandler;
import org.apache.hedwig.util.Callback;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.class */
/**
 * Handles start-delivery requests received by the proxy.
 *
 * <p>For a well-formed request the handler looks up the channel that the
 * {@link ChannelTracker} holds for the request's (topic, subscriber id) pair and
 * starts delivery on the wrapped {@link Subscriber} with a {@link MessageHandler}
 * that forwards every delivered message to that channel.
 */
public class ProxyStartDeliveryHandler implements Handler {
    static final Logger logger = LoggerFactory.getLogger(ProxyStartDeliveryHandler.class);

    /** Subscriber on which delivery is started. */
    Subscriber subscriber;

    /** Source of the channel for a (topic, subscriber id) pair; also used as the monitor in {@link #handleRequest}. */
    ChannelTracker tracker;

    /**
     * Creates a handler that starts delivery on the given subscriber.
     *
     * @param subscriber     subscriber on which {@code startDelivery} is invoked
     * @param channelTracker tracker queried for the channel of each (topic, subscriber id) pair
     */
    public ProxyStartDeliveryHandler(Subscriber subscriber, ChannelTracker channelTracker) {
        this.subscriber = subscriber;
        this.tracker = channelTracker;
    }

    /**
     * Processes a start-delivery request.
     *
     * <ul>
     *   <li>If the request carries no start-delivery payload, a malformed-request
     *       error is sent on {@code channel}.</li>
     *   <li>If the tracker has no channel for the (topic, subscriber id) pair, a
     *       {@code ClientNotSubscribedException} response is written to {@code channel}.</li>
     *   <li>Otherwise a success response is written to {@code channel} and delivery is
     *       started on the subscriber.</li>
     * </ul>
     *
     * @param pubSubRequest the incoming request
     * @param channel       channel the request arrived on; responses are written here
     * @throws RuntimeException if {@code startDelivery} reports that delivery was already
     *                          started or that there is no subscription
     */
    @Override
    public void handleRequest(PubSubProtocol.PubSubRequest pubSubRequest, Channel channel) {
        if (!pubSubRequest.hasStartDeliveryRequest()) {
            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, pubSubRequest.getTxnId(), "Missing start delivery request data");
            return;
        }
        ByteString topic = pubSubRequest.getTopic();
        ByteString subscriberId = pubSubRequest.getStartDeliveryRequest().getSubscriberId();

        // The channel lookup and the startDelivery call are performed while holding the
        // tracker's monitor, exactly as before.
        synchronized (this.tracker) {
            final Channel subscribedChannel = this.tracker.getChannel(topic, subscriberId);
            if (subscribedChannel == null) {
                channel.write(PubSubResponseUtils.getResponseForException(new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"), pubSubRequest.getTxnId()));
                return;
            }

            MessageHandler forwardingHandler = newForwardingHandler(subscribedChannel);

            // The success response is written before startDelivery is attempted.
            channel.write(PubSubResponseUtils.getSuccessResponse(pubSubRequest.getTxnId()));
            try {
                this.subscriber.startDelivery(topic, subscriberId, forwardingHandler);
            } catch (AlreadyStartDeliveryException e) {
                logger.error("Unexpected: Already start delivery when attempting to start delivery", e);
                throw new RuntimeException(e);
            } catch (PubSubException.ClientNotSubscribedException e) {
                logger.error("Unexpected: No subscription when attempting to start delivery", e);
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds a message handler that forwards each delivered message to the given channel.
     *
     * @param subscribedChannel channel to which delivered messages are written
     * @return a handler that wraps each message in a {@code PubSubResponse} and writes it out
     */
    private MessageHandler newForwardingHandler(final Channel subscribedChannel) {
        return new MessageHandler() {
            @Override
            public void deliver(ByteString topic, ByteString subscriberId, PubSubProtocol.Message message,
                                final Callback<Void> callback, final Object context) {
                PubSubProtocol.PubSubResponse response = PubSubProtocol.PubSubResponse.newBuilder()
                        .setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE)
                        .setStatusCode(PubSubProtocol.StatusCode.SUCCESS)
                        .setTxnId(0L)
                        .setMessage(message)
                        .setTopic(topic)
                        .setSubscriberId(subscriberId)
                        .build();

                subscribedChannel.write(response).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        // The callback is only notified once the write has succeeded.
                        // NOTE(review): a failed write is deliberately left unreported here;
                        // presumably it only happens on channel disconnect, which is handled
                        // elsewhere - confirm.
                        if (channelFuture.isSuccess()) {
                            callback.operationFinished(context, null);
                        }
                    }
                });
            }
        };
    }
}
