package ch.cern.sparkmeasure;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.spark.SparkConf;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerEvent;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.cluster.ExecutorInfo;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd;
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.MatchError;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.util.Try$;

/* compiled from: KafkaSink.scala */
@ScalaSignature(bytes = "\u0006\u0005\u0005\u001de\u0001\u0002\u000e\u001c\u0001\tB\u0001b\f\u0001\u0003\u0002\u0003\u0006I\u0001\r\u0005\u0006i\u0001!\t!\u000e\u0005\bs\u0001\u0011\r\u0011\"\u0003;\u0011\u0019\t\u0005\u0001)A\u0005w!Q!\t\u0001I\u0001\u0002\u0007\u0005\u000b\u0011B\"\t\u000fQ\u0003!\u0019!C\u0001+\"1a\u000b\u0001Q\u0001\n%Cqa\u0016\u0001C\u0002\u0013\u0005Q\u000b\u0003\u0004Y\u0001\u0001\u0006I!\u0013\u0005\n3\u0002\u0001\r\u00111A\u0005\niC\u0011B\u001b\u0001A\u0002\u0003\u0007I\u0011B6\t\u0013A\u0004\u0001\u0019!A!B\u0013Y\u0006bB9\u0001\u0001\u0004%\t!\u0016\u0005\be\u0002\u0001\r\u0011\"\u0001t\u0011\u0019)\b\u0001)Q\u0005\u0013\")a\u000f\u0001C!o\")Q\u0010\u0001C!}\"9\u0011\u0011\u0002\u0001\u0005B\u0005-\u0001bBA\f\u0001\u0011\u0005\u0013\u0011\u0004\u0005\b\u0003K\u0001A\u0011IA\u0014\u0011\u001d\t\u0019\u0004\u0001C!\u0003kAq!!\u0011\u0001\t\u0003\n\u0019\u0005C\u0004\u0002P\u0001!\t%!\u0015\t\u000f\u0005u\u0003\u0001\"\u0005\u0002`!9\u00111\u0011\u0001\u0005\n\u0005\u0015%!C&bM.\f7+\u001b8l\u0015\taR$\u0001\u0007ta\u0006\u00148.\\3bgV\u0014XM\u0003\u0002\u001f?\u0005!1-\u001a:o\u0015\u0005\u0001\u0013AA2i\u0007\u0001\u0019\"\u0001A\u0012\u0011\u0005\u0011jS\"A\u0013\u000b\u0005\u0019:\u0013!C:dQ\u0016$W\u000f\\3s\u0015\tA\u0013&A\u0003ta\u0006\u00148N\u0003\u0002+W\u00051\u0011\r]1dQ\u0016T\u0011\u0001L\u0001\u0004_J<\u0017B\u0001\u0018&\u00055\u0019\u0006/\u0019:l\u0019&\u001cH/\u001a8fe\u0006!1m\u001c8g!\t\t$'D\u0001(\u0013\t\u0019tEA\u0005Ta\u0006\u00148nQ8oM\u00061A(\u001b8jiz\"\"A\u000e\u001d\u0011\u0005]\u0002Q\"A\u000e\t\u000b=\u0012\u0001\u0019\u0001\u0019\u0002\r1|wmZ3s+\u0005Y\u0004C\u0001\u001f@\u001b\u0005i$B\u0001 ,\u0003\u0015\u0019HN\u001a\u001bk\u0013\t\u0001UH\u0001\u0004M_\u001e<WM]\u0001\bY><w-\u001a:!\u0003\rAH%\r\t\u0005\t\u001eK\u0015*D\u0001F\u0015\u00051\u0015!B:dC2\f\u0017B\u0001%F\u0005\u0019!V\u000f\u001d7feA\u0011!*\u0015\b\u0003\u0017>\u0003\"\u0001T#\u000e\u00035S!AT\u0011\u0002\rq\u0012xn\u001c;?\u0013\t\u0001V)\u0001\u0004Qe\u0016$WMZ\u0005\u0003%N\u0013aa\u0015;sS:<'B\u0001)F\u0003\u0019\u0011'o\\6feV\t\u0011*A\u0004ce>\\WM\u001d\u0011\u0002\u000bQ|\u0007/[2\u0002\rQ|\u0007/[2!\u0003!\u0001(o\u001c3vG\u0016\u0014X#A.\u0011\tq\u0013\u0017\nZ\u0007\u0002;*\u0011\u0011L\u0018\u0006\u0003?\u0002\fqa\u00197jK:$8O\u0003\u0002bS\u0005)1.\u00194lC&\u00111-\u0018\u0002\t!J|G-^2feB\u0019A)Z4\n\u0005\u0019,%!B!se\u0006L\bC\u0001#i\u0013\tIWI\u0001\u0003CsR,\u0017\u0001\u00049s_\u0012,8-\u001a:`I\u0015\fHC\u00017p!\t!U.\u0003\u0002o\u000b\n!QK\\5u\u0011\u001d\u00115\"!AA\u0002m\u000b\u0011\u0002\u001d:pIV\u001cWM\u001d\u0011\u0002\u000b\u0005\u0004\b/\u00133\u0002\u0013\u0005\u0004\b/\u00133`I\u0015\fHC\u00017u\u0011\u001d\u0011e\"!AA\u0002%\u000ba!\u00199q\u0013\u0012\u0004\u0013aD8o\u000bb,7-\u001e;pe\u0006#G-\u001a3\u0015\u00051D\b\"B=\u0011\u0001\u0004Q\u0018!D3yK\u000e,Ho\u001c:BI\u0012,G\r\u0005\u0002%w&\u0011A0\n\u0002\u001b'B\f'o\u001b'jgR,g.\u001a:Fq\u0016\u001cW\u000f^8s\u0003\u0012$W\rZ\u0001\u0011_:\u001cF/Y4f'V\u0014W.\u001b;uK\u0012$\"\u0001\\@\t\u000f\u0005\u0005\u0011\u00031\u0001\u0002\u0004\u0005q1\u000f^1hKN+(-\\5ui\u0016$\u0007c\u0001\u0013\u0002\u0006%\u0019\u0011qA\u0013\u00037M\u0003\u0018M]6MSN$XM\\3s'R\fw-Z*vE6LG\u000f^3e\u0003Aygn\u0015;bO\u0016\u001cu.\u001c9mKR,G\rF\u0002m\u0003\u001bAq!a\u0004\u0013\u0001\u0004\t\t\"\u0001\bti\u0006<WmQ8na2,G/\u001a3\u0011\u0007\u0011\n\u0019\"C\u0002\u0002\u0016\u0015\u00121d\u00159be.d\u0015n\u001d;f]\u0016\u00148\u000b^1hK\u000e{W\u000e\u001d7fi\u0016$\u0017\u0001D8o\u001fRDWM]#wK:$Hc\u00017\u0002\u001c!9\u0011QD\nA\u0002\u0005}\u0011!B3wK:$\bc\u0001\u0013\u0002\"%\u0019\u00111E\u0013\u0003%M\u0003\u0018M]6MSN$XM\\3s\u000bZ,g\u000e^\u0001\u000b_:TuNY*uCJ$Hc\u00017\u0002*!9\u00111\u0006\u000bA\u0002\u00055\u0012\u0001\u00036pEN#\u0018M\u001d;\u0011\u0007\u0011\ny#C\u0002\u00022\u0015\u0012Qc\u00159be.d\u0015n\u001d;f]\u0016\u0014(j\u001c2Ti\u0006\u0014H/\u0001\u0005p]*{'-\u00128e)\ra\u0017q\u0007\u0005\b\u0003s)\u0002\u0019AA\u001e\u0003\u0019QwNY#oIB\u0019A%!\u0010\n\u0007\u0005}REA\nTa\u0006\u00148\u000eT5ti\u0016tWM\u001d&pE\u0016sG-\u0001\np]\u0006\u0003\b\u000f\\5dCRLwN\\*uCJ$Hc\u00017\u0002F!9\u0011q\t\fA\u0002\u0005%\u0013\u0001E1qa2L7-\u0019;j_:\u001cF/\u0019:u!\r!\u00131J\u0005\u0004\u0003\u001b*#!H*qCJ\\G*[:uK:,'/\u00119qY&\u001c\u0017\r^5p]N#\u0018M\u001d;\u0002!=t\u0017\t\u001d9mS\u000e\fG/[8o\u000b:$Gc\u00017\u0002T!9\u0011QK\fA\u0002\u0005]\u0013AD1qa2L7-\u0019;j_:,e\u000e\u001a\t\u0004I\u0005e\u0013bAA.K\tY2\u000b]1sW2K7\u000f^3oKJ\f\u0005\u000f\u001d7jG\u0006$\u0018n\u001c8F]\u0012\faA]3q_J$X\u0003BA1\u0003c\"2\u0001\\A2\u0011\u001d\t)\u0007\u0007a\u0001\u0003O\nq!\\3ue&\u001c7\u000f\u0005\u0004K\u0003SJ\u0015QN\u0005\u0004\u0003W\u001a&aA'baB!\u0011qNA9\u0019\u0001!q!a\u001d\u0019\u0005\u0004\t)HA\u0001U#\u0011\t9(! \u0011\u0007\u0011\u000bI(C\u0002\u0002|\u0015\u0013qAT8uQ&tw\rE\u0002E\u0003\u007fJ1!!!F\u0005\r\te._\u0001\u000fK:\u001cXO]3Qe>$WoY3s)\u0005a\u0007")
/* loaded from: input_file:ch/cern/sparkmeasure/KafkaSink.class */
public class KafkaSink extends SparkListener {
    private final Logger ch$cern$sparkmeasure$KafkaSink$$logger = LoggerFactory.getLogger(getClass().getName());
    private final /* synthetic */ Tuple2 x$1;
    private final String broker;
    private final String topic;
    private Producer<String, byte[]> producer;
    private String appId;

    public Logger ch$cern$sparkmeasure$KafkaSink$$logger() {
        return this.ch$cern$sparkmeasure$KafkaSink$$logger;
    }

    public String broker() {
        return this.broker;
    }

    public String topic() {
        return this.topic;
    }

    private Producer<String, byte[]> producer() {
        return this.producer;
    }

    private void producer_$eq(Producer<String, byte[]> producer) {
        this.producer = producer;
    }

    public String appId() {
        return this.appId;
    }

    public void appId_$eq(String str) {
        this.appId = str;
    }

    public void onExecutorAdded(SparkListenerExecutorAdded sparkListenerExecutorAdded) {
        ExecutorInfo executorInfo = sparkListenerExecutorAdded.executorInfo();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "executors_started"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("executorId"), sparkListenerExecutorAdded.executorId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("host"), executorInfo.executorHost()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("totalCores"), BoxesRunTime.boxToInteger(executorInfo.totalCores())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("startTime"), BoxesRunTime.boxToLong(sparkListenerExecutorAdded.time())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(System.currentTimeMillis()))})));
    }

    public void onStageSubmitted(SparkListenerStageSubmitted sparkListenerStageSubmitted) {
        long unboxToLong = BoxesRunTime.unboxToLong(sparkListenerStageSubmitted.stageInfo().submissionTime().getOrElse(() -> {
            return 0L;
        }));
        int attemptNumber = sparkListenerStageSubmitted.stageInfo().attemptNumber();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "stages_started"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stageId"), Integer.toString(sparkListenerStageSubmitted.stageInfo().stageId())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("attemptNumber"), BoxesRunTime.boxToInteger(attemptNumber)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("submissionTime"), BoxesRunTime.boxToLong(unboxToLong)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(System.currentTimeMillis()))})));
    }

    public void onStageCompleted(SparkListenerStageCompleted sparkListenerStageCompleted) {
        String num = Integer.toString(sparkListenerStageCompleted.stageInfo().stageId());
        long unboxToLong = BoxesRunTime.unboxToLong(sparkListenerStageCompleted.stageInfo().submissionTime().getOrElse(() -> {
            return 0L;
        }));
        long unboxToLong2 = BoxesRunTime.unboxToLong(sparkListenerStageCompleted.stageInfo().completionTime().getOrElse(() -> {
            return 0L;
        }));
        int attemptNumber = sparkListenerStageCompleted.stageInfo().attemptNumber();
        long currentTimeMillis = System.currentTimeMillis();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "stages_ended"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stageId"), num), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("attemptNumber"), BoxesRunTime.boxToInteger(attemptNumber)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("submissionTime"), BoxesRunTime.boxToLong(unboxToLong)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("completionTime"), BoxesRunTime.boxToLong(unboxToLong2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(currentTimeMillis))})));
        org.apache.spark.executor.TaskMetrics taskMetrics = sparkListenerStageCompleted.stageInfo().taskMetrics();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "stage_metrics"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("stageId"), num), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("attemptNumber"), BoxesRunTime.boxToInteger(attemptNumber)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("submissionTime"), BoxesRunTime.boxToLong(unboxToLong)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("completionTime"), BoxesRunTime.boxToLong(unboxToLong2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("failureReason"), sparkListenerStageCompleted.stageInfo().failureReason().getOrElse(() -> {
            return "";
        })), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("executorRunTime"), BoxesRunTime.boxToLong(taskMetrics.executorRunTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("executorCpuTime"), BoxesRunTime.boxToLong(taskMetrics.executorRunTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("executorDeserializeCpuTime"), BoxesRunTime.boxToLong(taskMetrics.executorDeserializeCpuTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("executorDeserializeTime"), BoxesRunTime.boxToLong(taskMetrics.executorDeserializeTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jvmGCTime"), BoxesRunTime.boxToLong(taskMetrics.jvmGCTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("memoryBytesSpilled"), BoxesRunTime.boxToLong(taskMetrics.memoryBytesSpilled())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("peakExecutionMemory"), BoxesRunTime.boxToLong(taskMetrics.peakExecutionMemory())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resultSerializationTime"), BoxesRunTime.boxToLong(taskMetrics.resultSerializationTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("resultSize"), BoxesRunTime.boxToLong(taskMetrics.resultSize())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bytesRead"), BoxesRunTime.boxToLong(taskMetrics.inputMetrics().bytesRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("recordsRead"), BoxesRunTime.boxToLong(taskMetrics.inputMetrics().recordsRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("bytesWritten"), BoxesRunTime.boxToLong(taskMetrics.outputMetrics().bytesWritten())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("recordsWritten"), BoxesRunTime.boxToLong(taskMetrics.outputMetrics().recordsWritten())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleTotalBytesRead"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().totalBytesRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleRemoteBytesRead"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().remoteBytesRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleRemoteBytesReadToDisk"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().remoteBytesReadToDisk())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleLocalBytesRead"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().localBytesRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleTotalBlocksFetched"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().totalBlocksFetched())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleLocalBlocksFetched"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().localBlocksFetched())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleRemoteBlocksFetched"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().remoteBlocksFetched())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleRecordsRead"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().recordsRead())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleFetchWaitTime"), BoxesRunTime.boxToLong(taskMetrics.shuffleReadMetrics().fetchWaitTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleBytesWritten"), BoxesRunTime.boxToLong(taskMetrics.shuffleWriteMetrics().bytesWritten())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleRecordsWritten"), BoxesRunTime.boxToLong(taskMetrics.shuffleWriteMetrics().recordsWritten())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("shuffleWriteTime"), BoxesRunTime.boxToLong(taskMetrics.shuffleWriteMetrics().writeTime())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(currentTimeMillis))})));
    }

    public void onOtherEvent(SparkListenerEvent sparkListenerEvent) {
        long currentTimeMillis = System.currentTimeMillis();
        if (sparkListenerEvent instanceof SparkListenerSQLExecutionStart) {
            SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) sparkListenerEvent;
            long time = sparkListenerSQLExecutionStart.time();
            String l = Long.toString(sparkListenerSQLExecutionStart.executionId());
            report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "queries_started"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("description"), sparkListenerSQLExecutionStart.description()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("queryId"), l), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("startTime"), BoxesRunTime.boxToLong(time)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(currentTimeMillis))})));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            return;
        }
        if (!(sparkListenerEvent instanceof SparkListenerSQLExecutionEnd)) {
            None$ none$ = None$.MODULE$;
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            return;
        }
        SparkListenerSQLExecutionEnd sparkListenerSQLExecutionEnd = (SparkListenerSQLExecutionEnd) sparkListenerEvent;
        long time2 = sparkListenerSQLExecutionEnd.time();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "queries_ended"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("queryId"), Long.toString(sparkListenerSQLExecutionEnd.executionId())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("endTime"), BoxesRunTime.boxToLong(time2)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(currentTimeMillis))})));
        BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
    }

    public void onJobStart(SparkListenerJobStart sparkListenerJobStart) {
        long time = sparkListenerJobStart.time();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "jobs_started"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jobId"), Integer.toString(sparkListenerJobStart.jobId())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("startTime"), BoxesRunTime.boxToLong(time)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(System.currentTimeMillis()))})));
    }

    public void onJobEnd(SparkListenerJobEnd sparkListenerJobEnd) {
        long time = sparkListenerJobEnd.time();
        report((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("name"), "jobs_ended"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("appId"), appId()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("jobId"), Integer.toString(sparkListenerJobEnd.jobId())), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("completionTime"), BoxesRunTime.boxToLong(time)), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("epochMillis"), BoxesRunTime.boxToLong(System.currentTimeMillis()))})));
    }

    public void onApplicationStart(SparkListenerApplicationStart sparkListenerApplicationStart) {
        appId_$eq((String) sparkListenerApplicationStart.appId().getOrElse(() -> {
            return "noAppId";
        }));
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12, types: [ch.cern.sparkmeasure.KafkaSink] */
    /* JADX WARN: Type inference failed for: r0v2 */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
    public void onApplicationEnd(SparkListenerApplicationEnd sparkListenerApplicationEnd) {
        ch$cern$sparkmeasure$KafkaSink$$logger().info(new StringBuilder(64).append("Spark application ended, timestamp = ").append(sparkListenerApplicationEnd.time()).append(", closing Kafka connection.").toString());
        ?? r0 = this;
        synchronized (r0) {
            if (Option$.MODULE$.apply(producer()).isDefined()) {
                producer().flush();
                producer().close();
                r0 = this;
                r0.producer_$eq(null);
            }
        }
    }

    public <T> void report(Map<String, T> map) {
        Try$.MODULE$.apply(() -> {
            this.ensureProducer();
            return this.producer().send(new ProducerRecord(this.topic(), IOUtils$.MODULE$.writeToStringSerializedJSON(map).getBytes(StandardCharsets.UTF_8)));
        }).recover(new KafkaSink$$anonfun$report$2(this));
    }

    private synchronized void ensureProducer() {
        if (Option$.MODULE$.apply(producer()).isEmpty()) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", broker());
            properties.put("retries", "10");
            properties.put("batch.size", "16384");
            properties.put("linger.ms", "0");
            properties.put("buffer.memory", "16384000");
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            properties.put("value.serializer", ByteArraySerializer.class.getName());
            properties.put("client.id", "spark-measure");
            producer_$eq(new KafkaProducer(properties));
        }
    }

    public KafkaSink(SparkConf sparkConf) {
        ch$cern$sparkmeasure$KafkaSink$$logger().warn("Custom monitoring listener with Kafka sink initializing. Now attempting to connect to Kafka topic");
        Tuple2<String, String> parseKafkaConfig = Utils$.MODULE$.parseKafkaConfig(sparkConf, ch$cern$sparkmeasure$KafkaSink$$logger());
        if (parseKafkaConfig == null) {
            throw new MatchError(parseKafkaConfig);
        }
        this.x$1 = new Tuple2((String) parseKafkaConfig._1(), (String) parseKafkaConfig._2());
        this.broker = (String) this.x$1._1();
        this.topic = (String) this.x$1._2();
        Some activeSession = SparkSession$.MODULE$.getActiveSession();
        this.appId = activeSession instanceof Some ? ((SparkSession) activeSession.value()).sparkContext().applicationId() : "noAppId";
    }
}
