package org.apache.seatunnel.flink.transform;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.batch.FlinkBatchTransform;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.flink.util.TableUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;

/* loaded from: input_file:org/apache/seatunnel/flink/transform/Sql.class */
public class Sql implements FlinkStreamTransform, FlinkBatchTransform {
    private String sql;
    private Config config;
    private static final String SQL = "sql";

    @Override // org.apache.seatunnel.flink.stream.FlinkStreamTransform
    public DataStream<Row> processStream(FlinkEnvironment flinkEnvironment, DataStream<Row> dataStream) throws Exception {
        StreamTableEnvironment streamTableEnvironment = flinkEnvironment.getStreamTableEnvironment();
        try {
            return TableUtil.tableToDataStream(streamTableEnvironment, streamTableEnvironment.sqlQuery(this.sql), false);
        } catch (Exception e) {
            throw new Exception("Flink streaming transform sql execute failed, SQL: " + this.sql, e);
        }
    }

    @Override // org.apache.seatunnel.flink.batch.FlinkBatchTransform
    public DataSet<Row> processBatch(FlinkEnvironment flinkEnvironment, DataSet<Row> dataSet) throws Exception {
        BatchTableEnvironment batchTableEnvironment = flinkEnvironment.getBatchTableEnvironment();
        try {
            return TableUtil.tableToDataSet(batchTableEnvironment, batchTableEnvironment.sqlQuery(this.sql));
        } catch (Exception e) {
            throw new Exception("Flink batch transform sql execute failed, SQL: " + this.sql, e);
        }
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public void setConfig(Config config) {
        this.config = config;
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public Config getConfig() {
        return this.config;
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public CheckResult checkConfig() {
        return CheckConfigUtil.checkAllExists(this.config, SQL);
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public void prepare(FlinkEnvironment flinkEnvironment) {
        this.sql = this.config.getString(SQL);
    }

    @Override // org.apache.seatunnel.apis.base.plugin.Plugin
    public String getPluginName() {
        return SQL;
    }
}
