package org.apache.seatunnel.engine.e2e;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.NonNull;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;

/* loaded from: input_file:org/apache/seatunnel/engine/e2e/TextHeaderIT.class */
public class TextHeaderIT {
    private static final Logger log = LoggerFactory.getLogger(TextHeaderIT.class);
    private String FILE_FORMAT_TYPE = "file_format_type";
    private String ENABLE_HEADER_WRITE = "enable_header_write";

    /* loaded from: input_file:org/apache/seatunnel/engine/e2e/TextHeaderIT$ContentHeader.class */
    static class ContentHeader {
        private String fileStyle;
        private String enableWriteHeader;
        private String headerName;

        public ContentHeader(String str, String str2, String str3) {
            this.fileStyle = str;
            this.enableWriteHeader = str2;
            this.headerName = str3;
        }

        public String getFileStyle() {
            return this.fileStyle;
        }

        public String getEnableWriteHeader() {
            return this.enableWriteHeader;
        }

        public String getHeaderName() {
            return this.headerName;
        }

        public void setFileStyle(String str) {
            this.fileStyle = str;
        }

        public void setEnableWriteHeader(String str) {
            this.enableWriteHeader = str;
        }

        public void setHeaderName(String str) {
            this.headerName = str;
        }
    }

    @Test
    public void testEnableWriteHeader() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ContentHeader("text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age"));
        arrayList.add(new ContentHeader("text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age"));
        arrayList.add(new ContentHeader("csv", "true", "name,age"));
        arrayList.add(new ContentHeader("csv", "false", "name,age"));
        arrayList.forEach(contentHeader -> {
            try {
                enableWriteHeader(contentHeader.getFileStyle(), contentHeader.getEnableWriteHeader(), contentHeader.getHeaderName());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }

    public void enableWriteHeader(String str, String str2, String str3) throws Exception {
        HazelcastInstanceImpl hazelcastInstanceImpl = null;
        SeaTunnelClient seaTunnelClient = null;
        SeaTunnelConfig locateAndGetSeaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        locateAndGetSeaTunnelConfig.getHazelcastConfig().setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_EnableWriteHeaderNode"));
        try {
            hazelcastInstanceImpl = SeaTunnelServerStarter.createHazelcastInstance(locateAndGetSeaTunnelConfig);
            Awaitility.await().atMost(10000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Assertions.assertEquals(1, hazelcastInstanceImpl.getCluster().getMembers().size());
            });
            Common.setDeployMode(DeployMode.CLIENT);
            ImmutablePair<String, String> createTestResources = createTestResources(str2, str);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(str2);
            ClientConfig locateAndGetClientConfig = ConfigProvider.locateAndGetClientConfig();
            locateAndGetClientConfig.setClusterName(TestUtils.getClusterName("ClusterFaultToleranceIT_EnableWriteHeaderNode"));
            seaTunnelClient = new SeaTunnelClient(locateAndGetClientConfig);
            ClientJobProxy execute = seaTunnelClient.createExecutionContext((String) createTestResources.getRight(), jobConfig, locateAndGetSeaTunnelConfig).execute();
            execute.getClass();
            CompletableFuture supplyAsync = CompletableFuture.supplyAsync(execute::waitForJobComplete);
            Awaitility.await().atMost(600000L, TimeUnit.MILLISECONDS).untilAsserted(() -> {
                Thread.sleep(2000L);
                Assertions.assertTrue(supplyAsync.isDone() && JobStatus.FINISHED.equals(supplyAsync.get()));
            });
            for (File file : new File((String) createTestResources.getLeft()).listFiles()) {
                String[] split = FileUtils.readFileToStr(file.toPath()).split((String) BaseSinkConfig.ROW_DELIMITER.defaultValue());
                if (str2.equals("true")) {
                    Assertions.assertEquals(str3, split[0]);
                } else {
                    Assertions.assertNotEquals(str3, split[0]);
                }
            }
            log.info("========================clean test resource====================");
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
        } catch (Throwable th) {
            if (seaTunnelClient != null) {
                seaTunnelClient.close();
            }
            if (hazelcastInstanceImpl != null) {
                hazelcastInstanceImpl.shutdown();
            }
            throw th;
        }
    }

    private ImmutablePair<String, String> createTestResources(@NonNull String str, @NonNull String str2) throws IOException {
        if (str == null) {
            throw new NullPointerException("headerWrite is marked non-null but is null");
        }
        if (str2 == null) {
            throw new NullPointerException("formatType is marked non-null but is null");
        }
        HashMap hashMap = new HashMap();
        hashMap.put(this.ENABLE_HEADER_WRITE, str);
        hashMap.put(this.FILE_FORMAT_TYPE, str2);
        String replace = "/tmp/text".replace("/", File.separator);
        FileUtils.createNewDir(replace);
        String str3 = File.separator + "tmp" + File.separator + "test_conf" + File.separator + str + ".conf";
        TestUtils.createTestConfigFileFromTemplate("batch_fakesource_to_file_header.conf", hashMap, str3);
        return new ImmutablePair<>(replace, str3);
    }
}
