package org.apache.seatunnel.core.starter.flink.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelFlinkTransformPluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.class */
/**
 * Execute processor for the transform stage of a Flink job: it creates one
 * {@link FlinkStreamTransform} per transform configuration and chains them over the
 * upstream data stream.
 */
public class TransformExecuteProcessor extends AbstractPluginExecuteProcessor<FlinkStreamTransform> {
    /** Plugin type used when building the {@link PluginIdentifier} of a transform plugin. */
    private static final String PLUGIN_TYPE = "transform";

    /**
     * Creates the processor by delegating all arguments to the superclass constructor.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was
     * changed from {@code protected}; it is kept {@code public} here so existing callers of
     * this file keep compiling.
     *
     * @param flinkEnvironment the Flink environment forwarded to the superclass
     * @param jobContext the job context forwarded to the superclass
     * @param list the transform plugin configurations forwarded to the superclass
     */
    public TransformExecuteProcessor(FlinkEnvironment flinkEnvironment, JobContext jobContext, List<? extends Config> list) {
        super(flinkEnvironment, jobContext, list);
    }

    /**
     * Instantiates, configures and prepares one transform plugin per configuration entry and
     * registers the jar paths of all discovered plugins with the Flink environment.
     *
     * <p>The returned list always has exactly one element per entry of {@code list}, in the
     * same order. {@link #execute(List)} pairs plugins and configurations by index, so the
     * result must not be de-duplicated or reordered.
     *
     * @param list the transform plugin configurations; each must contain a {@code plugin_name}
     * @return the prepared transform plugins, index-aligned with {@code list}
     */
    @Override
    protected List<FlinkStreamTransform> initializePlugins(List<? extends Config> list) {
        SeaTunnelFlinkTransformPluginDiscovery discovery = new SeaTunnelFlinkTransformPluginDiscovery();
        List<URL> pluginJars = new ArrayList<>();
        List<FlinkStreamTransform> transforms = new ArrayList<>(list.size());
        for (Config config : list) {
            PluginIdentifier identifier = PluginIdentifier.of("seatunnel", PLUGIN_TYPE, config.getString("plugin_name"));
            pluginJars.addAll(discovery.getPluginJarPaths(Lists.newArrayList(identifier)));
            FlinkStreamTransform transform = (FlinkStreamTransform) discovery.createPluginInstance(identifier);
            transform.setConfig(config);
            transform.prepare(this.flinkEnvironment);
            transforms.add(transform);
        }
        this.flinkEnvironment.registerPlugin(pluginJars);
        return transforms;
    }

    /**
     * Applies every transform plugin in order and collects the stream produced by each one.
     *
     * <p>Each transform reads from the stream returned by {@code fromSourceTable} for its
     * configuration when one is present, and otherwise from the output of the previous
     * transform (or the first upstream stream for the first transform).
     *
     * @param list the upstream data streams; only the first element is used as the default
     *     input, and the list is returned unchanged when no transform is configured
     * @return one output stream per transform, in plugin order
     * @throws TaskExecuteException if any step for a transform throws; the original exception
     *     is kept as the cause
     */
    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> list) throws TaskExecuteException {
        if (this.plugins.isEmpty()) {
            return list;
        }
        DataStream<Row> current = list.get(0);
        List<DataStream<Row>> results = new ArrayList<>(this.plugins.size());
        for (int i = 0; i < this.plugins.size(); i++) {
            FlinkStreamTransform transform = this.plugins.get(i);
            try {
                Config config = this.pluginConfigs.get(i);
                DataStream<Row> input = fromSourceTable(config).orElse(current);
                current = transform.processStream(this.flinkEnvironment, input);
                registerResultTable(config, current);
                transform.registerFunction(this.flinkEnvironment);
                results.add(current);
            } catch (Exception e) {
                throw new TaskExecuteException(String.format("SeaTunnel transform task: %s execute error", transform.getPluginName()), e);
            }
        }
        return results;
    }
}
