package org.apache.seatunnel.engine.e2e;

import com.hazelcast.client.config.ClientConfig;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/JobExecutionIT.class */
public class JobExecutionIT {
    private static final Logger log = LoggerFactory.getLogger(JobExecutionIT.class);

    @BeforeAll
    public static void beforeClass() throws Exception {
        SeaTunnelServerStarter.createHazelcastInstance(TestUtils.getClusterName("JobExecutionIT"));
    }

    @Test
    public void testSayHello() {
        ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
        locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
        Assertions.assertEquals("Hello world", new SeaTunnelClient(locateAndGetClientConfig).printMessageToMaster("Hello world"));
    }

    @Test
    public void testExecuteJob() throws Exception {
        Common.setDeployMode(DeployMode.CLIENT);
        String resource = TestUtils.getResource("batch_fakesource_to_file.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("fake_to_file");
        ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
        locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
        ClientJobProxy execute = new SeaTunnelClient(locateAndGetClientConfig).createExecutionContext(resource, jobConfig).execute();
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
            return execute.waitForJobComplete();
        });
        Awaitility.await().atMost(20000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assertions.assertTrue(supplyAsync.isDone() && JobStatus.FINISHED.equals(supplyAsync.get()));
        });
    }

    @Test
    public void cancelJobTest() throws Exception {
        Common.setDeployMode(DeployMode.CLIENT);
        String resource = TestUtils.getResource("streaming_fakesource_to_file_complex.conf");
        JobConfig jobConfig = new JobConfig();
        jobConfig.setName("fake_to_file");
        ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
        locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
        ClientJobProxy execute = new SeaTunnelClient(locateAndGetClientConfig).createExecutionContext(resource, jobConfig).execute();
        Assertions.assertFalse(execute.getJobStatus().isEndState());
        CompletableFuture supplyAsync = CompletableFuture.supplyAsync(() -> {
            return execute.waitForJobComplete();
        });
        Thread.sleep(1000L);
        execute.cancelJob();
        Awaitility.await().atMost(20000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
            Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
        });
    }
}
