package org.apache.seatunnel.engine.e2e.classloader;

import io.restassured.RestAssured;
import io.restassured.response.Response;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.class */
public abstract class ClassLoaderITBase extends SeaTunnelContainer {
    private static final String CONF_FILE = "/classloader/fake_to_inmemory.conf";
    private static final String http = "http://";
    private static final String colon = ":";
    private static final Path config = Paths.get("/tmp/seatunnel/", "config");
    private static final Path binPath = Paths.get("/tmp/seatunnel/", "bin", "seatunnel-cluster.sh");

    abstract boolean cacheMode();

    abstract String seatunnelConfigFileName();

    @Test
    public void testFakeSourceToInMemorySink() throws IOException, InterruptedException {
        LOG.info("test classloader with cache mode: {}", Boolean.valueOf(cacheMode()));
        for (int i = 0; i < 10; i++) {
            Assertions.assertEquals(0, executeJob(this.server, CONF_FILE).getExitCode());
            Assertions.assertTrue(containsDaemonThread());
            if (cacheMode()) {
                Assertions.assertTrue(3 >= getClassLoaderCount());
            } else {
                Assertions.assertTrue(2 + i >= getClassLoaderCount());
            }
        }
    }

    @Test
    public void testFakeSourceToInMemorySinkForRestApi() throws IOException, InterruptedException {
        LOG.info("test classloader with cache mode: {}", Boolean.valueOf(cacheMode()));
        ContainerUtil.copyConnectorJarToContainer(this.server, CONF_FILE, getConnectorModulePath(), getConnectorNamePrefix(), getConnectorType(), "/tmp/seatunnel/");
        Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
            Response response = RestAssured.given().get(http + this.server.getHost() + colon + this.server.getFirstMappedPort() + "/hazelcast/rest/cluster", new Object[0]);
            response.then().statusCode(200);
            Thread.sleep(10000L);
            Assertions.assertEquals(1, response.jsonPath().getList("members").size());
        });
        for (int i = 0; i < 10; i++) {
            RestAssured.given().body("{\n\t\"env\": {\n\t\t\"parallelism\": 10,\n\t\t\"job.mode\": \"BATCH\"\n\t},\n\t\"source\": [\n\t\t{\n\t\t\t\"plugin_name\": \"FakeSource\",\n\t\t\t\"result_table_name\": \"fake\",\n\t\t\t\"parallelism\": 10,\n\t\t\t\"schema\": {\n\t\t\t\t\"fields\": {\n\t\t\t\t\t\"name\": \"string\",\n\t\t\t\t\t\"age\": \"int\",\n\t\t\t\t\t\"score\": \"double\"\n\t\t\t\t}\n\t\t\t}\n\t\t}\n\t],\n\t\"transform\": [],\n\t\"sink\": [\n\t\t{\n\t\t\t\"plugin_name\": \"InMemory\",\n\t\t\t\"source_table_name\": \"fake\"\n\t\t}\n\t]\n}").header("Content-Type", "application/json; charset=utf-8", new Object[0]).post(http + this.server.getHost() + colon + this.server.getFirstMappedPort() + "/hazelcast/rest/maps/submit-job", new Object[0]).then().statusCode(200);
            Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
                RestAssured.given().get(http + this.server.getHost() + colon + this.server.getFirstMappedPort() + "/hazelcast/rest/maps/finished-jobs/FINISHED", new Object[0]).then().statusCode(200).body("[0].jobStatus", Matchers.equalTo("FINISHED"), new Object[0]);
            });
            Thread.sleep(5000L);
            Assertions.assertTrue(containsDaemonThread());
            if (cacheMode()) {
                Assertions.assertTrue(3 >= getClassLoaderCount());
            } else {
                Assertions.assertTrue(2 + i >= getClassLoaderCount());
            }
        }
    }

    private int getClassLoaderCount() throws IOException, InterruptedException {
        return ((Integer) ContainerUtil.getJVMLiveObject(this.server).getOrDefault("org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader", 0)).intValue();
    }

    private boolean containsDaemonThread() throws IOException, InterruptedException {
        return ContainerUtil.getJVMThreadNames(this.server).stream().anyMatch(str -> {
            return str.contains("InMemorySinkWriter-daemon-thread");
        });
    }

    @Override // org.apache.seatunnel.engine.e2e.SeaTunnelContainer
    @BeforeEach
    public void startUp() throws Exception {
        this.server = createSeaTunnelContainerWithFakeSourceAndInMemorySink(ContainerUtil.PROJECT_ROOT_PATH + "/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/" + seatunnelConfigFileName());
    }

    @Override // org.apache.seatunnel.engine.e2e.SeaTunnelContainer
    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
    }
}
