package org.apache.seatunnel.core.starter.spark.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.translation.spark.common.utils.TypeConverterUtils;
import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataType;

/* loaded from: input_file:org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.class */
public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<?, ?, ?, ?>> {
    private static final String PLUGIN_TYPE = "sink";

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkExecuteProcessor(SparkEnvironment sparkEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(sparkEnvironment, jobContext, list);
    }

    @Override // org.apache.seatunnel.core.starter.spark.execution.AbstractPluginExecuteProcessor
    protected List<SeaTunnelSink<?, ?, ?, ?>> initializePlugins(List<? extends Config> list) {
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
        ArrayList arrayList = new ArrayList();
        List<SeaTunnelSink<?, ?, ?, ?>> list2 = (List) list.stream().map(config -> {
            PluginIdentifier of = PluginIdentifier.of("seatunnel", "sink", config.getString("plugin_name"));
            arrayList.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(of)));
            SeaTunnelSink createPluginInstance = seaTunnelSinkPluginDiscovery.createPluginInstance(of);
            createPluginInstance.prepare(config);
            createPluginInstance.setJobContext(this.jobContext);
            return createPluginInstance;
        }).distinct().collect(Collectors.toList());
        this.sparkEnvironment.registerPlugin(arrayList);
        return list2;
    }

    @Override // org.apache.seatunnel.core.starter.spark.execution.PluginExecuteProcessor
    public List<Dataset<Row>> execute(List<Dataset<Row>> list) throws TaskExecuteException {
        Dataset<Row> dataset = list.get(0);
        for (int i = 0; i < this.plugins.size(); i++) {
            Config config = this.pluginConfigs.get(i);
            SeaTunnelSink seaTunnelSink = (SeaTunnelSink) this.plugins.get(i);
            Dataset<Row> orElse = fromSourceTable(config, this.sparkEnvironment).orElse(dataset);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert((DataType) orElse.schema()));
            SparkSinkInjector.inject((DataFrameWriter<Row>) orElse.write(), (SeaTunnelSink<?, ?, ?, ?>) seaTunnelSink).option("checkpointLocation", "/tmp").save();
        }
        return null;
    }
}
