package ai.starlake.job.sink.kafka;

import ai.starlake.config.Settings;
import ai.starlake.config.SparkEnv;
import ai.starlake.job.sink.DataFrameTransform;
import ai.starlake.job.sink.DataFrameTransform$;
import ai.starlake.schema.handlers.SchemaHandler;
import ai.starlake.schema.handlers.StorageHandler;
import ai.starlake.utils.Formatter$;
import ai.starlake.utils.JobResult;
import ai.starlake.utils.SparkJob;
import ai.starlake.utils.SparkJobResult;
import ai.starlake.utils.Utils$;
import ai.starlake.utils.kafka.KafkaClient;
import ai.starlake.utils.kafka.KafkaClient$;
import com.typesafe.config.ConfigValue;
import com.typesafe.scalalogging.Logger;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.DatasetLogging;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.UDFRegistration;
import org.apache.spark.sql.avro.SchemaConverters;
import org.apache.spark.sql.avro.SchemaConverters$;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.JavaConverters$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.StringOps;
import scala.collection.immutable.Vector$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.Try$;

/* compiled from: KafkaJob.scala */
@ScalaSignature(bytes = "\u0006\u0001\t]a\u0001B\u0013'\u0001EB\u0001B\u0010\u0001\u0003\u0006\u0004%\ta\u0010\u0005\t\t\u0002\u0011\t\u0011)A\u0005\u0001\"AQ\t\u0001BC\u0002\u0013\ra\t\u0003\u0005N\u0001\t\u0005\t\u0015!\u0003H\u0011\u0015q\u0005\u0001\"\u0001P\u0011\u001d!\u0006A1A\u0005\u0002UCaA\u0018\u0001!\u0002\u00131\u0006bB0\u0001\u0005\u0004%I\u0001\u0019\u0005\u0007W\u0002\u0001\u000b\u0011B1\t\u000f1\u0004!\u0019!C\u0005A\"1Q\u000e\u0001Q\u0001\n\u0005DqA\u001c\u0001C\u0002\u0013%q\u000e\u0003\u0004}\u0001\u0001\u0006I\u0001\u001d\u0005\b{\u0002\u0011\r\u0011\"\u0003p\u0011\u0019q\b\u0001)A\u0005a\"1q\u0010\u0001C\u0005\u0003\u0003A\u0001\"a\u0002\u0001\u0005\u0004%\ta\u001c\u0005\b\u0003\u0013\u0001\u0001\u0015!\u0003q\u0011%\tY\u0001\u0001b\u0001\n\u0003\ti\u0001\u0003\u0005\u0002,\u0001\u0001\u000b\u0011BA\b\u0011\u001d\ti\u0003\u0001C\u0001\u0003_A\u0011\"a\u0010\u0001#\u0003%\t!!\u0011\t\u000f\u0005]\u0003\u0001\"\u0001\u0002Z!I\u00111\u0011\u0001C\u0002\u0013%\u0011Q\u0011\u0005\t\u0003/\u0003\u0001\u0015!\u0003\u0002\b\"I\u0011\u0011\u0014\u0001C\u0002\u0013%\u0011Q\u0011\u0005\t\u00037\u0003\u0001\u0015!\u0003\u0002\b\"9\u0011Q\u0014\u0001\u0005\n\u0005}\u0005bBAU\u0001\u0011\u0005\u00111\u0016\u0005\b\u0003\u007f\u0003A\u0011BAa\u0011\u001d\t)\u000f\u0001C\u0005\u0003ODq!a;\u0001\t\u0013\ti\u000fC\u0005\u0002x\u0002\u0011\r\u0011\"\u0003\u0002z\"A!Q\u0001\u0001!\u0002\u0013\tY\u0010C\u0004\u0003\b\u0001!\tE!\u0003\t\u000f\tM\u0001\u0001\"\u0011\u0003\u0016\tA1*\u00194lC*{'M\u0003\u0002(Q\u0005)1.\u00194lC*\u0011\u0011FK\u0001\u0005g&t7N\u0003\u0002,Y\u0005\u0019!n\u001c2\u000b\u00055r\u0013\u0001C:uCJd\u0017m[3\u000b\u0003=\n!!Y5\u0004\u0001M\u0019\u0001A\r\u001d\u0011\u0005M2T\"\u0001\u001b\u000b\u0003U\nQa]2bY\u0006L!a\u000e\u001b\u0003\r\u0005s\u0017PU3g!\tID(D\u0001;\u0015\tYD&A\u0003vi&d7/\u0003\u0002>u\tA1\u000b]1sW*{'-\u0001\blC\u001a\\\u0017MS8c\u0007>tg-[4\u0016\u0003\u0001\u0003\"!\u0011\"\u000e\u0003\u0019J!a\u0011\u0014\u0003\u001d-\u000bgm[1K_\n\u001cuN\u001c4jO\u0006y1.\u00194lC*{'mQ8oM&<\u0007%\u0001\u0005tKR$\u0018N\\4t+\u00059\u0005C\u0001%L\u001b\u0005I%B\u0001&-\u0003\u0019\u0019wN\u001c4jO&\u0011A*\u0013\u0002\t'\u0016$H/\u001b8hg\u0006I1/\u001a;uS:<7\u000fI\u0001\u0007y%t\u0017\u000e\u001e \u0015\u0005A\u001bFCA)S!\t\t\u0005\u0001C\u0003F\u000b\u0001\u000fq\tC\u0003?\u000b\u0001\u0007\u0001)A\u0007tG\",W.\u0019%b]\u0012dWM]\u000b\u0002-B\u0011q\u000bX\u0007\u00021*\u0011\u0011LW\u0001\tQ\u0006tG\r\\3sg*\u00111\fL\u0001\u0007g\u000eDW-\\1\n\u0005uC&!D*dQ\u0016l\u0017\rS1oI2,'/\u0001\btG\",W.\u0019%b]\u0012dWM\u001d\u0011\u0002\u0017Q|\u0007/[2D_:4\u0017nZ\u000b\u0002CB\u00191G\u00193\n\u0005\r$$AB(qi&|g\u000e\u0005\u0002fQ:\u0011\u0001JZ\u0005\u0003O&\u000b\u0001bU3ui&twm]\u0005\u0003S*\u0014\u0001cS1gW\u0006$v\u000e]5d\u0007>tg-[4\u000b\u0005\u001dL\u0015\u0001\u0004;pa&\u001c7i\u001c8gS\u001e\u0004\u0013\u0001E<sSR,Gk\u001c9jG\u000e{gNZ5h\u0003E9(/\u001b;f)>\u0004\u0018nY\"p]\u001aLw\rI\u0001\u000fM&t\u0017\r\\,sSR,\u0007+\u0019;i+\u0005\u0001\bcA\u001accB\u0011!/\u001f\b\u0003g^\u0004\"\u0001\u001e\u001b\u000e\u0003UT!A\u001e\u0019\u0002\rq\u0012xn\u001c;?\u0013\tAH'\u0001\u0004Qe\u0016$WMZ\u0005\u0003un\u0014aa\u0015;sS:<'B\u0001=5\u0003=1\u0017N\\1m/JLG/\u001a)bi\"\u0004\u0013!\u00044j]\u0006dGj\\1e!\u0006$\b.\u0001\bgS:\fG\u000eT8bIB\u000bG\u000f\u001b\u0011\u0002\u0015\u0019|'/\\1u!\u0006$\b\u000eF\u0002q\u0003\u0007Aa!!\u0002\u0011\u0001\u0004\u0001\u0018\u0001\u00029bi\"\f\u0011c]2iK6\f'+Z4jgR\u0014\u00180\u0016:m\u0003I\u00198\r[3nCJ+w-[:uef,&\u000f\u001c\u0011\u0002)M\u001c\u0007.Z7b%\u0016<\u0017n\u001d;ss\u000ec\u0017.\u001a8u+\t\ty\u0001\u0005\u00034E\u0006E\u0001\u0003BA\n\u0003Oi!!!\u0006\u000b\t\u0005]\u0011\u0011D\u0001\u0007G2LWM\u001c;\u000b\t\u0005m\u0011QD\u0001\u000fg\u000eDW-\\1sK\u001eL7\u000f\u001e:z\u0015\r9\u0013q\u0004\u0006\u0005\u0003C\t\u0019#A\u0005d_:4G.^3oi*\u0011\u0011QE\u0001\u0003S>LA!!\u000b\u0002\u0016\tQ2)Y2iK\u0012\u001c6\r[3nCJ+w-[:uef\u001cE.[3oi\u0006)2o\u00195f[\u0006\u0014VmZ5tiJL8\t\\5f]R\u0004\u0013!\u00057p_.,\b\u000fV8qS\u000e\u001c6\r[3nCR)\u0001/!\r\u00026!1\u00111G\u000bA\u0002E\fQ\u0001^8qS\u000eD\u0011\"a\u000e\u0016!\u0003\u0005\r!!\u000f\u0002\u000b%\u001c8*Z=\u0011\u0007M\nY$C\u0002\u0002>Q\u0012qAQ8pY\u0016\fg.A\u000em_>\\W\u000f\u001d+pa&\u001c7k\u00195f[\u0006$C-\u001a4bk2$HEM\u000b\u0003\u0003\u0007RC!!\u000f\u0002F-\u0012\u0011q\t\t\u0005\u0003\u0013\n\u0019&\u0004\u0002\u0002L)!\u0011QJA(\u0003%)hn\u00195fG.,GMC\u0002\u0002RQ\n!\"\u00198o_R\fG/[8o\u0013\u0011\t)&a\u0013\u0003#Ut7\r[3dW\u0016$g+\u0019:jC:\u001cW-A\fbmJ|7k\u00195f[\u0006$vn\u00159be.\u001c6\r[3nCR!\u00111LA@!\u0011\ti&!\u001f\u000f\t\u0005}\u0013QO\u0007\u0003\u0003CRA!a\u0019\u0002f\u0005!\u0011M\u001e:p\u0015\u0011\t9'!\u001b\u0002\u0007M\fHN\u0003\u0003\u0002l\u00055\u0014!B:qCJ\\'\u0002BA8\u0003c\na!\u00199bG\",'BAA:\u0003\ry'oZ\u0005\u0005\u0003o\n\t'\u0001\tTG\",W.Y\"p]Z,'\u000f^3sg&!\u00111PA?\u0005)\u00196\r[3nCRK\b/\u001a\u0006\u0005\u0003o\n\t\u0007\u0003\u0004\u0002\u0002^\u0001\r!]\u0001\u000bCZ\u0014xnU2iK6\f\u0017\u0001D<sSR,w\n\u001d;j_:\u001cXCAAD!\u0019\tI)a%rc6\u0011\u00111\u0012\u0006\u0005\u0003\u001b\u000by)A\u0005j[6,H/\u00192mK*\u0019\u0011\u0011\u0013\u001b\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002\u0016\u0006-%aA'ba\u0006iqO]5uK>\u0003H/[8og\u0002\nqa\u001c9uS>t7/\u0001\u0005paRLwN\\:!\u0003Uaw.\u00193PaRLwN\\:Ge>l7i\u001c8gS\u001e$B!!)\u0002&B)!/a)rc&\u0019\u0011QS>\t\r\u0005\u001dF\u00041\u0001r\u0003-\u0019wN\u001c4jOZ\u000bG.^3\u0002\u0011AL\u0007/\u001a7j]\u0016$\"!!,\u0011\r\u0005=\u0016QWA]\u001b\t\t\tLC\u0002\u00024R\nA!\u001e;jY&!\u0011qWAY\u0005\r!&/\u001f\t\u0004s\u0005m\u0016bAA_u\tq1\u000b]1sW*{'MU3tk2$\u0018!\u00032bi\u000eD7+\u0019<f)\u0011\t\u0019-!9\u0011\t\u0005\u0015\u00171\u001c\b\u0005\u0003\u000f\f9N\u0004\u0003\u0002J\u0006Ug\u0002BAf\u0003'tA!!4\u0002R:\u0019A/a4\n\u0005\u0005M\u0014\u0002BA8\u0003cJA!a\u001b\u0002n%!\u0011qMA5\u0013\u0011\tI.!\u001a\u0002\u000fA\f7m[1hK&!\u0011Q\\Ap\u0005%!\u0015\r^1Ge\u0006lWM\u0003\u0003\u0002Z\u0006\u0015\u0004bBAr=\u0001\u0007\u00111Y\u0001\u0003I\u001a\f\u0011\u0002\u001e:b]N4wN]7\u0015\t\u0005\r\u0017\u0011\u001e\u0005\b\u0003G|\u0002\u0019AAb\u000399(/\u001b;f'R\u0014X-Y7j]\u001e$B!a<\u0002vB\u00191'!=\n\u0007\u0005MHG\u0001\u0003V]&$\bbBArA\u0001\u0007\u00111Y\u0001\u0012iJ\fgn\u001d4pe6Len\u001d;b]\u000e,WCAA~!\u0011\u0019$-!@\u0011\t\u0005}(\u0011A\u0007\u0002Q%\u0019!1\u0001\u0015\u0003%\u0011\u000bG/\u0019$sC6,GK]1og\u001a|'/\\\u0001\u0013iJ\fgn\u001d4pe6Len\u001d;b]\u000e,\u0007%A\u0002sk:$\"Aa\u0003\u0011\r\u0005=\u0016Q\u0017B\u0007!\rI$qB\u0005\u0004\u0005#Q$!\u0003&pEJ+7/\u001e7u\u0003\u0011q\u0017-\\3\u0016\u0003E\u0004")
/* loaded from: input_file:ai/starlake/job/sink/kafka/KafkaJob.class */
public class KafkaJob implements SparkJob {
    private final KafkaJobConfig kafkaJobConfig;
    private final Settings settings;
    private final SchemaHandler schemaHandler;
    private final Option<Settings.KafkaTopicConfig> topicConfig;
    private final Option<Settings.KafkaTopicConfig> writeTopicConfig;
    private final Option<String> finalWritePath;
    private final Option<String> finalLoadPath;
    private final Option<String> schemaRegistryUrl;
    private final Option<CachedSchemaRegistryClient> schemaRegistryClient;
    private final Map<String, String> writeOptions;
    private final Map<String, String> options;
    private final Option<DataFrameTransform> transformInstance;
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
    private SparkSession session;
    private Option<SparkSession> optionalAuditSession;
    private final String appName;
    private final Logger logger;
    private volatile byte bitmap$0;

    @Override // ai.starlake.utils.SparkJob
    public SparkConf withExtraSparkConf(SparkConf sparkConf) {
        SparkConf withExtraSparkConf;
        withExtraSparkConf = withExtraSparkConf(sparkConf);
        return withExtraSparkConf;
    }

    @Override // ai.starlake.utils.SparkJob
    public void registerUdf(String str) {
        registerUdf(str);
    }

    @Override // ai.starlake.utils.SparkJob
    public DataFrameWriter<Row> partitionedDatasetWriter(Dataset<Row> dataset, List<String> list) {
        DataFrameWriter<Row> partitionedDatasetWriter;
        partitionedDatasetWriter = partitionedDatasetWriter(dataset, list);
        return partitionedDatasetWriter;
    }

    @Override // ai.starlake.utils.SparkJob
    public Dataset<Row> partitionDataset(Dataset<Row> dataset, List<String> list) {
        Dataset<Row> partitionDataset;
        partitionDataset = partitionDataset(dataset, list);
        return partitionDataset;
    }

    @Override // ai.starlake.utils.SparkJob
    public Object analyze(String str) {
        Object analyze;
        analyze = analyze(str);
        return analyze;
    }

    @Override // ai.starlake.utils.JobBase
    public String applicationId() {
        String applicationId;
        applicationId = applicationId();
        return applicationId;
    }

    @Override // org.apache.spark.sql.DatasetLogging
    public <T> DatasetLogging.DatasetHelper<T> DatasetHelper(Dataset<T> dataset) {
        DatasetLogging.DatasetHelper<T> DatasetHelper;
        DatasetHelper = DatasetHelper(dataset);
        return DatasetHelper;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkEnv ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() {
        SparkEnv ai$starlake$utils$SparkJob$$sparkEnv;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv();
                this.ai$starlake$utils$SparkJob$$sparkEnv = ai$starlake$utils$SparkJob$$sparkEnv;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkEnv ai$starlake$utils$SparkJob$$sparkEnv() {
        return ((byte) (this.bitmap$0 & 1)) == 0 ? ai$starlake$utils$SparkJob$$sparkEnv$lzycompute() : this.ai$starlake$utils$SparkJob$$sparkEnv;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private SparkSession session$lzycompute() {
        SparkSession session;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                session = session();
                this.session = session;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.session;
    }

    @Override // ai.starlake.utils.SparkJob
    public SparkSession session() {
        return ((byte) (this.bitmap$0 & 2)) == 0 ? session$lzycompute() : this.session;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [ai.starlake.job.sink.kafka.KafkaJob] */
    private Option<SparkSession> optionalAuditSession$lzycompute() {
        Option<SparkSession> optionalAuditSession;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 4)) == 0) {
                optionalAuditSession = optionalAuditSession();
                this.optionalAuditSession = optionalAuditSession;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 4);
            }
        }
        return this.optionalAuditSession;
    }

    @Override // ai.starlake.utils.SparkJob
    public Option<SparkSession> optionalAuditSession() {
        return ((byte) (this.bitmap$0 & 4)) == 0 ? optionalAuditSession$lzycompute() : this.optionalAuditSession;
    }

    @Override // ai.starlake.utils.JobBase
    public String appName() {
        return this.appName;
    }

    @Override // ai.starlake.utils.JobBase
    public void ai$starlake$utils$JobBase$_setter_$appName_$eq(String str) {
        this.appName = str;
    }

    public Logger logger() {
        return this.logger;
    }

    public void com$typesafe$scalalogging$StrictLogging$_setter_$logger_$eq(Logger logger) {
        this.logger = logger;
    }

    public KafkaJobConfig kafkaJobConfig() {
        return this.kafkaJobConfig;
    }

    @Override // ai.starlake.utils.JobBase
    public Settings settings() {
        return this.settings;
    }

    public SchemaHandler schemaHandler() {
        return this.schemaHandler;
    }

    private Option<Settings.KafkaTopicConfig> topicConfig() {
        return this.topicConfig;
    }

    private Option<Settings.KafkaTopicConfig> writeTopicConfig() {
        return this.writeTopicConfig;
    }

    private Option<String> finalWritePath() {
        return this.finalWritePath;
    }

    private Option<String> finalLoadPath() {
        return this.finalLoadPath;
    }

    private Option<String> formatPath(Option<String> option) {
        return option.map(str -> {
            return Formatter$.MODULE$.RichFormatter(str).richFormat(this.schemaHandler().activeEnvVars(this.schemaHandler().activeEnvVars$default$1()), (Map) Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("config"), this.kafkaJobConfig().topicConfigName().getOrElse(() -> {
                return "";
            })), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("topic"), this.topicConfig().map(kafkaTopicConfig -> {
                return kafkaTopicConfig.topicName();
            }).getOrElse(() -> {
                return "";
            }))})), this.settings());
        });
    }

    public Option<String> schemaRegistryUrl() {
        return this.schemaRegistryUrl;
    }

    public Option<CachedSchemaRegistryClient> schemaRegistryClient() {
        return this.schemaRegistryClient;
    }

    public Option<String> lookupTopicSchema(String str, boolean z) {
        return schemaRegistryClient().map(cachedSchemaRegistryClient -> {
            return cachedSchemaRegistryClient.getLatestSchemaMetadata(new StringBuilder(0).append(str).append((Object) (z ? "-key" : "-value")).toString()).getSchema();
        });
    }

    public boolean lookupTopicSchema$default$2() {
        return false;
    }

    public SchemaConverters.SchemaType avroSchemaToSparkSchema(String str) {
        return SchemaConverters$.MODULE$.toSqlType(new Schema.Parser().parse(str));
    }

    private Map<String, String> writeOptions() {
        return this.writeOptions;
    }

    private Map<String, String> options() {
        return this.options;
    }

    private Map<String, String> loadOptionsFromConfig(String str) {
        return ((TraversableOnce) ((TraversableLike) ((TraversableLike) JavaConverters$.MODULE$.asScalaSetConverter(settings().extraConf().getConfig(str).entrySet()).asScala()).to(Vector$.MODULE$.canBuildFrom())).map(entry -> {
            return new Tuple2(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped().toString());
        }, Vector$.MODULE$.canBuildFrom())).toMap(Predef$.MODULE$.$conforms());
    }

    public Try<SparkJobResult> pipeline() {
        return Try$.MODULE$.apply(() -> {
            Some some = this.topicConfig();
            if (some instanceof Some) {
                Settings.KafkaTopicConfig kafkaTopicConfig = (Settings.KafkaTopicConfig) some.value();
                if (!this.kafkaJobConfig().streaming()) {
                    return (SparkJobResult) Utils$.MODULE$.withResources(() -> {
                        return new KafkaClient(this.settings().appConfig().kafka(), this.settings());
                    }, kafkaClient -> {
                        Tuple2<Dataset<Row>, List<Tuple2<Object, Object>>> consumeTopicBatch = kafkaClient.consumeTopicBatch((String) this.kafkaJobConfig().topicConfigName().getOrElse(() -> {
                            return "";
                        }), this.session(), kafkaTopicConfig);
                        if (consumeTopicBatch == null) {
                            throw new MatchError(consumeTopicBatch);
                        }
                        Tuple2 tuple2 = new Tuple2((Dataset) consumeTopicBatch._1(), (List) consumeTopicBatch._2());
                        Dataset<Row> dataset = (Dataset) tuple2._1();
                        List<Tuple2<Object, Object>> list = (List) tuple2._2();
                        Dataset<Row> batchSave = this.batchSave(this.transform(dataset));
                        kafkaClient.topicSaveOffsets((String) this.kafkaJobConfig().topicConfigName().getOrElse(() -> {
                            return "";
                        }), kafkaTopicConfig.allAccessOptions(this.settings()), list);
                        return new SparkJobResult(new Some(batchSave));
                    });
                }
                this.writeStreaming(this.transform(KafkaClient$.MODULE$.consumeTopicStreaming(this.session(), kafkaTopicConfig, this.settings())));
                return new SparkJobResult(None$.MODULE$);
            }
            if (!None$.MODULE$.equals(some)) {
                throw new MatchError(some);
            }
            if (this.kafkaJobConfig().streaming()) {
                Predef$ predef$ = Predef$.MODULE$;
                String format = this.kafkaJobConfig().format();
                predef$.assert(format != null ? !format.equals("kafka") : "kafka" != 0);
                this.writeStreaming(this.transform(this.session().readStream().format(this.kafkaJobConfig().format()).options(this.options()).load()));
                return new SparkJobResult(None$.MODULE$);
            }
            Predef$.MODULE$.assert(this.kafkaJobConfig().path().isDefined());
            Dataset<Row> transform = this.transform(this.session().read().format(this.kafkaJobConfig().format()).load(Predef$.MODULE$.wrapRefArray(new StringOps(Predef$.MODULE$.augmentString((String) this.finalLoadPath().getOrElse(() -> {
                throw new Exception("Load path should be set in config");
            }))).split(','))));
            Tuple2 tuple2 = new Tuple2(this.kafkaJobConfig().writeFormat(), this.writeTopicConfig());
            if (tuple2 != null) {
                String str = (String) tuple2._1();
                Some some2 = (Option) tuple2._2();
                if ("kafka".equals(str) && (some2 instanceof Some)) {
                    Settings.KafkaTopicConfig kafkaTopicConfig2 = (Settings.KafkaTopicConfig) some2.value();
                    Utils$.MODULE$.withResources(() -> {
                        return new KafkaClient(this.settings().appConfig().kafka(), this.settings());
                    }, kafkaClient2 -> {
                        kafkaClient2.sinkToTopic(kafkaTopicConfig2, transform);
                        return BoxedUnit.UNIT;
                    });
                    return new SparkJobResult(new Some(transform));
                }
            }
            this.batchSave(transform);
            return new SparkJobResult(new Some(transform));
        });
    }

    private Dataset<Row> batchSave(Dataset<Row> dataset) {
        Dataset<Row> repartition;
        Some coalesce = kafkaJobConfig().coalesce();
        if (None$.MODULE$.equals(coalesce)) {
            repartition = dataset;
        } else {
            if (!(coalesce instanceof Some)) {
                throw new MatchError(coalesce);
            }
            repartition = dataset.repartition(BoxesRunTime.unboxToInt(coalesce.value()));
        }
        Dataset<Row> dataset2 = repartition;
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Saving to {}", new Object[]{kafkaJobConfig()});
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
        String writeFormat = kafkaJobConfig().writeFormat();
        DataFrameWriter options = dataset2.write().mode(kafkaJobConfig().writeMode()).format(kafkaJobConfig().writeFormat()).options(((writeFormat != null ? !writeFormat.equals("kafka") : "kafka" != 0) ? Predef$.MODULE$.Map().empty() : (Map) writeTopicConfig().map(kafkaTopicConfig -> {
            return kafkaTopicConfig.allAccessOptions(this.settings());
        }).getOrElse(() -> {
            return Predef$.MODULE$.Map().empty();
        })).$plus$plus(writeOptions()));
        Some finalWritePath = finalWritePath();
        if (None$.MODULE$.equals(finalWritePath)) {
            options.save();
            BoxedUnit boxedUnit3 = BoxedUnit.UNIT;
        } else {
            if (!(finalWritePath instanceof Some)) {
                throw new MatchError(finalWritePath);
            }
            options.save((String) finalWritePath.value());
            BoxedUnit boxedUnit4 = BoxedUnit.UNIT;
        }
        if (logger().underlying().isInfoEnabled()) {
            logger().underlying().info("Kafka saved messages to offload -> {}", new Object[]{finalWritePath()});
            BoxedUnit boxedUnit5 = BoxedUnit.UNIT;
        } else {
            BoxedUnit boxedUnit6 = BoxedUnit.UNIT;
        }
        Tuple2 tuple2 = new Tuple2(kafkaJobConfig().coalesce(), finalWritePath());
        if (tuple2 != null) {
            Some some = (Option) tuple2._1();
            Some some2 = (Option) tuple2._2();
            if ((some instanceof Some) && 1 == BoxesRunTime.unboxToInt(some.value()) && (some2 instanceof Some)) {
                Path path = new Path((String) some2.value());
                StorageHandler storageHandler = settings().storageHandler(settings().storageHandler$default$1());
                Path path2 = (Path) ((IterableLike) storageHandler.list(path, storageHandler.list$default$2(), storageHandler.list$default$3(), false, storageHandler.list$default$5(), storageHandler.list$default$6()).filter(path3 -> {
                    return BoxesRunTime.boxToBoolean($anonfun$batchSave$3(path3));
                })).head();
                Path path4 = new Path(new StringBuilder(4).append(path.toString()).append(".tmp").toString());
                if (settings().storageHandler(settings().storageHandler$default$1()).move(path2, path4)) {
                    settings().storageHandler(settings().storageHandler$default$1()).delete(path);
                    BoxesRunTime.boxToBoolean(settings().storageHandler(settings().storageHandler$default$1()).move(path4, path));
                } else {
                    BoxedUnit boxedUnit7 = BoxedUnit.UNIT;
                }
                return dataset;
            }
        }
        if (tuple2 != null && None$.MODULE$.equals((Option) tuple2._1())) {
            BoxedUnit boxedUnit8 = BoxedUnit.UNIT;
            return dataset;
        }
        if (tuple2 != null) {
            throw new Exception("Only coalesce(1) supported. Anything else is ignored");
        }
        throw new MatchError(tuple2);
    }

    private Dataset<Row> transform(Dataset<Row> dataset) {
        return DataFrameTransform$.MODULE$.transform(transformInstance(), dataset, session());
    }

    /* JADX WARN: Removed duplicated region for block: B:10:0x012c  */
    /* JADX WARN: Removed duplicated region for block: B:13:0x017d  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x01d0  */
    /* JADX WARN: Removed duplicated region for block: B:32:0x0218  */
    /* JADX WARN: Removed duplicated region for block: B:39:0x0185  */
    /* JADX WARN: Removed duplicated region for block: B:40:0x014a  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void writeStreaming(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> r6) {
        /*
            Method dump skipped, instructions count: 604
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ai.starlake.job.sink.kafka.KafkaJob.writeStreaming(org.apache.spark.sql.Dataset):void");
    }

    private Option<DataFrameTransform> transformInstance() {
        return this.transformInstance;
    }

    @Override // ai.starlake.utils.JobBase
    public Try<JobResult> run() {
        ((Map) settings().appConfig().kafka().customDeserializers().getOrElse(() -> {
            return Predef$.MODULE$.Map().empty();
        })).foreach(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            String str = (String) tuple2._1();
            String str2 = (String) tuple2._2();
            String str3 = (String) this.topicConfig().map(kafkaTopicConfig -> {
                return kafkaTopicConfig.topicName();
            }).getOrElse(() -> {
                return (String) this.writeTopicConfig().map(kafkaTopicConfig2 -> {
                    return kafkaTopicConfig2.topicName();
                }).getOrElse(() -> {
                    throw new Exception("Cannot register de/serializers if topic not defined");
                });
            });
            CustomDeserializer$.MODULE$.configure(str, str2, this.settings().appConfig().kafka().serverOptions());
            UDFRegistration udf = this.session().udf();
            Function1 function1 = bArr -> {
                return CustomDeserializer$.MODULE$.deserialize(str, str3, bArr);
            };
            TypeTags universe = package$.MODULE$.universe();
            final KafkaJob kafkaJob = null;
            TypeTags.TypeTag apply = universe.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator(kafkaJob) { // from class: ai.starlake.job.sink.kafka.KafkaJob$$typecreator1$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe2 = mirror.universe();
                    return universe2.internal().reificationSupport().TypeRef(universe2.internal().reificationSupport().SingleType(mirror.staticPackage("scala").asModule().moduleClass().asType().toTypeConstructor(), mirror.staticModule("scala.Predef")), universe2.internal().reificationSupport().selectType(mirror.staticModule("scala.Predef").asModule().moduleClass(), "String"), Nil$.MODULE$);
                }
            });
            TypeTags universe2 = package$.MODULE$.universe();
            final KafkaJob kafkaJob2 = null;
            return udf.register(str, function1, apply, universe2.TypeTag().apply(package$.MODULE$.universe().runtimeMirror(KafkaJob.class.getClassLoader()), new TypeCreator(kafkaJob2) { // from class: ai.starlake.job.sink.kafka.KafkaJob$$typecreator2$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    Universe universe3 = mirror.universe();
                    return universe3.internal().reificationSupport().TypeRef(universe3.internal().reificationSupport().ThisType(mirror.staticPackage("scala").asModule().moduleClass()), mirror.staticClass("scala.Array"), new $colon.colon(mirror.staticClass("scala.Byte").asType().toTypeConstructor(), Nil$.MODULE$));
                }
            }));
        });
        return pipeline();
    }

    @Override // ai.starlake.utils.JobBase
    public String name() {
        return String.valueOf(kafkaJobConfig().topicConfigName());
    }

    public static final /* synthetic */ boolean $anonfun$batchSave$3(Path path) {
        return path.getName().startsWith("part-");
    }

    /* JADX WARN: Removed duplicated region for block: B:10:0x019c  */
    /* JADX WARN: Removed duplicated region for block: B:22:0x0205  */
    /* JADX WARN: Removed duplicated region for block: B:23:0x022c  */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public KafkaJob(ai.starlake.job.sink.kafka.KafkaJobConfig r8, ai.starlake.config.Settings r9) {
        /*
            Method dump skipped, instructions count: 607
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: ai.starlake.job.sink.kafka.KafkaJob.<init>(ai.starlake.job.sink.kafka.KafkaJobConfig, ai.starlake.config.Settings):void");
    }
}
