package org.apache.seatunnel.core.sql.job;

import java.io.File;
import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
import org.apache.flink.table.factories.Factory;
import org.apache.flink.table.operations.CatalogSinkModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateTableOperation;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.ReflectionUtils;
import org.apache.seatunnel.core.sql.classloader.CustomClassLoader;
import org.apache.seatunnel.core.sql.splitter.SqlStatementSplitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/core/sql/job/Executor.class */
public class Executor {
    private static final String FLINK_SQL_SET_MATCHING_REGEX = "SET(\\s+(\\S+)\\s*=(.*))?";
    private static final int FLINK_SQL_SET_OPERANDS = 3;
    private static final String CONNECTOR_IDENTIFIER = "connector";
    private static final String SQL_CONNECTOR_PREFIX = "flink-sql";
    private static final String CONNECTOR_JAR_PREFIX = "flink-sql-connector-";
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) Executor.class);
    private static CustomClassLoader CLASSLOADER = new CustomClassLoader();

    private Executor() {
        throw new IllegalStateException("Utility class");
    }

    public static void runJob(JobInfo jobInfo) {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment, EnvironmentSettings.newInstance().inStreamingMode().build());
        try {
            Configuration configuration = (Configuration) ((Method) ((Optional) Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class, "getConfiguration", new Class[0]))).orElseThrow(() -> {
                return new RuntimeException("can't find method: getConfiguration");
            })).invoke(executionEnvironment, new Object[0]);
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            try {
                Thread.currentThread().setContextClassLoader(CLASSLOADER);
                handleStatements(jobInfo.getJobContent(), create, configuration).execute();
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            } catch (Throwable th) {
                Thread.currentThread().setContextClassLoader(contextClassLoader);
                throw th;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static StatementSet handleStatements(String str, StreamTableEnvironment streamTableEnvironment, Configuration configuration) {
        StatementSet createStatementSet = streamTableEnvironment.createStatementSet();
        TableEnvironmentImpl tableEnvironmentImpl = (TableEnvironmentImpl) streamTableEnvironment;
        Configuration configuration2 = streamTableEnvironment.getConfig().getConfiguration();
        for (String str2 : SqlStatementSplitter.normalizeStatements(str)) {
            Optional<Pair<String, String>> parseSetOperation = parseSetOperation(str2);
            if (parseSetOperation.isPresent()) {
                Pair<String, String> pair = parseSetOperation.get();
                callSetOperation(configuration2, pair.getLeft(), pair.getRight());
            } else {
                CreateTableOperation createTableOperation = (Operation) tableEnvironmentImpl.getParser().parse(str2).get(0);
                if (createTableOperation instanceof CatalogSinkModifyOperation) {
                    createStatementSet.addInsertSql(str2);
                } else {
                    if (createTableOperation instanceof CreateTableOperation) {
                        loadConnector((String) createTableOperation.getCatalogTable().getOptions().get(CONNECTOR_IDENTIFIER), configuration);
                    }
                    streamTableEnvironment.executeSql(str2);
                }
            }
        }
        return createStatementSet;
    }

    private static void loadConnector(String str, Configuration configuration) {
        Iterator it = ServiceLoader.load(Factory.class, CLASSLOADER).iterator();
        while (it.hasNext()) {
            if (((Factory) it.next()).factoryIdentifier().equals(str)) {
                return;
            }
        }
        Common.setDeployMode(DeployMode.CLIENT.getName());
        File file = Common.connectorJarDir(SQL_CONNECTOR_PREFIX).toFile();
        if (!file.exists() || file.listFiles() == null) {
            return;
        }
        List list = (List) Arrays.stream(file.listFiles()).filter(file2 -> {
            return file2.getName().startsWith(CONNECTOR_JAR_PREFIX + str);
        }).collect(Collectors.toList());
        if (list.size() > 1) {
            LOGGER.warn("Found more than one connector jars for {}. Only the first one will be loaded.", str);
        }
        File file3 = list.size() >= 1 ? (File) list.get(0) : null;
        if (file3 != null) {
            CLASSLOADER.addJar(file3.toPath());
            List list2 = (List) configuration.get(PipelineOptions.JARS);
            List arrayList = list2 == null ? new ArrayList() : list2;
            List list3 = (List) configuration.get(PipelineOptions.CLASSPATHS);
            List arrayList2 = list3 == null ? new ArrayList() : list3;
            try {
                String url = file3.toPath().toUri().toURL().toString();
                arrayList.add(url);
                arrayList2.add(url);
                configuration.set(PipelineOptions.JARS, arrayList);
                configuration.set(PipelineOptions.CLASSPATHS, arrayList2);
            } catch (MalformedURLException e) {
                LOGGER.error("Failed to load connector {}. Connector file: {}", str, file3.getAbsolutePath());
            }
        }
    }

    @VisibleForTesting
    static Optional<Pair<String, String>> parseSetOperation(String str) {
        Matcher matcher = Pattern.compile(FLINK_SQL_SET_MATCHING_REGEX, 34).matcher(str.trim());
        if (!matcher.matches()) {
            return Optional.empty();
        }
        String[] strArr = new String[matcher.groupCount()];
        for (int i = 0; i < strArr.length; i++) {
            strArr[i] = matcher.group(i + 1);
        }
        return operandConverter(strArr);
    }

    private static Optional<Pair<String, String>> operandConverter(String[] strArr) {
        return strArr.length != 3 ? Optional.empty() : Optional.of(Pair.of(strArr[1].trim(), strArr[2].trim()));
    }

    private static void callSetOperation(Configuration configuration, String str, String str2) {
        if (StringUtils.isEmpty(str)) {
            new IllegalArgumentException("key can not be empty!");
        }
        if (StringUtils.isEmpty(str2)) {
            new IllegalArgumentException("value can not be empty!");
        }
        configuration.setString(str, str2);
    }
}
