package org.apache.skywalking.oap.server.core.remote;

import io.grpc.stub.StreamObserver;
import java.util.Objects;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.remote.data.StreamData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.Empty;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteData;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteMessage;
import org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.core.worker.IWorkerInstanceGetter;
import org.apache.skywalking.oap.server.core.worker.RemoteHandleWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/remote/RemoteServiceHandler.class */
public class RemoteServiceHandler extends RemoteServiceGrpc.RemoteServiceImplBase implements GRPCHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(RemoteServiceHandler.class);
    private final ModuleDefineHolder moduleDefineHolder;
    private IWorkerInstanceGetter workerInstanceGetter;
    private CounterMetrics remoteInCounter;
    private CounterMetrics remoteInErrorCounter;
    private CounterMetrics remoteInTargetNotFoundCounter;
    private HistogramMetrics remoteInHistogram;

    public RemoteServiceHandler(ModuleDefineHolder moduleDefineHolder) {
        this.moduleDefineHolder = moduleDefineHolder;
        this.remoteInCounter = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createCounter("remote_in_count", "The number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.remoteInErrorCounter = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createCounter("remote_in_error_count", "The error number(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.remoteInTargetNotFoundCounter = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createCounter("remote_in_target_not_found_count", "The error number(server side) of inside remote handler target worker not found. May be caused by unmatched OAL scrips.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE);
        this.remoteInHistogram = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createHistogramMetric("remote_in_latency", "The latency(server side) of inside remote inside aggregate rpc.", MetricsTag.EMPTY_KEY, MetricsTag.EMPTY_VALUE, new double[0]);
    }

    @Override // org.apache.skywalking.oap.server.core.remote.grpc.proto.RemoteServiceGrpc.RemoteServiceImplBase
    public StreamObserver<RemoteMessage> call(final StreamObserver<Empty> streamObserver) {
        if (Objects.isNull(this.workerInstanceGetter)) {
            synchronized (RemoteServiceHandler.class) {
                if (Objects.isNull(this.workerInstanceGetter)) {
                    this.workerInstanceGetter = (IWorkerInstanceGetter) this.moduleDefineHolder.find(CoreModule.NAME).provider().getService(IWorkerInstanceGetter.class);
                }
            }
        }
        return new StreamObserver<RemoteMessage>() { // from class: org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler.1
            public void onNext(RemoteMessage remoteMessage) {
                RemoteServiceHandler.this.remoteInCounter.inc();
                HistogramMetrics.Timer createTimer = RemoteServiceHandler.this.remoteInHistogram.createTimer();
                try {
                    String nextWorkerName = remoteMessage.getNextWorkerName();
                    RemoteData remoteData = remoteMessage.getRemoteData();
                    try {
                        RemoteHandleWorker remoteHandleWorker = RemoteServiceHandler.this.workerInstanceGetter.get(nextWorkerName);
                        if (remoteHandleWorker != null) {
                            AbstractWorker worker = remoteHandleWorker.getWorker();
                            StreamData newInstance = remoteHandleWorker.getStreamDataClass().newInstance();
                            newInstance.deserialize(remoteData);
                            worker.in(newInstance);
                        } else {
                            RemoteServiceHandler.this.remoteInTargetNotFoundCounter.inc();
                            RemoteServiceHandler.LOGGER.warn("Work name [{}] not found. Check OAL script, make sure they are same in the whole cluster.", nextWorkerName);
                        }
                    } catch (Throwable th) {
                        RemoteServiceHandler.this.remoteInErrorCounter.inc();
                        RemoteServiceHandler.LOGGER.error(th.getMessage(), th);
                    }
                } finally {
                    createTimer.finish();
                }
            }

            public void onError(Throwable th) {
                RemoteServiceHandler.LOGGER.error(th.getMessage(), th);
            }

            public void onCompleted() {
                streamObserver.onNext(Empty.newBuilder().m230build());
                streamObserver.onCompleted();
            }
        };
    }
}
