package it.agilelab.bigdata.wasp.consumers.spark.plugins.http;

import it.agilelab.bigdata.wasp.consumers.spark.writers.SparkStructuredStreamingWriter;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.HttpCompression;
import it.agilelab.bigdata.wasp.models.HttpCompression$Disabled$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Gzip$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Lz4$;
import it.agilelab.bigdata.wasp.models.HttpCompression$Snappy$;
import it.agilelab.bigdata.wasp.models.HttpModel;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.streaming.DataStreamWriter;
import org.apache.spark.sql.types.ArrayType;
import org.apache.spark.sql.types.BinaryType$;
import org.apache.spark.sql.types.MapType;
import org.apache.spark.sql.types.StructType;
import scala.MatchError;
import scala.None$;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.StringBuilder;
import scala.reflect.ScalaSignature;

/* compiled from: HttpWaspWriter.scala */
@ScalaSignature(bytes = "\u0006\u0001=4A!\u0001\u0002\u0001'\tq\u0001\n\u001e;q/\u0006\u001c\bo\u0016:ji\u0016\u0014(BA\u0002\u0005\u0003\u0011AG\u000f\u001e9\u000b\u0005\u00151\u0011a\u00029mk\u001eLgn\u001d\u0006\u0003\u000f!\tQa\u001d9be.T!!\u0003\u0006\u0002\u0013\r|gn];nKJ\u001c(BA\u0006\r\u0003\u00119\u0018m\u001d9\u000b\u00055q\u0011a\u00022jO\u0012\fG/\u0019\u0006\u0003\u001fA\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u0002#\u0005\u0011\u0011\u000e^\u0002\u0001'\u0011\u0001AC\u0007\u0011\u0011\u0005UAR\"\u0001\f\u000b\u0003]\tQa]2bY\u0006L!!\u0007\f\u0003\r\u0005s\u0017PU3g!\tYb$D\u0001\u001d\u0015\tib!A\u0004xe&$XM]:\n\u0005}a\"AH*qCJ\\7\u000b\u001e:vGR,(/\u001a3TiJ,\u0017-\\5oO^\u0013\u0018\u000e^3s!\t\tc%D\u0001#\u0015\t\u0019C%A\u0004m_\u001e<\u0017N\\4\u000b\u0005\u0015R\u0011\u0001B2pe\u0016L!a\n\u0012\u0003\u000f1{wmZ5oO\"A\u0011\u0006\u0001B\u0001B\u0003%!&A\u0005iiR\u0004Xj\u001c3fYB\u00111FL\u0007\u0002Y)\u0011QFC\u0001\u0007[>$W\r\\:\n\u0005=b#!\u0003%uiBlu\u000eZ3m\u0011\u0015\t\u0004\u0001\"\u00013\u0003\u0019a\u0014N\\5u}Q\u00111'\u000e\t\u0003i\u0001i\u0011A\u0001\u0005\u0006SA\u0002\rA\u000b\u0005\bo\u0001\u0011\r\u0011\"\u00039\u0003)1\u0018\r\\\"pY:\u000bW.Z\u000b\u0002sA\u0011!hP\u0007\u0002w)\u0011A(P\u0001\u0005Y\u0006twMC\u0001?\u0003\u0011Q\u0017M^1\n\u0005\u0001[$AB*ue&tw\r\u0003\u0004C\u0001\u0001\u0006I!O\u0001\fm\u0006d7i\u001c7OC6,\u0007\u0005C\u0003E\u0001\u0011\u0005S)A\u0003xe&$X\r\u0006\u0002G/B\u0019q)U*\u000e\u0003!S!!\u0013&\u0002\u0013M$(/Z1nS:<'BA&M\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003\u000f5S!AT(\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005\u0001\u0016aA8sO&\u0011!\u000b\u0013\u0002\u0011\t\u0006$\u0018m\u0015;sK\u0006lwK]5uKJ\u0004\"\u0001V+\u000e\u0003)K!A\u0016&\u0003\u0007I{w\u000fC\u0003Y\u0007\u0002\u0007\u0011,\u0001\u0004tiJ,\u0017-\u001c\t\u00035\"t!a\u00174\u000f\u0005q+gBA/e\u001d\tq6M\u0004\u0002`E6\t\u0001M\u0003\u0002b%\u00051AH]8pizJ\u0011\u0001U\u0005\u0003\u001d>K!aB'\n\u0005-c\u0015BA4K\u0003\u001d\u0001\u0018mY6bO\u0016L!!\u001b6\u0003\u0013\u0011\u000bG/\u0019$sC6,'BA4K\u0011\u0019a\u0007\u0001\"\u0005\u000b[\u0006I\u0001O]3qCJ,GI\u0012\u000b\u00033:DQ\u0001W6A\u0002e\u0003")
/* loaded from: input_file:it/agilelab/bigdata/wasp/consumers/spark/plugins/http/HttpWaspWriter.class */
public class HttpWaspWriter implements SparkStructuredStreamingWriter, Logging {
    private final HttpModel httpModel;
    private final String valColName;
    private final WaspLogger logger;

    public WaspLogger logger() {
        return this.logger;
    }

    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    private String valColName() {
        return this.valColName;
    }

    public DataStreamWriter<Row> write(Dataset<Row> dataset) {
        Dataset<Row> prepareDF = prepareDF(dataset);
        return prepareDF.writeStream().foreach(HttpWriter$.MODULE$.apply(this.httpModel, valColName()));
    }

    public Dataset<Row> prepareDF(Dataset<Row> dataset) {
        None$ some;
        Column column;
        Column column2;
        HttpCompression compression = this.httpModel.compression();
        if (HttpCompression$Disabled$.MODULE$.equals(compression)) {
            some = None$.MODULE$;
        } else {
            if (!HttpCompression$Gzip$.MODULE$.equals(compression)) {
                if (HttpCompression$Snappy$.MODULE$.equals(compression)) {
                    throw new IllegalArgumentException("Unsupported compression format: snappy");
                }
                if (HttpCompression$Lz4$.MODULE$.equals(compression)) {
                    throw new IllegalArgumentException("Unsupported compression format: lz4");
                }
                throw new MatchError(compression);
            }
            some = new Some("gzip");
        }
        None$ none$ = some;
        List list = this.httpModel.valueFieldsNames().isEmpty() ? Predef$.MODULE$.refArrayOps((Object[]) this.httpModel.headersFieldName().fold(new HttpWaspWriter$$anonfun$1(this, dataset), new HttpWaspWriter$$anonfun$2(this, dataset))).toList() : this.httpModel.valueFieldsNames();
        Configuration hadoopConfiguration = dataset.sparkSession().sparkContext().hadoopConfiguration();
        if (list instanceof $colon.colon) {
            $colon.colon colonVar = ($colon.colon) list;
            String str = (String) colonVar.head();
            if (Nil$.MODULE$.equals(colonVar.tl$1())) {
                Some map = Predef$.MODULE$.refArrayOps(dataset.schema().fields()).find(new HttpWaspWriter$$anonfun$3(this, str)).map(new HttpWaspWriter$$anonfun$4(this));
                if (((map instanceof Some) && (map.x() instanceof MapType)) ? true : (map instanceof Some) && (map.x() instanceof ArrayType)) {
                    column2 = this.httpModel.structured() ? functions$.MODULE$.to_json(functions$.MODULE$.struct(str, Predef$.MODULE$.wrapRefArray(new String[0]))) : functions$.MODULE$.to_json(functions$.MODULE$.col(str));
                } else if ((map instanceof Some) && (map.x() instanceof StructType)) {
                    column2 = functions$.MODULE$.to_json(functions$.MODULE$.col(str));
                } else {
                    if (None$.MODULE$.equals(map)) {
                        throw new IllegalArgumentException(new StringBuilder().append(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Cannot find column ", " inside data frame, columns are "})).s(Predef$.MODULE$.genericWrapArray(new Object[]{str}))).append(Predef$.MODULE$.refArrayOps(dataset.schema().fields()).mkString("[", ",", "]")).toString());
                    }
                    column2 = functions$.MODULE$.to_json(functions$.MODULE$.struct(str, Predef$.MODULE$.wrapRefArray(new String[0])));
                }
                column = column2;
                Column cast = column.cast(BinaryType$.MODULE$);
                return dataset.select((Seq) Option$.MODULE$.option2Iterable(this.httpModel.headersFieldName().map(new HttpWaspWriter$$anonfun$8(this, dataset))).toSeq().$colon$plus(((Column) none$.map(new HttpWaspWriter$$anonfun$6(this, hadoopConfiguration, cast)).getOrElse(new HttpWaspWriter$$anonfun$7(this, cast))).as(valColName()), Seq$.MODULE$.canBuildFrom()));
            }
        }
        column = functions$.MODULE$.to_json(functions$.MODULE$.struct((Seq) list.map(new HttpWaspWriter$$anonfun$5(this), List$.MODULE$.canBuildFrom())));
        Column cast2 = column.cast(BinaryType$.MODULE$);
        return dataset.select((Seq) Option$.MODULE$.option2Iterable(this.httpModel.headersFieldName().map(new HttpWaspWriter$$anonfun$8(this, dataset))).toSeq().$colon$plus(((Column) none$.map(new HttpWaspWriter$$anonfun$6(this, hadoopConfiguration, cast2)).getOrElse(new HttpWaspWriter$$anonfun$7(this, cast2))).as(valColName()), Seq$.MODULE$.canBuildFrom()));
    }

    public HttpWaspWriter(HttpModel httpModel) {
        this.httpModel = httpModel;
        Logging.class.$init$(this);
        this.valColName = "value";
    }
}
