package org.apache.seatunnel.engine.e2e;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import io.restassured.RestAssured;
import io.restassured.response.Response;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/RestApiIT.class */
/**
 * Integration test for the REST endpoints served under {@code /hazelcast/rest/maps/}.
 *
 * <p>{@link #beforeClass()} starts one server instance, submits the streaming job
 * {@code fake_to_file} through a {@link SeaTunnelClient} and waits until that job is
 * {@code RUNNING}. The test methods then query the REST endpoints of that instance.
 */
public class RestApiIT {
    private static final Logger log = LoggerFactory.getLogger(RestApiIT.class);
    private static final String HOST = "http://localhost:";
    private static final String REST_ROOT = "/hazelcast/rest/maps/";
    private static final String STREAMING_JOB_NAME = "fake_to_file";

    /** JSON job definition posted by {@link #testSubmitJob()}. */
    private static final String SUBMIT_JOB_REQUEST_BODY =
            "{\n"
                    + "    \"env\": {\n"
                    + "        \"job.mode\": \"batch\"\n"
                    + "    },\n"
                    + "    \"source\": [\n"
                    + "        {\n"
                    + "            \"plugin_name\": \"FakeSource\",\n"
                    + "            \"result_table_name\": \"fake\",\n"
                    + "            \"row.num\": 100,\n"
                    + "            \"schema\": {\n"
                    + "                \"fields\": {\n"
                    + "                    \"name\": \"string\",\n"
                    + "                    \"age\": \"int\",\n"
                    + "                    \"card\": \"int\"\n"
                    + "                }\n"
                    + "            }\n"
                    + "        }\n"
                    + "    ],\n"
                    + "    \"transform\": [\n"
                    + "    ],\n"
                    + "    \"sink\": [\n"
                    + "        {\n"
                    + "            \"plugin_name\": \"Console\",\n"
                    + "            \"source_table_name\": [\"fake\"]\n"
                    + "        }\n"
                    + "    ]\n"
                    + "}";

    private static ClientJobProxy clientJobProxy;
    private static HazelcastInstanceImpl hazelcastInstance;
    // Kept in a field so afterClass() can release the client; it was previously leaked.
    private static SeaTunnelClient engineClient;

    /**
     * Starts the server instance, submits the streaming job and blocks (up to two minutes)
     * until the job reports {@link JobStatus#RUNNING}.
     *
     * @throws Exception if the server cannot be started or the job cannot be submitted
     */
    @BeforeAll
    static void beforeClass() throws Exception {
        String clusterName = TestUtils.getClusterName("RestApiIT");

        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
        hazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);

        Common.setDeployMode(DeployMode.CLIENT);
        String jobConfigFile = TestUtils.getResource("stream_fakesource_to_file.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName(STREAMING_JOB_NAME);

        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
        clientConfig.setClusterName(clusterName);
        engineClient = new SeaTunnelClient(clientConfig);
        clientJobProxy = engineClient.createExecutionContext(jobConfigFile, jobConfig).execute();

        Awaitility.await()
                .atMost(2L, TimeUnit.MINUTES)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.RUNNING, clientJobProxy.getJobStatus()));
    }

    /** Looks up the streaming job by id and checks the reported name and status. */
    @Test
    public void testGetRunningJobById() {
        RestAssured.given()
                .get(restUrl("running-job/" + clientJobProxy.getJobId()))
                .then()
                .statusCode(200)
                .body("jobName", Matchers.equalTo(STREAMING_JOB_NAME))
                .body("jobStatus", Matchers.equalTo("RUNNING"));
    }

    /** Lists running jobs and checks that the first entry is the streaming job. */
    @Test
    public void testGetRunningJobs() {
        RestAssured.given()
                .get(restUrl("running-jobs"))
                .then()
                .statusCode(200)
                .body("[0].jobName", Matchers.equalTo(STREAMING_JOB_NAME))
                .body("[0].jobStatus", Matchers.equalTo("RUNNING"));
    }

    /** Checks that the monitoring endpoint answers with HTTP 200 in under five seconds. */
    @Test
    public void testSystemMonitoringInformation() {
        RestAssured.given()
                .get(restUrl("system-monitoring-information"))
                .then()
                .assertThat()
                .time(Matchers.lessThan(5000L))
                .statusCode(200);
    }

    /**
     * Submits a batch job through the REST endpoint and waits (up to two minutes) for the
     * coordinator to report it as {@link JobStatus#FINISHED}.
     */
    @Test
    public void testSubmitJob() {
        Response response =
                RestAssured.given()
                        .body(SUBMIT_JOB_REQUEST_BODY)
                        .post(
                                restUrl(
                                        "submit-job?jobId=1&jobName=test&isStartWithSavePoint=false"));
        // The job id is read back from the response below rather than asserted against the
        // request parameter; only the job name is compared here.
        response.then().statusCode(200).body("jobName", Matchers.equalTo("test"));

        long jobId = Long.parseLong(response.getBody().jsonPath().getString("jobId"));
        SeaTunnelServer seaTunnelServer =
                (SeaTunnelServer)
                        hazelcastInstance
                                .node
                                .getNodeExtension()
                                .createExtensionServices()
                                .get("st:impl:seaTunnelServer");

        // NOTE(review): this is a point-in-time check on an asynchronously running job and
        // looks timing-sensitive - confirm the status cannot be anything but RUNNING here.
        Assertions.assertEquals(
                JobStatus.RUNNING, seaTunnelServer.getCoordinatorService().getJobStatus(jobId));

        Awaitility.await()
                .atMost(2L, TimeUnit.MINUTES)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        JobStatus.FINISHED,
                                        seaTunnelServer
                                                .getCoordinatorService()
                                                .getJobStatus(jobId)));
    }

    /**
     * Closes the client first, then shuts down the server instance. The server is shut down
     * even when closing the client throws.
     */
    @AfterAll
    static void afterClass() {
        try {
            if (engineClient != null) {
                engineClient.close();
            }
        } finally {
            if (hazelcastInstance != null) {
                hazelcastInstance.shutdown();
            }
        }
    }

    /** Builds the absolute URL of a REST endpoint on the local member of the test instance. */
    private static String restUrl(String endpoint) {
        int port = hazelcastInstance.getCluster().getLocalMember().getAddress().getPort();
        return HOST + port + REST_ROOT + endpoint;
    }
}
