package org.apache.hedwig.server.handlers;

import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.netty.ServerStats;
import org.apache.hedwig.server.netty.UmbrellaHandler;
import org.apache.hedwig.server.persistence.PersistRequest;
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
import org.jboss.netty.channel.Channel;

/* loaded from: input_file:org/apache/hedwig/server/handlers/PublishHandler.class */
/**
 * Request handler for publish operations.
 *
 * <p>For a request that carries publish data, the message is copied with this
 * server's region set as its source region and handed to the
 * {@link PersistenceManager}. The reply is written to the requesting channel
 * from the persistence callback. Outcomes are recorded in the op stats obtained
 * for {@code OperationType.PUBLISH}.
 */
public class PublishHandler extends BaseHandler {
    /** Receives every message accepted by this handler. */
    private PersistenceManager persistenceMgr;

    /** Failure counter and latency tracker for the PUBLISH operation type. */
    private final ServerStats.OpStats pubStats;

    /**
     * Creates a publish handler.
     *
     * @param topicManager        passed through to {@link BaseHandler}
     * @param persistenceManager  used to persist published messages
     * @param serverConfiguration passed through to {@link BaseHandler}
     */
    public PublishHandler(TopicManager topicManager, PersistenceManager persistenceManager, ServerConfiguration serverConfiguration) {
        super(topicManager, serverConfiguration);
        this.pubStats = ServerStats.getInstance().getOpStats(PubSubProtocol.OperationType.PUBLISH);
        this.persistenceMgr = persistenceManager;
    }

    /**
     * Processes a publish request.
     *
     * <p>A request without publish data is answered with a malformed-request
     * error and counted as a failed operation. Otherwise the message is submitted
     * for persistence and the response is sent from the persistence callback.
     *
     * @param pubSubRequest the incoming request
     * @param channel       the channel on which the response is written
     */
    @Override // org.apache.hedwig.server.handlers.BaseHandler
    public void handleRequestAtOwner(final PubSubProtocol.PubSubRequest pubSubRequest, final Channel channel) {
        // Guard: nothing to publish, so reject and bail out early.
        if (!pubSubRequest.hasPublishRequest()) {
            UmbrellaHandler.sendErrorResponseToMalformedRequest(
                    channel, pubSubRequest.getTxnId(), "Missing publish request data");
            pubStats.incrementFailedOps();
            return;
        }

        // Copy the client's message and stamp it with this server's region.
        PubSubProtocol.Message regionStamped = PubSubProtocol.Message
                .newBuilder(pubSubRequest.getPublishRequest().getMsg())
                .setSrcRegion(cfg.getMyRegionByteString())
                .build();

        // Taken before submitting so the recorded latency spans the persist call.
        final long requestStart = MathUtils.now();

        Callback<PubSubProtocol.MessageSeqId> onPersisted =
                newReplyCallback(pubSubRequest, channel, requestStart);
        PersistRequest persistRequest =
                new PersistRequest(pubSubRequest.getTopic(), regionStamped, onPersisted, null);
        persistenceMgr.persistMessage(persistRequest);
    }

    /**
     * Builds the persistence callback that answers the client.
     *
     * <p>On failure the exception is translated into a response and the failed-op
     * counter is bumped; on success the success response is written and the
     * elapsed time since {@code requestStart} is recorded as latency.
     */
    private Callback<PubSubProtocol.MessageSeqId> newReplyCallback(
            final PubSubProtocol.PubSubRequest request, final Channel channel, final long requestStart) {
        return new Callback<PubSubProtocol.MessageSeqId>() {
            public void operationFailed(Object ctx, PubSubException cause) {
                channel.write(PubSubResponseUtils.getResponseForException(cause, request.getTxnId()));
                pubStats.incrementFailedOps();
            }

            public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId seqId) {
                channel.write(getSuccessResponse(request.getTxnId(), seqId));
                pubStats.updateLatency(MathUtils.now() - requestStart);
            }
        };
    }

    /**
     * Builds the success response for a publish.
     *
     * @param j            transaction id to echo back in the response
     * @param messageSeqId sequence id of the published message; when {@code null}
     *                     the generic success response (no body) is returned
     * @return a SUCCESS response, carrying the published message id when available
     */
    public static PubSubProtocol.PubSubResponse getSuccessResponse(long j, PubSubProtocol.MessageSeqId messageSeqId) {
        if (messageSeqId != null) {
            PubSubProtocol.PublishResponse publishResponse = PubSubProtocol.PublishResponse.newBuilder()
                    .setPublishedMsgId(messageSeqId)
                    .build();
            PubSubProtocol.ResponseBody body = PubSubProtocol.ResponseBody.newBuilder()
                    .setPublishResponse(publishResponse)
                    .build();
            return PubSubProtocol.PubSubResponse.newBuilder()
                    .setProtocolVersion(PubSubResponseUtils.serverVersion)
                    .setStatusCode(PubSubProtocol.StatusCode.SUCCESS)
                    .setTxnId(j)
                    .setResponseBody(body)
                    .build();
        }
        return PubSubResponseUtils.getSuccessResponse(j);
    }
}
