package org.apache.seatunnel.core.starter.flink.execution;

import java.net.URL;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.core.starter.flink.config.FlinkCommon;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/core/starter/flink/execution/AbstractPluginExecuteProcessor.class */
public abstract class AbstractPluginExecuteProcessor<T> implements PluginExecuteProcessor {
    protected final FlinkEnvironment flinkEnvironment;
    protected final List<? extends Config> pluginConfigs;
    protected final JobContext jobContext;
    protected final List<T> plugins;
    protected static final String ENGINE_TYPE = "seatunnel";
    protected static final String PLUGIN_NAME = "plugin_name";
    protected final BiConsumer<ClassLoader, URL> addUrlToClassloader = FlinkCommon.ADD_URL_TO_CLASSLOADER;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractPluginExecuteProcessor(FlinkEnvironment flinkEnvironment, JobContext jobContext, List<? extends Config> list) {
        this.flinkEnvironment = flinkEnvironment;
        this.jobContext = jobContext;
        this.pluginConfigs = list;
        this.plugins = initializePlugins(list);
    }

    protected abstract List<T> initializePlugins(List<? extends Config> list);

    /* JADX INFO: Access modifiers changed from: protected */
    public void registerResultTable(Config config, DataStream<Row> dataStream) {
        if (config.hasPath(Plugin.RESULT_TABLE_NAME)) {
            String string = config.getString(Plugin.RESULT_TABLE_NAME);
            StreamTableEnvironment streamTableEnvironment = this.flinkEnvironment.getStreamTableEnvironment();
            if (TableUtil.tableExists(streamTableEnvironment, string)) {
                return;
            }
            if (config.hasPath("field_name")) {
                streamTableEnvironment.registerDataStream(string, dataStream, config.getString("field_name"));
            } else {
                streamTableEnvironment.registerDataStream(string, dataStream);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Optional<DataStream<Row>> fromSourceTable(Config config) {
        if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
            return Optional.empty();
        }
        StreamTableEnvironment streamTableEnvironment = this.flinkEnvironment.getStreamTableEnvironment();
        return Optional.ofNullable(TableUtil.tableToDataStream(streamTableEnvironment, streamTableEnvironment.scan(new String[]{config.getString(Plugin.SOURCE_TABLE_NAME)}), true));
    }
}
