package org.apache.seatunnel.spark.structuredstream;

import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.spark.BaseSparkTransform;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.DataStreamWriter;

/* loaded from: input_file:org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.class */
public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink, SparkEnvironment> {
    private final SparkEnvironment sparkEnvironment;
    private Config config = ConfigFactory.empty();

    public StructuredStreamingExecution(SparkEnvironment sparkEnvironment) {
        this.sparkEnvironment = sparkEnvironment;
    }

    @Override // org.apache.seatunnel.env.Execution
    public void start(List<StructuredStreamingSource> list, List<BaseSparkTransform> list2, List<StructuredStreamingSink> list3) throws Exception {
        List list4 = (List) list.stream().map(structuredStreamingSource -> {
            return SparkEnvironment.registerInputTempView(structuredStreamingSource, this.sparkEnvironment);
        }).collect(Collectors.toList());
        if (list4.size() > 0) {
            Dataset<Row> dataset = (Dataset) list4.get(0);
            for (BaseSparkTransform baseSparkTransform : list2) {
                dataset = SparkEnvironment.transformProcess(this.sparkEnvironment, baseSparkTransform, dataset);
                SparkEnvironment.registerTransformTempView(baseSparkTransform, dataset);
            }
            Iterator<StructuredStreamingSink> it = list3.iterator();
            while (it.hasNext()) {
                ((DataStreamWriter) SparkEnvironment.sinkProcess(this.sparkEnvironment, it.next(), dataset)).start();
            }
            this.sparkEnvironment.getSparkSession().streams().awaitAnyTermination();
        }
    }

    @Override // org.apache.seatunnel.plugin.Plugin
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override // org.apache.seatunnel.plugin.Plugin
    public Config getConfig() {
        return this.config;
    }
}
