package ai.starlake.job.sink.kafka;

import ai.starlake.config.DatasetArea$;
import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.schema.handlers.SchemaHandler;
import ai.starlake.schema.model.SinkType;
import ai.starlake.schema.model.Views;
import ai.starlake.utils.Formatter$;
import ai.starlake.utils.JobBase;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.Utils$;
import ai.starlake.utils.kafka.KafkaClient;
import com.typesafe.config.ConfigValue;
import com.typesafe.scalalogging.Logger;
import com.typesafe.scalalogging.StrictLogging;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.UDFRegistration;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.avro.SchemaConverters$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.streaming.Trigger;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: KafkaJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\t\u001da\u0001B\u0012%\u0001=B\u0001\u0002\u0010\u0001\u0003\u0006\u0004%\t!\u0010\u0005\t\u0005\u0002\u0011\t\u0011)A\u0005}!A1\t\u0001BC\u0002\u0013\rA\t\u0003\u0005L\u0001\t\u0005\t\u0015!\u0003F\u0011\u0015a\u0005\u0001\"\u0001N\u0011\u001d\u0011\u0006A1A\u0005\u0002MCa\u0001\u0018\u0001!\u0002\u0013!\u0006bB/\u0001\u0005\u0004%IA\u0018\u0005\u0007M\u0002\u0001\u000b\u0011B0\t\u000f\u001d\u0004!\u0019!C\u0005Q\"1A\u000f\u0001Q\u0001\n%Dq!\u001e\u0001C\u0002\u0013\u0005a\u000fC\u0004\u0002\u0002\u0001\u0001\u000b\u0011B<\t\u0013\u0005\r\u0001A1A\u0005\u0002\u0005\u0015\u0001\u0002CA\u0012\u0001\u0001\u0006I!a\u0002\t\u000f\u0005\u0015\u0002\u0001\"\u0001\u0002(!I\u0011q\u0007\u0001\u0012\u0002\u0013\u0005\u0011\u0011\b\u0005\b\u0003\u001f\u0002A\u0011AA)\u0011%\tY\b\u0001b\u0001\n\u0013\ti\b\u0003\u0005\u0002\u0002\u0002\u0001\u000b\u0011BA@\u0011%\t\u0019\t\u0001b\u0001\n\u0013\t)\t\u0003\u0005\u0002\u000e\u0002\u0001\u000b\u0011BAD\u0011%\ty\t\u0001b\u0001\n\u0013\t)\t\u0003\u0005\u0002\u0012\u0002\u0001\u000b\u0011BAD\u0011\u001d\t\u0019\n\u0001C\u0005\u0003+Cq!a'\u0001\t\u0003\ti\nC\u0004\u00022\u0002!\t!!(\t\u000f\u0005M\u0006\u0001\"\u0003\u00026\"9\u0011\u0011\u001c\u0001\u0005\n\u0005m\u0007\"CAs\u0001\t\u0007I\u0011BAt\u0011!\t\t\u0010\u0001Q\u0001\n\u0005%\bbBAz\u0001\u0011%\u0011Q\u001f\u0005\b\u0003s\u0004A\u0011IA~\u0011\u0019\u0011)\u0001\u0001C!Q\nA1*\u00194lC*{'M\u0003\u0002&M\u0005)1.\u00194lC*\u0011q\u0005K\u0001\u0005g&t7N\u0003\u0002*U\u0005\u0019!n\u001c2\u000b\u0005-b\u0013\u0001C:uCJd\u0017m[3\u000b\u00035\n!!Y5\u0004\u0001M\u0019\u0001\u0001\r\u001c\u0011\u0005E\"T\"\u0001\u001a\u000b\u0003M\nQa]2bY\u0006L!!\u000e\u001a\u0003\r\u0005s\u0017PU3g!\t9$(D\u00019\u0015\tI$&A\u0003vi&d7/\u0003\u0002<q\tA1\u000b]1sW*{'-\u0001\blC\u001a\\\u0017MS8c\u0007>tg-[4\u0016\u0003y\u0002\"a\u0010!\u000e\u0003\u0011J!!\u0011\u0013\u0003\u001d-\u000bgm[1K_\n\u001cuN\u001c4jO\u0006y1.\u00194lC*{'mQ8oM&<\u0007%\u0001\u0005
tKR$\u0018N\\4t+\u0005)\u0005C\u0001$J\u001b\u00059%B\u0001%+\u0003\u0019\u0019wN\u001c4jO&\u0011!j\u0012\u0002\t'\u0016$H/\u001b8hg\u0006I1/\u001a;uS:<7\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u00059\u000bFCA(Q!\ty\u0004\u0001C\u0003D\u000b\u0001\u000fQ\tC\u0003=\u000b\u0001\u0007a(A\u0007tG\",W.\u0019%b]\u0012dWM]\u000b\u0002)B\u0011QKW\u0007\u0002-*\u0011q\u000bW\u0001\tQ\u0006tG\r\\3sg*\u0011\u0011LK\u0001\u0007g\u000eDW-\\1\n\u0005m3&!D*dQ\u0016l\u0017\rS1oI2,'/\u0001\btG\",W.\u0019%b]\u0012dWM\u001d\u0011\u0002\u0017Q|\u0007/[2D_:4\u0017nZ\u000b\u0002?B\u0011\u0001m\u0019\b\u0003\r\u0006L!AY$\u0002\u0011M+G\u000f^5oONL!\u0001Z3\u0003!-\u000bgm[1U_BL7mQ8oM&<'B\u00012H\u00031!x\u000e]5d\u0007>tg-[4!\u0003%1\u0017N\\1m!\u0006$\b.F\u0001j!\tQ\u0017O\u0004\u0002l_B\u0011ANM\u0007\u0002[*\u0011aNL\u0001\u0007yI|w\u000e\u001e \n\u0005A\u0014\u0014A\u0002)sK\u0012,g-\u0003\u0002sg\n11\u000b\u001e:j]\u001eT!\u0001\u001d\u001a\u0002\u0015\u0019Lg.\u00197QCRD\u0007%A\ttG\",W.\u0019*fO&\u001cHO]=Ve2,\u0012a\u001e\t\u0004caT\u0018BA=3\u0005\u0019y\u0005\u000f^5p]B\u00111\u0010`\u0007\u0002\u0001%\u0011QP \u0002\u000f\u0015\u0012\u00147mQ8oM&<g*Y7f\u0013\ty\bHA\u0004K_\n\u0014\u0015m]3\u0002%M\u001c\u0007.Z7b%\u0016<\u0017n\u001d;ssV\u0013H\u000eI\u0001\u0015g\u000eDW-\\1SK\u001eL7\u000f\u001e:z\u00072LWM\u001c;\u0016\u0005\u0005\u001d\u0001\u0003B\u0019y\u0003\u0013\u0001B!a\u0003\u0002 
5\u0011\u0011Q\u0002\u0006\u0005\u0003\u001f\t\t\"\u0001\u0004dY&,g\u000e\u001e\u0006\u0005\u0003'\t)\"\u0001\btG\",W.\u0019:fO&\u001cHO]=\u000b\u0007\u0015\n9B\u0003\u0003\u0002\u001a\u0005m\u0011!C2p]\u001adW/\u001a8u\u0015\t\ti\"\u0001\u0002j_&!\u0011\u0011EA\u0007\u0005i\u0019\u0015m\u00195fIN\u001b\u0007.Z7b%\u0016<\u0017n\u001d;ss\u000ec\u0017.\u001a8u\u0003U\u00198\r[3nCJ+w-[:uef\u001cE.[3oi\u0002\n\u0011\u0003\\8pWV\u0004Hk\u001c9jGN\u001b\u0007.Z7b)\u00159\u0018\u0011FA\u0017\u0011\u0019\tY\u0003\u0005a\u0001S\u0006)Ao\u001c9jG\"I\u0011q\u0006\t\u0011\u0002\u0003\u0007\u0011\u0011G\u0001\u0006SN\\U-\u001f\t\u0004c\u0005M\u0012bAA\u001be\t9!i\\8mK\u0006t\u0017a\u00077p_.,\b\u000fV8qS\u000e\u001c6\r[3nC\u0012\"WMZ1vYR$#'\u0006\u0002\u0002<)\"\u0011\u0011GA\u001fW\t\ty\u0004\u0005\u0003\u0002B\u0005-SBAA\"\u0015\u0011\t)%a\u0012\u0002\u0013Ut7\r[3dW\u0016$'bAA%e\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\t\u00055\u00131\t\u0002\u0012k:\u001c\u0007.Z2lK\u00124\u0016M]5b]\u000e,\u0017aF1we>\u001c6\r[3nCR{7\u000b]1sWN\u001b\u0007.Z7b)\u0011\t\u0019&a\u001e\u0011\t\u0005U\u0013\u0011\u000f\b\u0005\u0003/\ni'\u0004\u0002\u0002Z)!\u00111LA/\u0003\u0011\tgO]8\u000b\t\u0005}\u0013\u0011M\u0001\u0004gFd'\u0002BA2\u0003K\nQa\u001d9be.TA!a\u001a\u0002j\u00051\u0011\r]1dQ\u0016T!!a\u001b\u0002\u0007=\u0014x-\u0003\u0003\u0002p\u0005e\u0013\u0001E*dQ\u0016l\u0017mQ8om\u0016\u0014H/\u001a:t\u0013\u0011\t\u0019(!\u001e\u0003\u0015M\u001b\u0007.Z7b)f\u0004XM\u0003\u0003\u0002p\u0005e\u0003BBA=%\u0001\u0007\u0011.\u0001\u0006bmJ|7k\u00195f[\u0006\fQ\u0002\u001a4WC2,XmU2iK6\fWCAA@!\u0011\t\u00040a\u0015\u0002\u001d\u00114g+\u00197vKN\u001b\u0007.Z7bA\u0005aqO]5uK>\u0003H/[8ogV\u0011\u0011q\u0011\t\u0006U\u0006%\u0015.[\u0005\u0004\u0003\u0017\u001b(aA'ba\u0006iqO]5uK>\u0003H/[8og\u0002\nqa\u001c9uS>t7/\u0001\u0005paRLwN\\:!\u0003Uaw.\u00193PaRLwN\\:Ge>l7i\u001c8gS\u001e$B!a\"\u0002\u0018\"1\u0011\u0011T\rA\u0002%\f1bY8oM&<g+\u00197vK\u00069qN\u001a4m_\u0006$GCAAP!\u0019\t\t+
a*\u0002,6\u0011\u00111\u0015\u0006\u0004\u0003K\u0013\u0014\u0001B;uS2LA!!+\u0002$\n\u0019AK]=\u0011\u0007]\ni+C\u0002\u00020b\u0012ab\u00159be.TuN\u0019*fgVdG/\u0001\u0003m_\u0006$\u0017!\u00032bi\u000eD7+\u0019<f)\u0011\t9,!6\u0011\t\u0005e\u0016q\u001a\b\u0005\u0003w\u000bYM\u0004\u0003\u0002>\u0006%g\u0002BA`\u0003\u000ftA!!1\u0002F:\u0019A.a1\n\u0005\u0005-\u0014\u0002BA4\u0003SJA!a\u0019\u0002f%!\u0011qLA1\u0013\u0011\ti-!\u0018\u0002\u000fA\f7m[1hK&!\u0011\u0011[Aj\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0003\u0002N\u0006u\u0003bBAl9\u0001\u0007\u0011qW\u0001\u0003I\u001a\fQb\u001d;sK\u0006lGk\\&bM.\fG\u0003BAo\u0003G\u00042!MAp\u0013\r\t\tO\r\u0002\u0005+:LG\u000fC\u0004\u0002Xv\u0001\r!a.\u0002#Q\u0014\u0018M\\:g_Jl\u0017J\\:uC:\u001cW-\u0006\u0002\u0002jB!\u0011\u0007_Av!\ry\u0014Q^\u0005\u0004\u0003_$#A\u0005#bi\u00064%/Y7f)J\fgn\u001d4pe6\f!\u0003\u001e:b]N4wN]7J]N$\u0018M\\2fA\u0005AAO]1og\u001a|W\u000e\u0006\u0003\u00028\u0006]\bbBAlA\u0001\u0007\u0011qW\u0001\u0004eVtGCAA\u007f!\u0019\t\t+a*\u0002��B\u0019qG!\u0001\n\u0007\t\r\u0001HA\u0005K_\n\u0014Vm];mi\u0006!a.Y7f\u0001")
/* loaded from: input_file:ai/starlake/job/sink/kafka/KafkaJob.class */
public class KafkaJob implements SparkJob {
    /** Job parameters handed to the constructor. */
    private final KafkaJobConfig kafkaJobConfig;
    /** Application settings handed to the constructor. */
    private final Settings settings;
    /** Schema handler built in the constructor from the metadata storage handler and the settings. */
    private final SchemaHandler schemaHandler;
    /** Topic configuration looked up in the Kafka settings under {@code kafkaJobConfig.topicConfigName()}. */
    private final Settings.KafkaTopicConfig topicConfig;
    /** {@code kafkaJobConfig.path()} after rich formatting with the "config" and "topic" substitutions. */
    private final String finalPath;
    /** Value of the "schema.registry.url" Kafka server option, if present. */
    private final Option<String> schemaRegistryUrl;
    /** Schema registry client created from {@link #schemaRegistryUrl}, if a URL is configured. */
    private final Option<CachedSchemaRegistryClient> schemaRegistryClient;
    /** Spark schema converted from the topic's registered value schema, if one could be looked up. */
    private final Option<SchemaConverters.SchemaType> dfValueSchema;
    /** Options applied when writing (batch and streaming). */
    private final Map<String, String> writeOptions;
    /** Options applied when reading a stream in {@code load()}. */
    private final Map<String, String> options;
    /** Optional, already configured transformation applied to every dataset before it is written. */
    private final Option<DataFrameTransform> transformInstance;
    /** Lazily initialised value; valid only once bit 0x1 of {@link #bitmap$0} is set. */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    /** Lazily initialised value; valid only once bit 0x2 of {@link #bitmap$0} is set. */
    private SparkSession session;
    /**
     * Logger returned by {@code logger()}.
     *
     * <p>Intentionally not {@code final}: the constructor never assigns it directly, the only
     * write is in {@code com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq}
     * (presumably invoked from {@code StrictLogging.$init$(this)} -- TODO confirm). A
     * {@code final} field that is assigned outside the constructor is rejected by javac.
     */
    private Logger logger;
    /** Initialisation flags of the lazy values: 0x1 = sparkEnv, 0x2 = session. */
    private volatile byte bitmap$0;

    /**
     * Forwards to the implementation of {@code withExtraSparkConf} inherited from {@link SparkJob}.
     *
     * @param sparkConf configuration handed to the inherited implementation
     * @return whatever the inherited implementation returns
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        // The call must be qualified: an unqualified withExtraSparkConf(...) dispatches back to
        // this override and recurses until the stack overflows.
        return SparkJob.super.withExtraSparkConf(sparkConf);
    }

    /**
     * Forwards to the implementation of {@code registerUdf} inherited from {@link SparkJob}.
     *
     * @param str argument handed unchanged to the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public void registerUdf(String str) {
        // Qualified call: an unqualified registerUdf(str) would call this override again and
        // never terminate.
        SparkJob.super.registerUdf(str);
    }

    /**
     * Forwards to the implementation of {@code partitionedDatasetWriter} inherited from
     * {@link SparkJob}.
     *
     * @param dataset dataset handed to the inherited implementation
     * @param list    second argument handed to the inherited implementation
     *                (presumably the partition column names -- TODO confirm)
     * @return the writer produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> list) {
        // Qualified call: the unqualified form resolves to this override and recurses forever.
        return SparkJob.super.partitionedDatasetWriter(dataset, list);
    }

    /**
     * Forwards to the implementation of {@code partitionDataset} inherited from {@link SparkJob}.
     *
     * @param dataset dataset handed to the inherited implementation
     * @param list    second argument handed to the inherited implementation
     *                (presumably the partition column names -- TODO confirm)
     * @return the dataset produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> list) {
        // Qualified call: the unqualified form resolves to this override and recurses forever.
        return SparkJob.super.partitionDataset(dataset, list);
    }

    /**
     * Forwards to the implementation of {@code analyze} inherited from {@link SparkJob}.
     *
     * @param str argument handed unchanged to the inherited implementation
     * @return whatever the inherited implementation returns
     */
    @Override // ai.starlake.utils.SparkJob
    public Object analyze(String str) {
        // Qualified call: the unqualified form resolves to this override and recurses forever.
        return SparkJob.super.analyze(str);
    }

    /**
     * Forwards to the implementation of {@code createSparkViews} inherited from {@link SparkJob}.
     *
     * @param views view definitions handed to the inherited implementation
     * @param map   first map handed to the inherited implementation
     * @param map2  second map handed to the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public void createSparkViews(Views views, Map<String, String> map, Map<String, String> map2) {
        // Qualified call: the unqualified form resolves to this override and recurses forever.
        SparkJob.super.createSparkViews(views, map, map2);
    }

    /**
     * Forwards to the implementation of {@code createSparkView} inherited from {@link SparkJob}.
     *
     * @param sinkType sink type handed to the inherited implementation
     * @param option   optional string handed to the inherited implementation
     * @param str      string handed to the inherited implementation
     * @return the dataset produced by the inherited implementation
     */
    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> createSparkView(SinkType sinkType, Option<String> option, String str) {
        // Qualified call: the unqualified form resolves to this override and recurses forever.
        return SparkJob.super.createSparkView(sinkType, option, str);
    }

    /**
     * Forwards to the inherited implementation of {@code parseViewDefinition}.
     *
     * @param str view definition handed unchanged to the inherited implementation
     * @return the triple produced by the inherited implementation
     */
    @Override // ai.starlake.utils.JobBase
    public Tuple3<SinkType, Option<String>, String> parseViewDefinition(String str) {
        // SparkJob is the only direct superinterface of this class, so the JobBase implementation
        // is reached through it (assumes SparkJob extends JobBase without overriding -- TODO
        // confirm). An unqualified call would resolve to this override and recurse forever.
        return SparkJob.super.parseViewDefinition(str);
    }

    /**
     * Forwards to the inherited implementation of {@code DatasetHelper}.
     *
     * @param dataset dataset to wrap
     * @param <T>     element type of the dataset
     * @return the helper produced by the inherited implementation
     */
    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        // SparkJob is the only direct superinterface of this class, so the DatasetLogging
        // implementation is reached through it (assumes SparkJob inherits it -- TODO confirm).
        // An unqualified call would resolve to this override and recurse forever.
        return SparkJob.super.DatasetHelper(dataset);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv();
                this.ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /**
     * Accessor of the lazily initialised {@code sparkEnv} value.
     *
     * @return the cached value when bit 0x1 of {@code bitmap$0} is set, otherwise the result of
     *         the initialising slow path
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        boolean alreadyInitialised = (this.bitmap$0 & 1) != 0;
        if (alreadyInitialised) {
            return this.ai$starlake$utils$SparkJob$$sparkEnv;
        }
        return ai$starlake$utils$SparkJob$$sparkEnv$lzycompute();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkSession session$lzycompute() {
        SparkSession session;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                session = session();
                this.session = session;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    /**
     * Accessor of the lazily initialised Spark session.
     *
     * @return the cached session when bit 0x2 of {@code bitmap$0} is set, otherwise the result of
     *         the initialising slow path
     */
    @Override // ai.starlake.utils.SparkJob
    public SparkSession session() {
        boolean alreadyInitialised = (this.bitmap$0 & 2) != 0;
        if (alreadyInitialised) {
            return this.session;
        }
        return session$lzycompute();
    }

    /**
     * @return the logger currently stored in the {@code logger} field
     */
    public Logger logger() {
        return logger;
    }

    /**
     * Stores the given logger in the {@code logger} field read by {@link #logger()}.
     *
     * <p>NOTE(review): the mangled name suggests this is the setter the StrictLogging trait
     * initialiser calls -- confirm against {@code StrictLogging.$init$}.
     *
     * @param logger logger to install
     */
    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    /**
     * @return the job configuration that was passed to the constructor
     */
    public KafkaJobConfig kafkaJobConfig() {
        return kafkaJobConfig;
    }

    /**
     * @return the settings that were passed to the constructor
     */
    @Override // ai.starlake.utils.JobBase
    public Settings settings() {
        return settings;
    }

    /**
     * @return the schema handler created in the constructor
     */
    public SchemaHandler schemaHandler() {
        return schemaHandler;
    }

    /**
     * @return the topic configuration resolved in the constructor from
     *         {@code kafkaJobConfig.topicConfigName()}
     */
    private Settings.KafkaTopicConfig topicConfig() {
        return topicConfig;
    }

    /**
     * @return the formatted path computed in the constructor from {@code kafkaJobConfig.path()}
     */
    private String finalPath() {
        return finalPath;
    }

    /**
     * @return the "schema.registry.url" Kafka server option read in the constructor, if present
     */
    public Option<String> schemaRegistryUrl() {
        return schemaRegistryUrl;
    }

    /**
     * @return the schema registry client created in the constructor, or an empty option when no
     *         registry URL is configured
     */
    public Option<CachedSchemaRegistryClient> schemaRegistryClient() {
        return schemaRegistryClient;
    }

    /**
     * Fetches the latest schema registered for a topic.
     *
     * <p>The subject queried is the topic name followed by {@code "-key"} or {@code "-value"}.
     *
     * @param str topic name
     * @param z   {@code true} to look up the key schema, {@code false} for the value schema
     * @return the schema text, or an empty option when no schema registry client is configured
     */
    public Option<String> lookupTopicSchema(String str, boolean z) {
        return schemaRegistryClient().map(registry -> {
            String suffix = z ? "-key" : "-value";
            String subject = str + suffix;
            return registry.getLatestSchemaMetadata(subject).getSchema();
        });
    }

    /**
     * Default for the second parameter of {@code lookupTopicSchema}.
     *
     * @return {@code false}, which makes {@code lookupTopicSchema} query the "-value" subject
     */
    public boolean lookupTopicSchema$default$2() {
        return false;
    }

    /**
     * Parses an Avro schema definition and converts it to its Spark SQL counterpart.
     *
     * @param str Avro schema text accepted by {@code Schema.Parser}
     * @return the result of {@code SchemaConverters.toSqlType} for the parsed schema
     */
    public SchemaConverters.SchemaType avroSchemaToSparkSchema(String str) {
        Schema.Parser parser = new Schema.Parser();
        Schema avroSchema = parser.parse(str);
        return SchemaConverters$.MODULE$.toSqlType(avroSchema);
    }

    /**
     * @return the Spark schema derived in the constructor from the topic's registered value
     *         schema, if one was found
     */
    private Option<SchemaConverters.SchemaType> dfValueSchema() {
        return dfValueSchema;
    }

    /**
     * @return the write options resolved in the constructor
     */
    private Map<String, String> writeOptions() {
        return writeOptions;
    }

    /**
     * @return the read options resolved in the constructor
     */
    private Map<String, String> options() {
        return options;
    }

    /**
     * Reads a set of options from a section of the extra configuration.
     *
     * @param str path of the configuration section inside {@code settings().extraConf()}
     * @return one map entry per entry of that section, the value being the string form of the
     *         unwrapped configuration value
     */
    private Map<String, String> loadOptionsFromConfig(String str) {
        // Java entry set -> Scala collection -> Vector -> (key, value string) pairs -> immutable Map.
        return ((TraversableOnce) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(settings().extraConf().getConfig(str).entrySet()).asScala()).to(Vector$.MODULE$.canBuildFrom())).map(entry -> {
            return new Tuple2(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped().toString());
        }, Vector$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Moves messages from the configured Kafka topic to {@code finalPath()}.
     *
     * <p>In streaming mode the topic is consumed as a stream and handed to
     * {@code streamToKafka}; the result then carries no dataset. In batch mode the topic is
     * consumed once, the data is saved with {@code batchSave}, and only afterwards are the
     * consumed offsets stored; the result carries the dataset returned by {@code batchSave}.
     *
     * @return the job result wrapped in a {@code Try}
     */
    public Try<SparkJobResult> offload() {
        return Try$.MODULE$.apply(() -> {
            if (this.kafkaJobConfig().streaming()) {
                return (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                    return new KafkaClient(this.settings().comet().kafka(), this.settings());
                }, streamingClient -> {
                    this.streamToKafka(streamingClient.consumeTopicStreaming(this.session(), this.topicConfig()));
                    return new SparkJobResult(None$.MODULE$);
                });
            }
            return (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                return new KafkaClient(this.settings().comet().kafka(), this.settings());
            }, batchClient -> {
                Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumed = batchClient.consumeTopicBatch(this.kafkaJobConfig().topicConfigName(), this.session(), this.topicConfig());
                if (consumed == null) {
                    throw new MatchError(consumed);
                }
                Dataset<Row> messages = consumed._1();
                List<Tuple2<Object, Object>> offsets = consumed._2();
                Dataset<Row> saved = this.batchSave(messages);
                // Offsets are recorded only after the data has been written.
                batchClient.topicSaveOffsets(this.kafkaJobConfig().topicConfigName(), this.topicConfig().allAccessOptions(this.settings().comet().kafka().sparkServerOptions()), offsets);
                return new SparkJobResult(new Some(saved));
            });
        });
    }

    /**
     * Reads data in the configured format and sends it to the sink.
     *
     * <p>In streaming mode a stream is read with {@code options()}, projected with the topic's
     * field expressions and handed to {@code streamToKafka}; the result carries no dataset. In
     * batch mode the comma-separated paths of {@code finalPath()} are read, transformed and sunk
     * to the topic through the Kafka client; the result carries the transformed dataset.
     *
     * @return the job result wrapped in a {@code Try}
     */
    public Try<SparkJobResult> load() {
        return Try$.MODULE$.apply(() -> {
            return (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                return new KafkaClient(this.settings().comet().kafka(), this.settings());
            }, kafkaClient -> {
                if (this.kafkaJobConfig().streaming()) {
                    Dataset<Row> stream = this.session().readStream()
                            .format(this.kafkaJobConfig().format())
                            .options(this.options())
                            .load()
                            .selectExpr(this.topicConfig().fields());
                    this.streamToKafka(stream);
                    return new SparkJobResult(None$.MODULE$);
                }
                Dataset<Row> loaded = this.session().read()
                        .format(this.kafkaJobConfig().format())
                        .load(Predef$.MODULE$.wrapRefArray(new StringOps(Predef$.MODULE$.augmentString(this.finalPath())).split(',')));
                Dataset<Row> transformed = this.transfom(loaded);
                kafkaClient.sinkToTopic(this.topicConfig(), transformed);
                return new SparkJobResult(new Some(transformed));
            });
        });
    }

    /**
     * Transforms a dataset and writes it to {@code finalPath()}.
     *
     * <p>When {@code kafkaJobConfig().coalesce()} is defined the transformed dataset is
     * repartitioned to that many partitions before being written. When that value is exactly 1,
     * the first listed file whose name starts with "part-" is moved to
     * {@code finalPath() + ".tmp"}, the target path is deleted, and the temporary file is moved
     * back so that the target path ends up being that single file.
     *
     * @param dataset data to save
     * @return the transformed dataset (before any repartitioning)
     */
    private Dataset<Row> batchSave(Dataset<Row> dataset) {
        Dataset<Row> transformed = transfom(dataset);

        Option<?> partitionCount = kafkaJobConfig().coalesce();
        Dataset<Row> toWrite;
        if (partitionCount instanceof Some) {
            toWrite = transformed.repartition(BoxesRunTime.unboxToInt(((Some<?>) partitionCount).value()));
        } else if (None$.MODULE$.equals(partitionCount)) {
            toWrite = transformed;
        } else {
            throw new MatchError(partitionCount);
        }

        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Saving to {}", new Object[]{kafkaJobConfig()});
        }
        toWrite.write()
                .mode(kafkaJobConfig().mode())
                .format(kafkaJobConfig().format())
                .options(writeOptions())
                .save(finalPath());
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{finalPath()});
        }

        Option<?> singleFileRequest = kafkaJobConfig().coalesce();
        boolean singlePartition = (singleFileRequest instanceof Some)
                && 1 == BoxesRunTime.unboxToInt(((Some<?>) singleFileRequest).value());
        if (!singlePartition) {
            return transformed;
        }

        // Replace the output directory by the single part file it contains.
        Path target = new Path(finalPath());
        Path partFile = (Path) ((IterableLike) settings().storageHandler().list(target, settings().storageHandler().list$default$2(), settings().storageHandler().list$default$3(), false, settings().storageHandler().list$default$5()).filter(candidate -> {
            return BoxesRunTime.boxToBoolean($anonfun$batchSave$1(candidate));
        })).head();
        Path staging = new Path(target.toString() + ".tmp");
        if (settings().storageHandler().move(partFile, staging)) {
            settings().storageHandler().delete(target);
            settings().storageHandler().move(staging, target);
        }
        return transformed;
    }

    /**
     * Transforms a streaming dataset, starts a streaming write to {@code finalPath()} and blocks
     * until the query terminates.
     *
     * <p>The writer uses the streaming write mode, format and write options of the job
     * configuration. An optional trigger ("once", "processingtime" or "continuous", matched
     * case-insensitively) and optional partition columns are applied when configured.
     *
     * @param dataset streaming dataset to write
     * @throws MatchError                    if the configured trigger name is not one of the
     *                                       three supported values
     * @throws UnsupportedOperationException if {@code streamingWriteToTable} is enabled
     */
    private void streamToKafka(Dataset<Row> dataset) {
        DataStreamWriter<Row> configured = transfom(dataset)
                .writeStream()
                .outputMode(kafkaJobConfig().streamingWriteMode())
                .format(kafkaJobConfig().streamingWriteFormat())
                .options(writeOptions());

        // Locale.ROOT keeps the comparison below independent of the JVM default locale: with a
        // Turkish default locale, "PROCESSINGTIME".toLowerCase() does not yield "processingtime".
        Option<Trigger> trigger = kafkaJobConfig().streamingTrigger()
                .map(name -> name.toLowerCase(java.util.Locale.ROOT))
                .map(name -> {
                    if ("once".equals(name)) {
                        return Trigger.Once();
                    }
                    if ("processingtime".equals(name)) {
                        return Trigger.ProcessingTime(this.kafkaJobConfig().streamingTriggerOption());
                    }
                    if ("continuous".equals(name)) {
                        return Trigger.Continuous(this.kafkaJobConfig().streamingTriggerOption());
                    }
                    throw new MatchError(name);
                });
        DataStreamWriter<Row> triggered = trigger.isDefined() ? configured.trigger(trigger.get()) : configured;

        Seq<String> partitionColumns = kafkaJobConfig().streamingWritePartitionBy();
        DataStreamWriter<Row> writer = Nil$.MODULE$.equals(partitionColumns)
                ? triggered
                : triggered.partitionBy(partitionColumns);

        if (kafkaJobConfig().streamingWriteToTable()) {
            // Unchecked and still an Exception subtype, so existing handlers keep working; a raw
            // checked Exception cannot be thrown from a method without a throws clause.
            throw new UnsupportedOperationException("streamingWriteToTable Not Supported");
        }
        writer.start(finalPath()).awaitTermination();
    }

    /**
     * @return the configured transformation created in the constructor, if
     *         {@code kafkaJobConfig.transform()} is defined
     */
    private Option<DataFrameTransform> transformInstance() {
        return transformInstance;
    }

    /**
     * Applies the optional user transformation to a dataset.
     *
     * @param dataset dataset to transform
     * @return the result of the configured transformation, or {@code dataset} itself when no
     *         transformation is configured
     */
    private Dataset<Row> transfom(Dataset<Row> dataset) {
        Option<DataFrameTransform> maybeTransform = transformInstance();
        if (maybeTransform instanceof Some) {
            DataFrameTransform transformer = ((Some<DataFrameTransform>) maybeTransform).value();
            return transformer.transform(dataset, session());
        }
        if (None$.MODULE$.equals(maybeTransform)) {
            return dataset;
        }
        throw new MatchError(maybeTransform);
    }

    /**
     * Runs the job.
     *
     * <p>When a custom deserializer is configured it is first configured with the Kafka server
     * options and exposed to Spark SQL as the {@code deserialize} UDF, which deserializes a byte
     * array for this job's topic. The job then runs {@code offload()} or {@code load()} depending
     * on {@code kafkaJobConfig().offload()}.
     *
     * @return the outcome of {@code offload()} or {@code load()}
     */
    @Override // ai.starlake.utils.JobBase
    public Try<JobResult> run() {
        settings().comet().kafka().customDeserializer().foreach(deserializerName -> {
            CustomDeserializer$.MODULE$.configure(deserializerName, this.settings().comet().kafka().serverOptions());
            // Distinct name: this local used to be called "str" like the lambda parameter, which
            // is a redeclaration and hid which value the UDF below captures.
            String topicName = this.topicConfig().topicName();
            UDFRegistration udf = this.session().udf();
            Function1<byte[], String> deserializeBytes = bArr -> {
                return CustomDeserializer$.MODULE$.deserialize(topicName, bArr);
            };
            // Type tag of the UDF result: scala.Predef.String.
            TypeTags resultUniverse = package$.MODULE$.universe();
            TypeTags.TypeTag resultTag = resultUniverse.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator() { // from class: ai.starlake.job.sink.kafka.KafkaJob$$typecreator1$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe2 = mirror.universe();
                    return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe2.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            });
            // Type tag of the UDF argument: scala.Array[scala.Byte].
            TypeTags argumentUniverse = package$.MODULE$.universe();
            return udf.register("deserialize", deserializeBytes, resultTag, argumentUniverse.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator() { // from class: ai.starlake.job.sink.kafka.KafkaJob$$typecreator2$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe3 = mirror.universe();
                    return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), new $colon.colon(mirror.staticClass("scala.Byte").asType().toTypeConstructor(), Nil$.MODULE$));
                }
            }));
        });
        return kafkaJobConfig().offload() ? offload() : load();
    }

    /**
     * @return the string form of {@code kafkaJobConfig().topicConfigName()}
     */
    @Override // ai.starlake.utils.JobBase
    public String name() {
        Object topicConfigName = kafkaJobConfig().topicConfigName();
        return String.valueOf(topicConfigName);
    }

    /**
     * Predicate used by {@code batchSave} to pick the written part file.
     *
     * @param path candidate path
     * @return {@code true} when the last path component starts with "part-"
     */
    public static final /* synthetic */ boolean $anonfun$batchSave$1(Path path) {
        String fileName = path.getName();
        return fileName.startsWith("part-");
    }

    /**
     * Creates the job and resolves everything it needs up front: topic configuration, output
     * path, optional schema registry client and value schema, read/write options and the
     * optional transformation.
     *
     * <p>When a schema registry URL is configured, the latest value schema of the topic is
     * fetched from the registry during construction.
     *
     * @param kafkaJobConfig job parameters
     * @param settings       application settings
     */
    public KafkaJob(KafkaJobConfig kafkaJobConfig, Settings settings) {
        Map<String, String> writeOptions;
        Map<String, String> options;
        this.kafkaJobConfig = kafkaJobConfig;
        this.settings = settings;
        // Trait initialisers; they run before any of the fields below is assigned.
        StrictLogging.$init$(this);
        DatasetLogging.$init$(this);
        JobBase.$init$((JobBase) this);
        SparkJob.$init$((SparkJob) this);
        DatasetArea$.MODULE$.initMetadata(settings.metadataStorageHandler(), settings);
        this.schemaHandler = new SchemaHandler(settings.metadataStorageHandler(), settings);
        // Fails here if no topic configuration is registered under topicConfigName
        // (presumably with NoSuchElementException, as topics() looks like a Scala Map -- TODO confirm).
        this.topicConfig = (Settings.KafkaTopicConfig) settings.comet().kafka().topics().apply(kafkaJobConfig.topicConfigName());
        // The path is formatted with the active environment plus "config" and "topic" substitutions.
        this.finalPath = Formatter$.MODULE$.RichFormatter(kafkaJobConfig.path()).richFormat(schemaHandler().activeEnv(), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("config"), kafkaJobConfig.topicConfigName()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), topicConfig().topicName())})), settings);
        this.schemaRegistryUrl = settings.comet().kafka().serverOptions().get("schema.registry.url");
        // The registry client receives the full set of Kafka server options as its configuration.
        this.schemaRegistryClient = schemaRegistryUrl().map(str -> {
            return new CachedSchemaRegistryClient(str, 128, (java.util.Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(this.settings().comet().kafka().serverOptions()).asJava());
        });
        // Must come after schemaRegistryClient is assigned: lookupTopicSchema reads it.
        this.dfValueSchema = lookupTopicSchema(topicConfig().topicName(), lookupTopicSchema$default$2()).map(str2 -> {
            return this.avroSchemaToSparkSchema(str2);
        });
        // A "config" entry redirects to a section of the extra configuration; otherwise the
        // options of the job configuration are used as they are.
        Some some = kafkaJobConfig.writeOptions().get("config");
        if (some instanceof Some) {
            writeOptions = loadOptionsFromConfig((String) some.value());
        } else {
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            writeOptions = kafkaJobConfig.writeOptions();
        }
        this.writeOptions = writeOptions;
        // Same resolution rule for the read options.
        Some some2 = kafkaJobConfig.options().get("config");
        if (some2 instanceof Some) {
            options = loadOptionsFromConfig((String) some2.value());
        } else {
            if (!None$.MODULE$.equals(some2)) {
                throw new MatchError(some2);
            }
            options = kafkaJobConfig.options();
        }
        this.options = options;
        // Instantiate the optional transformation by name, then configure it with the topic configuration.
        this.transformInstance = kafkaJobConfig.transform().map(str3 -> {
            return (DataFrameTransform) Utils$.MODULE$.loadInstance(str3);
        }).map(dataFrameTransform -> {
            return dataFrameTransform.configure(this.topicConfig());
        });
    }
}
