package org.apache.seatunnel.flink.batch;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/flink/batch/FlinkBatchExecution.class */
public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBatchTransform, FlinkBatchSink, FlinkEnvironment> {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) FlinkBatchExecution.class);
    private Config config;
    private final FlinkEnvironment flinkEnvironment;

    public FlinkBatchExecution(FlinkEnvironment flinkEnvironment) {
        this.flinkEnvironment = flinkEnvironment;
    }

    @Override // org.apache.seatunnel.env.Execution
    public void start(List<FlinkBatchSource> list, List<FlinkBatchTransform> list2, List<FlinkBatchSink> list3) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (FlinkBatchSource flinkBatchSource : list) {
            DataSet<Row> data = flinkBatchSource.getData(this.flinkEnvironment);
            arrayList.add(data);
            registerResultTable(flinkBatchSource.getConfig(), data);
        }
        DataSet<Row> dataSet = (DataSet) arrayList.get(0);
        for (FlinkBatchTransform flinkBatchTransform : list2) {
            dataSet = flinkBatchTransform.processBatch(this.flinkEnvironment, fromSourceTable(flinkBatchTransform.getConfig()).orElse(dataSet));
            registerResultTable(flinkBatchTransform.getConfig(), dataSet);
            flinkBatchTransform.registerFunction(this.flinkEnvironment);
        }
        for (FlinkBatchSink flinkBatchSink : list3) {
            flinkBatchSink.outputBatch(this.flinkEnvironment, fromSourceTable(flinkBatchSink.getConfig()).orElse(dataSet));
        }
        if (whetherExecute(list3)) {
            try {
                LOGGER.info("Flink Execution Plan:{}", this.flinkEnvironment.getBatchEnvironment().getExecutionPlan());
                LOGGER.info(this.flinkEnvironment.getBatchEnvironment().execute(this.flinkEnvironment.getJobName()).toString());
            } catch (Exception e) {
                LOGGER.warn("Flink with job name [{}] execute failed", this.flinkEnvironment.getJobName());
                throw e;
            }
        }
    }

    private void registerResultTable(Config config, DataSet<Row> dataSet) {
        if (config.hasPath(Plugin.RESULT_TABLE_NAME)) {
            String string = config.getString(Plugin.RESULT_TABLE_NAME);
            BatchTableEnvironment batchTableEnvironment = this.flinkEnvironment.getBatchTableEnvironment();
            if (TableUtil.tableExists(batchTableEnvironment, string)) {
                return;
            }
            if (config.hasPath("field_name")) {
                batchTableEnvironment.registerDataSet(string, dataSet, config.getString("field_name"));
            } else {
                batchTableEnvironment.registerDataSet(string, dataSet);
            }
        }
    }

    private Optional<DataSet<Row>> fromSourceTable(Config config) {
        if (!config.hasPath(Plugin.SOURCE_TABLE_NAME)) {
            return Optional.empty();
        }
        BatchTableEnvironment batchTableEnvironment = this.flinkEnvironment.getBatchTableEnvironment();
        return Optional.ofNullable(TableUtil.tableToDataSet(batchTableEnvironment, batchTableEnvironment.scan(new String[]{config.getString(Plugin.SOURCE_TABLE_NAME)})));
    }

    @Override // org.apache.seatunnel.plugin.Plugin
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override // org.apache.seatunnel.plugin.Plugin
    public Config getConfig() {
        return this.config;
    }

    private boolean whetherExecute(List<FlinkBatchSink> list) {
        return list.stream().anyMatch(flinkBatchSink -> {
            return !"ConsoleSink".equals(flinkBatchSink.getPluginName());
        });
    }
}
