package org.apache.seatunnel.core.starter.flink.execution;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.sink.FlinkSink;
import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
import scala.Serializable;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.class */
public class SinkExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> {
    private static final String PLUGIN_TYPE = "sink";

    /* JADX INFO: Access modifiers changed from: protected */
    public SinkExecuteProcessor(FlinkEnvironment flinkEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(flinkEnvironment, jobContext, list);
    }

    @Override // org.apache.seatunnel.core.starter.flink.execution.AbstractPluginExecuteProcessor
    protected List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> initializePlugins(List<? extends Config> list) {
        SeaTunnelSinkPluginDiscovery seaTunnelSinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery(this.addUrlToClassloader);
        ArrayList arrayList = new ArrayList();
        List<SeaTunnelSink<SeaTunnelRow, Serializable, Serializable, Serializable>> list2 = (List) list.stream().map(config -> {
            PluginIdentifier of = PluginIdentifier.of("seatunnel", "sink", config.getString("plugin_name"));
            arrayList.addAll(seaTunnelSinkPluginDiscovery.getPluginJarPaths(Lists.newArrayList(of)));
            SeaTunnelSink createPluginInstance = seaTunnelSinkPluginDiscovery.createPluginInstance(of);
            createPluginInstance.prepare(config);
            createPluginInstance.setJobContext(this.jobContext);
            return createPluginInstance;
        }).distinct().collect(Collectors.toList());
        this.flinkEnvironment.registerPlugin(arrayList);
        return list2;
    }

    @Override // org.apache.seatunnel.core.starter.flink.execution.PluginExecuteProcessor
    public List<DataStream<Row>> execute(List<DataStream<Row>> list) throws TaskExecuteException {
        DataStream<Row> dataStream = list.get(0);
        for (int i = 0; i < this.plugins.size(); i++) {
            Config config = this.pluginConfigs.get(i);
            SeaTunnelSink seaTunnelSink = (SeaTunnelSink) this.plugins.get(i);
            DataStream<Row> orElse = fromSourceTable(config).orElse(dataStream);
            seaTunnelSink.setTypeInfo((SeaTunnelRowType) TypeConverterUtils.convert((TypeInformation<?>) orElse.getType()));
            orElse.sinkTo(new FlinkSink(seaTunnelSink)).name(seaTunnelSink.getPluginName());
        }
        return null;
    }
}
