package org.apache.skywalking.oap.server.core.analysis.worker;

import java.util.List;
import lombok.Generated;
import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.commons.datacarrier.consumer.BulkConsumePool;
import org.apache.skywalking.apm.commons.datacarrier.consumer.ConsumerPoolFactory;
import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.oap.server.core.UnexpectedException;
import org.apache.skywalking.oap.server.core.analysis.data.MergableBufferedData;
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
import org.apache.skywalking.oap.server.core.worker.AbstractWorker;
import org.apache.skywalking.oap.server.library.module.ModuleDefineHolder;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker.class */
public class MetricsAggregateWorker extends AbstractWorker<Metrics> {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MetricsAggregateWorker.class);
    public final long l1FlushPeriod;
    private AbstractWorker<Metrics> nextWorker;
    private final DataCarrier<Metrics> dataCarrier;
    private final MergableBufferedData<Metrics> mergeDataCache;
    private CounterMetrics aggregationCounter;
    private long lastSendTime;

    /* loaded from: input_file:org/apache/skywalking/oap/server/core/analysis/worker/MetricsAggregateWorker$AggregatorConsumer.class */
    private class AggregatorConsumer implements IConsumer<Metrics> {
        private AggregatorConsumer() {
        }

        public void init() {
        }

        public void consume(List<Metrics> list) {
            MetricsAggregateWorker.this.onWork(list);
        }

        public void onError(List<Metrics> list, Throwable th) {
            MetricsAggregateWorker.log.error(th.getMessage(), th);
        }

        public void onExit() {
        }

        public void nothingToConsume() {
            MetricsAggregateWorker.this.flush();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MetricsAggregateWorker(ModuleDefineHolder moduleDefineHolder, AbstractWorker<Metrics> abstractWorker, String str, long j) {
        super(moduleDefineHolder);
        this.lastSendTime = 0L;
        this.nextWorker = abstractWorker;
        this.mergeDataCache = new MergableBufferedData<>();
        this.dataCarrier = new DataCarrier<>("MetricsAggregateWorker." + str, "METRICS_L1_AGGREGATION", 2, 10000);
        try {
            ConsumerPoolFactory.INSTANCE.createIfAbsent("METRICS_L1_AGGREGATION", new BulkConsumePool.Creator("METRICS_L1_AGGREGATION", BulkConsumePool.Creator.recommendMaxSize() * 2, 20L));
            this.dataCarrier.consume(ConsumerPoolFactory.INSTANCE.get("METRICS_L1_AGGREGATION"), new AggregatorConsumer());
            this.aggregationCounter = moduleDefineHolder.find("telemetry").provider().getService(MetricsCreator.class).createCounter("metrics_aggregation", "The number of rows in aggregation", new MetricsTag.Keys(new String[]{"metricName", "level", "dimensionality"}), new MetricsTag.Values(new String[]{str, "1", "minute"}));
            this.l1FlushPeriod = j;
        } catch (Exception e) {
            throw new UnexpectedException(e.getMessage(), e);
        }
    }

    @Override // org.apache.skywalking.oap.server.core.worker.AbstractWorker
    public final void in(Metrics metrics) {
        this.dataCarrier.produce(metrics);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onWork(List<Metrics> list) {
        list.forEach(metrics -> {
            this.aggregationCounter.inc();
            this.mergeDataCache.accept((MergableBufferedData<Metrics>) metrics);
        });
        flush();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void flush() {
        long currentTimeMillis = System.currentTimeMillis();
        if (currentTimeMillis - this.lastSendTime > this.l1FlushPeriod) {
            this.mergeDataCache.read().forEach(metrics -> {
                if (log.isDebugEnabled()) {
                    log.debug(metrics.toString());
                }
                this.nextWorker.in(metrics);
            });
            this.lastSendTime = currentTimeMillis;
        }
    }
}
