package it.agilelab.bigdata.wasp.consumers.spark.plugins.hbase;

import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkLegacyStreamingWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.KeyValueModel;
import it.agilelab.bigdata.wasp.models.KeyValueModel$;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.spark.HBaseContext;
import org.apache.hadoop.hbase.spark.HBaseContext$;
import org.apache.hadoop.hbase.spark.PutConverterFactory;
import org.apache.hadoop.hbase.spark.PutConverterFactory$;
import org.apache.hadoop.hbase.spark.datasources.HBaseSparkConf$;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SQLContext$;
import org.apache.spark.sql.datasources.hbase.HBaseTableCatalog$;
import org.apache.spark.sql.types.DataType$;
import org.apache.spark.streaming.StreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Seq$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.ArrayOps;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: HBaseWriters.scala */
@ScalaSignature(bytes = "\u0006\u0001u3A!\u0002\u0004\u0001/!AA\u0006\u0001B\u0001B\u0003%Q\u0006\u0003\u00054\u0001\t\u0005\t\u0015!\u00035\u0011\u0015y\u0004\u0001\"\u0001A\u0011\u0015)\u0005\u0001\"\u0011G\u0005QA%)Y:f'R\u0014X-Y7j]\u001e<&/\u001b;fe*\u0011q\u0001C\u0001\u0006Q\n\f7/\u001a\u0006\u0003\u0013)\tq\u0001\u001d7vO&t7O\u0003\u0002\f\u0019\u0005)1\u000f]1sW*\u0011QBD\u0001\nG>t7/^7feNT!a\u0004\t\u0002\t]\f7\u000f\u001d\u0006\u0003#I\tqAY5hI\u0006$\u0018M\u0003\u0002\u0014)\u0005A\u0011mZ5mK2\f'MC\u0001\u0016\u0003\tIGo\u0001\u0001\u0014\t\u0001Ab\u0004\n\t\u00033qi\u0011A\u0007\u0006\u00027\u0005)1oY1mC&\u0011QD\u0007\u0002\u0007\u0003:L(+\u001a4\u0011\u0005}\u0011S\"\u0001\u0011\u000b\u0005\u0005R\u0011aB<sSR,'o]\u0005\u0003G\u0001\u0012!d\u00159be.dUmZ1dsN#(/Z1nS:<wK]5uKJ\u0004\"!\n\u0016\u000e\u0003\u0019R!a\n\u0015\u0002\u000f1|wmZ5oO*\u0011\u0011FD\u0001\u0005G>\u0014X-\u0003\u0002,M\t9Aj\\4hS:<\u0017A\u00035cCN,Wj\u001c3fYB\u0011a&M\u0007\u0002_)\u0011\u0001GD\u0001\u0007[>$W\r\\:\n\u0005Iz#!D&fsZ\u000bG.^3N_\u0012,G.A\u0002tg\u000e\u0004\"!N\u001f\u000e\u0003YR!a\u000e\u001d\u0002\u0013M$(/Z1nS:<'BA\u0006:\u0015\tQ4(\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002y\u0005\u0019qN]4\n\u0005y2$\u0001E*ue\u0016\fW.\u001b8h\u0007>tG/\u001a=u\u0003\u0019a\u0014N\\5u}Q\u0019\u0011i\u0011#\u0011\u0005\t\u0003Q\"\u0001\u0004\t\u000b1\u001a\u0001\u0019A\u0017\t\u000bM\u001a\u0001\u0019\u0001\u001b\u0002\u000b]\u0014\u0018\u000e^3\u0015\u0005\u001dS\u0005CA\rI\u0013\tI%D\u0001\u0003V]&$\b\"B&\u0005\u0001\u0004a\u0015AB:ue\u0016\fW\u000eE\u0002N!Jk\u0011A\u0014\u0006\u0003\u001fZ\nq\u0001Z:ue\u0016\fW.\u0003\u0002R\u001d\n9Ai\u0015;sK\u0006l\u0007CA*[\u001d\t!\u0006\f\u0005\u0002V55\taK\u0003\u0002X-\u00051AH]8pizJ!!\u0017\u000e\u0002\rA\u0013X\rZ3g\u0013\tYFL\u0001\u0004TiJLgn\u001a\u0006\u00033j\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/hbase/HBaseStreamingWriter.class */
public class HBaseStreamingWriter implements SparkLegacyStreamingWriter, Logging {
    private final KeyValueModel hbaseModel;
    private final StreamingContext ssc;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public void write(DStream<String> dStream) {
        SQLContext orCreate = SQLContext$.MODULE$.getOrCreate(this.ssc.sparkContext());
        KeyValueModel keyValueModel = this.hbaseModel;
        logger().info(() -> {
            return new StringBuilder(33).append("Initialize DStream HBase writer: ").append(this.hbaseModel).toString();
        });
        String str = (String) keyValueModel.dataFrameSchema().get();
        String str2 = (String) this.hbaseModel.getOptionsMap().getOrElse(HBaseSparkConf$.MODULE$.HBASE_CONFIG_LOCATION(), () -> {
            return "";
        });
        Configuration create = HBaseConfiguration.create();
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(str2.split(","))).foreach(str3 -> {
            create.addResource(str3);
            return BoxedUnit.UNIT;
        });
        new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps((Object[]) new ArrayOps.ofRef(Predef$.MODULE$.refArrayOps(str2.split(","))).filter(str4 -> {
            return BoxesRunTime.boxToBoolean($anonfun$write$5(str4));
        }))).foreach(str5 -> {
            $anonfun$write$6(create, str5);
            return BoxedUnit.UNIT;
        });
        HBaseContext hBaseContext = new HBaseContext(this.ssc.sparkContext(), create, HBaseContext$.MODULE$.$lessinit$greater$default$3());
        Map $plus$plus = this.hbaseModel.getOptionsMap().$plus$plus((GenTraversableOnce) this.hbaseModel.avroSchemas().getOrElse(() -> {
            return Predef$.MODULE$.Map().apply(Nil$.MODULE$);
        })).$plus$plus(Seq$.MODULE$.apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HBaseTableCatalog$.MODULE$.tableCatalog()), this.hbaseModel.tableCatalog()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(KeyValueModel$.MODULE$.metadataAvroSchemaKey()), KeyValueModel$.MODULE$.metadataAvro()), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(HBaseTableCatalog$.MODULE$.newTable()), "4"), Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc("useAvroSchemaManager"), BoxesRunTime.boxToBoolean(this.hbaseModel.useAvroSchemaManager()).toString())})));
        dStream.foreachRDD(rdd -> {
            $anonfun$write$8(str, orCreate, $plus$plus, hBaseContext, rdd);
            return BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ boolean $anonfun$write$5(String str) {
        if (str != null ? !str.equals("") : "" != 0) {
            if (new File(str).exists()) {
                return true;
            }
        }
        return false;
    }

    public static final /* synthetic */ void $anonfun$write$6(Configuration configuration, String str) {
        configuration.addResource(new Path(str));
    }

    public static final /* synthetic */ void $anonfun$write$8(String str, SQLContext sQLContext, Map map, HBaseContext hBaseContext, RDD rdd) {
        if (rdd.isEmpty()) {
            return;
        }
        Dataset<Row> convertAvroColumns = PutConverterFactory$.MODULE$.convertAvroColumns(map, sQLContext.read().schema(DataType$.MODULE$.fromJson(str)).json(rdd));
        PutConverterFactory apply = PutConverterFactory$.MODULE$.apply((Map<String, String>) map, convertAvroColumns);
        hBaseContext.bulkPut(convertAvroColumns.queryExecution().toRdd(), apply.getTableName(), internalRow -> {
            return apply.convertToPut(internalRow);
        });
    }

    public HBaseStreamingWriter(KeyValueModel keyValueModel, StreamingContext streamingContext) {
        this.hbaseModel = keyValueModel;
        this.ssc = streamingContext;
        Logging.$init$(this);
    }
}
