package org.apache.seatunnel.core.starter.flink.execution;

import com.google.common.collect.Lists;
import java.net.URL;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction;
import org.apache.seatunnel.translation.flink.source.SeaTunnelCoordinatedSource;
import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.class */
/**
 * Execute processor for the "source" stage of a job on Flink.
 *
 * <p>{@link #initializePlugins(List)} instantiates one {@link SeaTunnelSource} per source
 * configuration, and {@link #execute(List)} wraps each of them into a Flink
 * {@link DataStreamSource} and registers it through {@code registerResultTable}.
 */
public class SourceExecuteProcessor extends AbstractPluginExecuteProcessor<SeaTunnelSource> {
    /** Plugin-type segment of the {@link PluginIdentifier} built for every source plugin. */
    private static final String PLUGIN_TYPE = "source";

    /**
     * Creates the processor; all arguments are forwarded unchanged to the superclass.
     *
     * @param flinkEnvironment Flink environment wrapper
     * @param jobContext context of the job being built
     * @param sourceConfigs one configuration per source plugin
     */
    public SourceExecuteProcessor(FlinkEnvironment flinkEnvironment, JobContext jobContext, List<? extends Config> sourceConfigs) {
        super(flinkEnvironment, jobContext, sourceConfigs);
    }

    /**
     * Adds every initialized source plugin to the stream execution environment.
     *
     * @param upstreamDataStreams not read by this implementation
     * @return one data stream per source plugin, in plugin order
     */
    @Override
    public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
        StreamExecutionEnvironment environment = this.flinkEnvironment.getStreamExecutionEnvironment();
        List<DataStream<Row>> sourceStreams = new ArrayList<>();
        for (int i = 0; i < this.plugins.size(); i++) {
            SeaTunnelSource source = this.plugins.get(i);

            // Sources that implement SupportCoordinate get the coordinated wrapper;
            // every other source gets the parallel wrapper.
            BaseSeaTunnelSourceFunction sourceFunction;
            if (source instanceof SupportCoordinate) {
                sourceFunction = new SeaTunnelCoordinatedSource(source);
            } else {
                sourceFunction = new SeaTunnelParallelSource(source);
            }

            String sourceName = "SeaTunnel " + source.getClass().getSimpleName();
            boolean bounded = source.getBoundedness() == Boundedness.BOUNDED;
            DataStreamSource<Row> sourceStream = addSource(environment, sourceFunction, sourceName, bounded);

            // plugins and pluginConfigs are indexed in parallel.
            registerResultTable(this.pluginConfigs.get(i), sourceStream);
            sourceStreams.add(sourceStream);
        }
        return sourceStreams;
    }

    /**
     * Builds a {@link DataStreamSource} around the given source function.
     *
     * @param environment environment the source is attached to
     * @param sourceFunction translated SeaTunnel source function; must not be null
     * @param sourceName operator name of the source; must not be null
     * @param bounded {@code true} to declare the source as bounded, {@code false} for
     *     continuous unbounded
     * @return the created data stream source
     * @throws NullPointerException if {@code sourceFunction} or {@code sourceName} is null
     */
    private DataStreamSource<Row> addSource(StreamExecutionEnvironment environment, BaseSeaTunnelSourceFunction sourceFunction, String sourceName, boolean bounded) {
        Preconditions.checkNotNull(sourceFunction, "source function must not be null");
        Preconditions.checkNotNull(sourceName, "source name must not be null");
        // 'bounded' is a primitive and therefore can never be null; no check is needed.

        TypeInformation<Row> producedType = sourceFunction.getProducedType();
        boolean parallel = sourceFunction instanceof ParallelSourceFunction;
        environment.clean(sourceFunction);

        StreamSource<Row, ?> sourceOperator = new StreamSource<>(sourceFunction);
        // Fully qualified: the SeaTunnel Boundedness enum is the one imported by this file.
        org.apache.flink.api.connector.source.Boundedness flinkBoundedness = bounded
                ? org.apache.flink.api.connector.source.Boundedness.BOUNDED
                : org.apache.flink.api.connector.source.Boundedness.CONTINUOUS_UNBOUNDED;
        return new DataStreamSource<>(environment, producedType, sourceOperator, parallel, sourceName, flinkBoundedness);
    }

    /**
     * Discovers, instantiates and prepares one source plugin per configuration, then
     * registers the collected plugin jar paths with the Flink environment.
     *
     * @param pluginConfigs source configurations; each must provide {@code plugin_name}
     * @return the prepared source plugins, in configuration order
     * @throws UnsupportedOperationException if the job mode is {@link JobMode#BATCH} and a
     *     source reports {@link Boundedness#UNBOUNDED}
     */
    @Override
    protected List<SeaTunnelSource> initializePlugins(List<? extends Config> pluginConfigs) {
        SeaTunnelSourcePluginDiscovery pluginDiscovery = new SeaTunnelSourcePluginDiscovery(this.addUrlToClassloader);
        List<SeaTunnelSource> sources = new ArrayList<>();
        // A set, so a jar shared by several sources is registered only once.
        Set<URL> pluginJars = new HashSet<>();
        for (Config sourceConfig : pluginConfigs) {
            PluginIdentifier identifier = PluginIdentifier.of("seatunnel", PLUGIN_TYPE, sourceConfig.getString("plugin_name"));
            pluginJars.addAll(pluginDiscovery.getPluginJarPaths(Lists.newArrayList(identifier)));

            SeaTunnelSource source = pluginDiscovery.createPluginInstance(identifier);
            source.prepare(sourceConfig);
            source.setJobContext(this.jobContext);

            // Checked after prepare(): boundedness is queried on the prepared instance.
            boolean batchJob = this.jobContext.getJobMode() == JobMode.BATCH;
            if (batchJob && source.getBoundedness() == Boundedness.UNBOUNDED) {
                throw new UnsupportedOperationException(String.format("'%s' source don't support off-line job.", source.getPluginName()));
            }
            sources.add(source);
        }
        this.flinkEnvironment.registerPlugin(new ArrayList<>(pluginJars));
        return sources;
    }
}
