package org.apache.seatunnel.engine.e2e;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import java.io.File;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.shade.com.google.common.base.Preconditions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/ClusterFaultToleranceIT.class */
public class ClusterFaultToleranceIT {
    private static final Logger log = LoggerFactory.getLogger(ClusterFaultToleranceIT.class);
    public static final String DYNAMIC_TEST_CASE_NAME = "dynamic_test_case_name";
    public static final String DYNAMIC_JOB_MODE = "dynamic_job_mode";
    public static final String DYNAMIC_TEST_ROW_NUM_PER_PARALLELISM = "dynamic_test_row_num_per_parallelism";
    public static final String DYNAMIC_TEST_PARALLELISM = "dynamic_test_parallelism";

    @Test
    public void testBatchJobRunOkIn2Node() throws ExecutionException, InterruptedException {
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRunOkIn2Node"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testBatchJobRunOkIn2Node", JobMode.BATCH, 1000L, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testBatchJobRunOkIn2Node");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRunOkIn2Node"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.FINISHED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            System.out.println(seaTunnelClient.getJobMetrics(Long.valueOf(execute.getJobId())));
            log.info("========================clean test resource====================");
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    private ImmutablePair<String, String> createTestResources(@NonNull String str, @NonNull JobMode jobMode, long j, int i) {
        if (str == null) {
            throw new NullPointerException("testCaseName is marked non-null but is null");
        }
        if (jobMode == null) {
            throw new NullPointerException("jobMode is marked non-null but is null");
        }
        Preconditions.checkArgument(j > 0, "rowNumber must greater than 0");
        Preconditions.checkArgument(i > 0, "parallelism must greater than 0");
        HashMap hashMap = new HashMap();
        hashMap.put("dynamic_test_case_name", str);
        hashMap.put("dynamic_job_mode", jobMode.toString());
        hashMap.put("dynamic_test_row_num_per_parallelism", String.valueOf(j));
        hashMap.put("dynamic_test_parallelism", String.valueOf(i));
        String replace = ("/tmp/hive/warehouse/" + str).replace("/", File.separator);
        FileUtils.createNewDir(replace);
        String str2 = File.separator + "tmp" + File.separator + "test_conf" + File.separator + str + ".conf";
        TestUtils.createTestConfigFileFromTemplate("cluster_batch_fake_to_localfile_template.conf", hashMap, str2);
        return new ImmutablePair<>(replace, str2);
    }

    @Test
    public void testStreamJobRunOkIn2Node() throws ExecutionException, InterruptedException {
        long j = 1000;
        int i = 6;
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRunOkIn2Node"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testStreamJobRunOkIn2Node", JobMode.STREAMING, 1000L, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testStreamJobRunOkIn2Node");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRunOkIn2Node"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(2L, TimeUnit.MINUTES).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && j * ((long) i) == FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue());
            });
            execute.cancelJob();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testBatchJobRestoreIn2NodeWorkerDown() throws ExecutionException, InterruptedException {
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            log.info("===================================All node is running==========================");
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testBatchJobRestoreIn2NodeWorkerDown", JobMode.BATCH, 1000L, 2);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testBatchJobRestoreIn2NodeWorkerDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeWorkerDown"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(180000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            log.info("=====================================shutdown node2=================================");
            hazelcastInstanceImpl2.shutdown();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.FINISHED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 2, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testStreamJobRestoreIn2NodeWorkerDown() throws ExecutionException, InterruptedException {
        long j = 1000;
        int i = 6;
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testStreamJobRestoreIn2NodeWorkerDown", JobMode.STREAMING, 1000L, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testStreamJobRestoreIn2NodeWorkerDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeWorkerDown"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            Thread.sleep(5000L);
            hazelcastInstanceImpl2.shutdown();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println(FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && j * ((long) i) == FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue());
            });
            Thread.sleep(10000L);
            execute.cancelJob();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testBatchJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException {
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testBatchJobRestoreIn2NodeMasterDown", JobMode.BATCH, 1000L, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testBatchJobRestoreIn2NodeMasterDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testBatchJobRestoreIn2NodeMasterDown"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            hazelcastInstanceImpl.shutdown();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.FINISHED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Test
    public void testStreamJobRestoreIn2NodeMasterDown() throws ExecutionException, InterruptedException {
        long j = 1000;
        int i = 6;
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testStreamJobRestoreIn2NodeMasterDown", JobMode.STREAMING, 1000L, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testStreamJobRestoreIn2NodeMasterDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_testStreamJobRestoreIn2NodeMasterDown"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            hazelcastInstanceImpl.shutdown();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && j * ((long) i) == FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue());
            });
            Thread.sleep(10000L);
            execute.cancelJob();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Disabled
    @Test
    public void testFor() throws ExecutionException, InterruptedException {
        for (int i = 0; i < 200; i++) {
            testStreamJobRestoreInAllNodeDown();
        }
    }

    @Test
    public void testStreamJobRestoreInAllNodeDown() throws ExecutionException, InterruptedException {
        String str = "ClusterFaultToleranceIT_testStreamJobRestoreInAllNodeDown_" + System.currentTimeMillis();
        int i = 1000;
        int i2 = 6;
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        try {
            Config loadFromString = Config.loadFromString("hazelcast:\n  cluster-name: seatunnel\n  network:\n    rest-api:\n      enabled: true\n      endpoint-groups:\n        CLUSTER_WRITE:\n          enabled: true\n    join:\n      tcp-ip:\n        enabled: true\n        member-list:\n          - localhost\n    port:\n      auto-increment: true\n      port-count: 100\n      port: 5801\n  map:\n    engine*:\n      map-store:\n        enabled: true\n        initial-mode: EAGER\n        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n        properties:\n          type: hdfs\n          namespace: /tmp/seatunnel/imap\n          clusterName: " + str + "\n          fs.defaultFS: file:///\n\n  properties:\n    hazelcast.invocation.max.retry.count: 200\n    hazelcast.tcp.join.port.try.count: 30\n    hazelcast.invocation.retry.pause.millis: 2000\n    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n    hazelcast.logging.type: log4j2\n    hazelcast.operation.generic.thread.count: 200\n");
            loadFromString.setClusterName(TestUtils.getClusterName(str));
            SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
            locateAndGetSeaTunnelConfig.setHazelcastConfig(loadFromString);
            HazelcastInstanceImpl createHazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            HazelcastInstanceImpl createHazelcastInstance2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, createHazelcastInstance.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testStreamJobRestoreInAllNodeDown", JobMode.STREAMING, 1000, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testStreamJobRestoreInAllNodeDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName(str));
            ClientJobProxy execute = new SeaTunnelClient(locateAndGetClientConfig).createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            Long valueOf = Long.valueOf(execute.getJobId());
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            Thread.sleep(5000L);
            createHazelcastInstance.shutdown();
            createHazelcastInstance2.shutdown();
            log.info("==========================================All node is done========================================");
            Thread.sleep(10000L);
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            log.info("==========================================All node is start, begin check node size ========================================");
            Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            log.info("==========================================All node is running========================================");
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy jobProxy = seaTunnelClient.createJobClient().getJobProxy(valueOf);
            jobProxy.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(jobProxy::waitForJobComplete);
            Thread.sleep(10000L);
            Awaitility.await().atMost(100000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                JobStatus jobStatus = null;
                try {
                    jobStatus = jobProxy.getJobStatus();
                } catch (Exception e) {
                    log.error(ExceptionUtils.getMessage(e));
                }
                Assertions.assertTrue(JobStatus.RUNNING.equals(jobStatus) && ((long) (i * i2)) == FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue());
            });
            Thread.sleep(10000L);
            log.info("==========================================Cancel Job========================================");
            jobProxy.cancelJob();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            log.info("==========================================Clean test resource ========================================");
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            log.info("==========================================Clean test resource ========================================");
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }

    @Disabled
    @Test
    public void testStreamJobRestoreFromOssInAllNodeDown() throws ExecutionException, InterruptedException {
        String str = "ClusterFaultToleranceIT_testStreamJobRestoreFromOssInAllNodeDown_" + System.currentTimeMillis();
        int i = 1000;
        int i2 = 6;
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        HazelcastInstanceImpl hazelcastInstanceImpl2 = null;
        SeaTunnelClient seaTunnelClient = null;
        try {
            Config loadFromString = Config.loadFromString("hazelcast:\n  cluster-name: seatunnel\n  network:\n    rest-api:\n      enabled: true\n      endpoint-groups:\n        CLUSTER_WRITE:\n          enabled: true\n    join:\n      tcp-ip:\n        enabled: true\n        member-list:\n          - localhost\n    port:\n      auto-increment: true\n      port-count: 100\n      port: 5801\n  map:\n    engine*:\n      map-store:\n        enabled: true\n        initial-mode: EAGER\n        factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory\n        properties:\n          type: hdfs\n          namespace: /seatunnel-test/imap\n          storage.type: oss\n          clusterName: " + str + "\n          oss.bucket: oss://your bucket name/\n          fs.oss.accessKeyId: oss accessKey id\n          fs.oss.accessKeySecret: oss accessKey secret\n          fs.oss.endpoint: your oss endpoint\n          fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider\n  properties:\n    hazelcast.invocation.max.retry.count: 200\n    hazelcast.tcp.join.port.try.count: 30\n    hazelcast.invocation.retry.pause.millis: 2000\n    hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n    hazelcast.logging.type: log4j2\n    hazelcast.operation.generic.thread.count: 200\n");
            loadFromString.setClusterName(TestUtils.getClusterName(str));
            SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
            locateAndGetSeaTunnelConfig.setHazelcastConfig(loadFromString);
            HazelcastInstanceImpl createHazelcastInstance = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            HazelcastInstanceImpl createHazelcastInstance2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, createHazelcastInstance.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources("testStreamJobRestoreFromOssInAllNodeDown", JobMode.STREAMING, 1000, 6);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName("testStreamJobRestoreFromOssInAllNodeDown");
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName(str));
            ClientJobProxy execute = new SeaTunnelClient(locateAndGetClientConfig).createExecutionContext((String) createTestResources.getRight(), jobConfig).execute();
            Long valueOf = Long.valueOf(execute.getJobId());
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                Assertions.assertTrue(JobStatus.RUNNING.equals(execute.getJobStatus()) && FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue() > 1);
            });
            Thread.sleep(5000L);
            createHazelcastInstance.shutdown();
            createHazelcastInstance2.shutdown();
            log.info("==========================================All node is done========================================");
            Thread.sleep(10000L);
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            hazelcastInstanceImpl2 = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            log.info("==========================================All node is start, begin check node size ========================================");
            Awaitility.await().atMost(60000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(2, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            log.info("==========================================All node is running========================================");
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy jobProxy = seaTunnelClient.createJobClient().getJobProxy(valueOf);
            jobProxy.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(jobProxy::waitForJobComplete);
            Thread.sleep(10000L);
            Awaitility.await().atMost(100000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                System.out.println("\n=================================" + FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()) + "=================================\n");
                JobStatus jobStatus = null;
                try {
                    jobStatus = jobProxy.getJobStatus();
                } catch (Exception e) {
                    log.error(ExceptionUtils.getMessage(e));
                }
                Assertions.assertTrue(JobStatus.RUNNING.equals(jobStatus) && ((long) (i * i2)) == FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()).longValue());
            });
            Thread.sleep(10000L);
            log.info("==========================================Cancel Job========================================");
            jobProxy.cancelJob();
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.CANCELED.equals(supplyAsync.get()));
            });
            Assertions.assertEquals(1000 * 6, FileUtils.getFileLineNumberFromDir((String) createTestResources.getLeft()));
            log.info("==========================================Clean test resource ========================================");
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
        } catch (Throwable th) {
            log.info("==========================================Clean test resource ========================================");
            if (seaTunnelClient != null) {
                seaTunnelClient.shutdown();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            if (hazelcastInstanceImpl2 != null) {
                hazelcastInstanceImpl2.shutdown();
            }
            throw th;
        }
    }
}
