package com.glispa.kafka.influxdb.metrics.reporter;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/glispa/kafka/influxdb/metrics/reporter/InfluxdbKafkaMetricsReporter.class */
public class InfluxdbKafkaMetricsReporter implements MetricsReporter {
    private static final Logger log = LoggerFactory.getLogger(InfluxdbKafkaMetricsReporter.class);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final List<KafkaMetric> metricList = Collections.synchronizedList(new ArrayList());
    private KafkaMetricToInfxdbPointTransformer transformer;
    private MetricFilter filter;
    private InfluxdbConfig config;
    private InfluxDB influxDB;

    public void init(List<KafkaMetric> list) {
        this.metricList.addAll(list);
        int intValue = this.config.getInt(InfluxdbConfig.INFLUXDB_INTEVAL_SEC_CONFIG).intValue();
        this.executor.scheduleAtFixedRate(this::report, intValue, intValue, TimeUnit.SECONDS);
    }

    public void metricChange(KafkaMetric kafkaMetric) {
        this.metricList.add(kafkaMetric);
    }

    public void metricRemoval(KafkaMetric kafkaMetric) {
        this.metricList.remove(kafkaMetric);
    }

    public void close() {
        this.executor.submit(this::report);
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(5L, TimeUnit.SECONDS)) {
                log.error("Timed out before executor termination!");
            }
        } catch (InterruptedException e) {
            log.error("Can not shutdown executor gracefully", e);
        }
        this.influxDB.close();
    }

    private void report() {
        Stream<KafkaMetric> stream = this.metricList.stream();
        MetricFilter metricFilter = this.filter;
        metricFilter.getClass();
        Stream<KafkaMetric> filter = stream.filter((v1) -> {
            return r1.isAllowed(v1);
        });
        KafkaMetricToInfxdbPointTransformer kafkaMetricToInfxdbPointTransformer = this.transformer;
        kafkaMetricToInfxdbPointTransformer.getClass();
        Stream map = filter.map((v1) -> {
            return r1.transform(v1);
        }).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        });
        InfluxDB influxDB = this.influxDB;
        influxDB.getClass();
        map.forEach(influxDB::write);
    }

    public void configure(Map<String, ?> map) {
        this.config = new InfluxdbConfig(map);
        this.influxDB = InfluxDBFactory.connect(String.format("http://%s", this.config.getString(InfluxdbConfig.INFLUXDB_SERVER_CONFIG)), this.config.getString(InfluxdbConfig.INFLUXDB_USER_CONFIG), this.config.getPassword(InfluxdbConfig.INFLUXDB_PASSWORD_CONFIG).value());
        this.influxDB.setDatabase(this.config.getString(InfluxdbConfig.INFLUXDB_DATABASE_CONFIG));
        this.influxDB.enableBatch(100, 3, TimeUnit.SECONDS);
        this.influxDB.setRetentionPolicy(this.config.getString(InfluxdbConfig.INFLUXDB_RETENTION_POLICY_CONFIG));
        this.transformer = new KafkaMetricToInfxdbPointTransformer(this.config.getString(InfluxdbConfig.INFLUXDB_METRICS_NAME_PREFIX_CONFIG), new CustomTagsPropertyParser().parseMapProperty(this.config.getString(InfluxdbConfig.INFLUXDB_CUSTOM_TAGS_CONFIG)));
        this.filter = new MetricFilter(this.config.getString(InfluxdbConfig.INFLUXDB_METRICS_WHITELIST_REGEX_CONFIG), this.config.getString(InfluxdbConfig.INFLUXDB_METRICS_BLACKLIST_REGEX_CONFIG));
    }
}
