package it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkBatchWriter;
import it.agilelab.bigdata.wasp.core.WaspSystem$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.utils.ElasticConfiguration;
import it.agilelab.bigdata.wasp.models.IndexModel;
import it.agilelab.bigdata.wasp.models.configuration.ConnectionConfig;
import it.agilelab.bigdata.wasp.models.configuration.ElasticConfigModel;
import it.agilelab.bigdata.wasp.repository.core.bl.IndexBL;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.elasticsearch.spark.sql.EsSparkSQL$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Map;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: ElasticWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001u4Aa\u0002\u0005\u00013!AA\u0007\u0001B\u0001B\u0003%Q\u0007\u0003\u0005?\u0001\t\u0005\t\u0015!\u0003@\u0011!A\u0005A!A!\u0002\u0013I\u0005\u0002\u0003+\u0001\u0005\u0003\u0005\u000b\u0011B+\t\u000bu\u0003A\u0011\u00010\t\u000b\u0015\u0004A\u0011\t4\u0003;\u0015c\u0017m\u001d;jGN,\u0017M]2i'B\f'o\u001b\"bi\u000eDwK]5uKJT!!\u0003\u0006\u0002\u000f\u0015d\u0017m\u001d;jG*\u00111\u0002D\u0001\ba2,x-\u001b8t\u0015\tia\"A\u0003ta\u0006\u00148N\u0003\u0002\u0010!\u0005I1m\u001c8tk6,'o\u001d\u0006\u0003#I\tAa^1ta*\u00111\u0003F\u0001\bE&<G-\u0019;b\u0015\t)b#\u0001\u0005bO&dW\r\\1c\u0015\u00059\u0012AA5u\u0007\u0001\u0019R\u0001\u0001\u000e!M9\u0002\"a\u0007\u0010\u000e\u0003qQ\u0011!H\u0001\u0006g\u000e\fG.Y\u0005\u0003?q\u0011a!\u00118z%\u00164\u0007CA\u0011%\u001b\u0005\u0011#BA\u0012\r\u0003\u001d9(/\u001b;feNL!!\n\u0012\u0003!M\u0003\u0018M]6CCR\u001c\u0007n\u0016:ji\u0016\u0014\bCA\u0014-\u001b\u0005A#BA\u0015+\u0003\u0015)H/\u001b7t\u0015\tY\u0003#\u0001\u0003d_J,\u0017BA\u0017)\u0005Q)E.Y:uS\u000e\u001cuN\u001c4jOV\u0014\u0018\r^5p]B\u0011qFM\u0007\u0002a)\u0011\u0011GK\u0001\bY><w-\u001b8h\u0013\t\u0019\u0004GA\u0004M_\u001e<\u0017N\\4\u0002\u000f%tG-\u001a=C\u0019B\u0011a\u0007P\u0007\u0002o)\u0011\u0001(O\u0001\u0003E2T!a\u000b\u001e\u000b\u0005m\u0002\u0012A\u0003:fa>\u001c\u0018\u000e^8ss&\u0011Qh\u000e\u0002\b\u0013:$W\r\u001f\"M\u0003\t\u00198\r\u0005\u0002A\r6\t\u0011I\u0003\u0002\u000e\u0005*\u00111\tR\u0001\u0007CB\f7\r[3\u000b\u0003\u0015\u000b1a\u001c:h\u0013\t9\u0015I\u0001\u0007Ta\u0006\u00148nQ8oi\u0016DH/\u0001\u0003oC6,\u0007C\u0001&R\u001d\tYu\n\u0005\u0002M95\tQJ\u0003\u0002O1\u00051AH]8pizJ!\u0001\u0015\u000f\u0002\rA\u0013X\rZ3g\u0013\t\u00116K\u0001\u0004TiJLgn\u001a\u0006\u0003!r\t\u0011#\u001a7bgRL7-\u00113nS:\f5\r^8s!\t16,D\u0001X\u0015\tA\u0016,A\u0003bGR|'OC\u0001[\u0003\u0011\t7n[1\n\u0005q;&\u0001C!di>\u0014(+\u001a4\u0002\rqJg.\u001b;?)\u0015y\u0016MY2e!\t\u0001\u0007!D\u0001\t\u0011\u0015!T\u00011\u00016\u0011\u0015qT\u00011\u0001@\u0011\u0015AU\u00011\u0001J\u0011\u0015!V\u00011\u0001V\u0003\u00159(/\u001b;f)\t9'\u000e\u0005\u0002\u001cQ&\u0011\u0011\u000e\b\u0002\u0005+:LG\u000fC\u0003l\r\u0001\u0007A.\u0001\u0003eCR\f\u0007CA7{\u001d\tqwO\u0004\u0002pk:\u0011\u0001\u000f\u001e\b\u0003cNt!\u0001\u0014:\n\u0003\u0015K!a\u0011#\n\u00055\u0011\u0015B\u0001<B\u0003\r\u0019\u0018\u000f\\\u0005\u0003qf\fq\u0001]1dW\u0006<WM\u0003\u0002w\u0003&\u00111\u0010 \u0002\n\t\u0006$\u0018M\u0012:b[\u0016T!\u0001_=")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/elastic/ElasticsearchSparkBatchWriter.class */
public class ElasticsearchSparkBatchWriter implements SparkBatchWriter, ElasticConfiguration, Logging {
    private final IndexBL indexBL;
    private final SparkContext sc;
    private final String name;
    private final ActorRef elasticAdminActor;
    private final WaspLogger logger;
    private ElasticConfigModel elasticConfig;
    private volatile boolean bitmap$0;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v8, types: [it.agilelab.bigdata.wasp.consumers.spark.plugins.elastic.ElasticsearchSparkBatchWriter] */
    private ElasticConfigModel elasticConfig$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.elasticConfig = ElasticConfiguration.elasticConfig$(this);
                r0 = this;
                r0.bitmap$0 = true;
            }
        }
        return this.elasticConfig;
    }

    public ElasticConfigModel elasticConfig() {
        return !this.bitmap$0 ? elasticConfig$lzycompute() : this.elasticConfig;
    }

    public void write(Dataset<Row> dataset) {
        Option byName = this.indexBL.getByName(this.name);
        if (!byName.isDefined()) {
            logger().warn(() -> {
                return new StringBuilder(59).append("The index '").append(this.name).append("' does not exits pay ATTENTION spark won't start").toString();
            });
            return;
        }
        IndexModel indexModel = (IndexModel) byName.get();
        String eventuallyTimedName = indexModel.eventuallyTimedName();
        logger().info(() -> {
            return new StringBuilder(57).append("Check or create the index model: '").append(indexModel.toString()).append(" with this index name: ").append(eventuallyTimedName).toString();
        });
        if (indexModel.schema().isEmpty()) {
            throw new Exception(new StringBuilder(51).append("There no define schema in the index configuration: ").append(indexModel).toString());
        }
        String lowerCase = indexModel.name().toLowerCase();
        String name = indexModel.name();
        if (lowerCase != null ? !lowerCase.equals(name) : name != null) {
            throw new Exception(new StringBuilder(38).append("The index name must be all lowercase: ").append(indexModel).toString());
        }
        if (!BoxesRunTime.unboxToBoolean(WaspSystem$.MODULE$.$qmark$qmark(this.elasticAdminActor, new CheckOrCreateIndex(eventuallyTimedName, indexModel.name(), indexModel.dataType(), indexModel.getJsonSchema()), WaspSystem$.MODULE$.$qmark$qmark$default$3()))) {
            String sb = new StringBuilder(21).append("Error creating index ").append(indexModel).toString();
            logger().error(() -> {
                return sb;
            });
            throw new Exception(sb);
        }
        Map $plus$plus = Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.nodes"), this.sc.broadcast(((TraversableOnce) elasticConfig().connections().filter(connectionConfig -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$6(connectionConfig));
        })).mkString(","), ClassTag$.MODULE$.apply(String.class)).value()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("es.batch.size.entries"), "1")})).$plus$plus(Option$.MODULE$.option2Iterable(((IndexModel) byName.get()).idField().map(str -> {
            return new Tuple2("es.mapping.id", str);
        })));
        logger().info(() -> {
            return new StringBuilder(13).append("Data schema: ").append(dataset.schema()).toString();
        });
        logger().info(() -> {
            return new StringBuilder(63).append("Write to elastic with this configuration: options: ").append($plus$plus).append(", resource: ").append(indexModel.resource()).toString();
        });
        EsSparkSQL$.MODULE$.saveToEs(dataset, indexModel.resource(), $plus$plus);
    }

    public static final /* synthetic */ boolean $anonfun$write$6(ConnectionConfig connectionConfig) {
        Object orElse = connectionConfig.metadata().flatMap(map -> {
            return map.get("connectiontype");
        }).getOrElse(() -> {
            return "";
        });
        return orElse != null ? orElse.equals("binary") : "binary" == 0;
    }

    public ElasticsearchSparkBatchWriter(IndexBL indexBL, SparkContext sparkContext, String str, ActorRef actorRef) {
        this.indexBL = indexBL;
        this.sc = sparkContext;
        this.name = str;
        this.elasticAdminActor = actorRef;
        ElasticConfiguration.$init$(this);
        Logging.$init$(this);
    }
}
