package org.apache.beam.runners.flink;

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Charsets;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkJobServerDriverTest.class */
/**
 * Tests for {@link FlinkJobServerDriver}: default configuration values, configuration parsed
 * from command-line arguments, construction from an existing configuration, and startup of the
 * driver in a background thread.
 */
public class FlinkJobServerDriverTest {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkJobServerDriverTest.class);

    /** Pause between two inspections of the captured stderr output while waiting for startup. */
    private static final long STARTUP_POLL_INTERVAL_MILLIS = 100L;

    /** Verifies the values reported by a freshly constructed server configuration. */
    @Test
    public void testConfigurationDefaults() {
        FlinkJobServerDriver.FlinkServerConfiguration config =
                new FlinkJobServerDriver.FlinkServerConfiguration();

        MatcherAssert.assertThat(config.getHost(), Is.is("localhost"));
        MatcherAssert.assertThat(config.getPort(), Is.is(8099));
        MatcherAssert.assertThat(config.getArtifactPort(), Is.is(8098));
        MatcherAssert.assertThat(config.getExpansionPort(), Is.is(8097));
        MatcherAssert.assertThat(config.getFlinkMasterUrl(), Is.is("[auto]"));
        MatcherAssert.assertThat(config.getSdkWorkerParallelism(), Is.is(1L));
        MatcherAssert.assertThat(config.isCleanArtifactsPerJob(), Is.is(false));

        // A driver must be constructible from the default configuration.
        MatcherAssert.assertThat(
                FlinkJobServerDriver.fromConfig(config),
                Is.is(CoreMatchers.not(CoreMatchers.nullValue())));
    }

    /**
     * Verifies that command-line arguments, given both in {@code --key=value} and in
     * {@code --key value} form, end up in the driver's configuration.
     */
    @Test
    public void testConfigurationFromArgs() {
        String[] args = {
            "--job-host=test",
            "--job-port", "42",
            "--artifact-port", "43",
            "--expansion-port", "44",
            "--flink-master-url=jobmanager",
            "--sdk-worker-parallelism=4",
            "--clean-artifacts-per-job"
        };

        FlinkJobServerDriver.FlinkServerConfiguration config =
                FlinkJobServerDriver.fromParams(args).configuration;

        MatcherAssert.assertThat(config.getHost(), Is.is("test"));
        MatcherAssert.assertThat(config.getPort(), Is.is(42));
        MatcherAssert.assertThat(config.getArtifactPort(), Is.is(43));
        MatcherAssert.assertThat(config.getExpansionPort(), Is.is(44));
        MatcherAssert.assertThat(config.getFlinkMasterUrl(), Is.is("jobmanager"));
        MatcherAssert.assertThat(config.getSdkWorkerParallelism(), Is.is(4L));
        MatcherAssert.assertThat(config.isCleanArtifactsPerJob(), Is.is(true));
    }

    /** Verifies that a driver built from a configuration object exposes that same configuration. */
    @Test
    public void testConfigurationFromConfig() {
        FlinkJobServerDriver.FlinkServerConfiguration config =
                new FlinkJobServerDriver.FlinkServerConfiguration();

        MatcherAssert.assertThat(FlinkJobServerDriver.fromConfig(config).configuration, Is.is(config));
    }

    /**
     * Runs the driver in a background thread and waits until the startup messages of the job,
     * artifact staging and expansion services show up on {@code System.err}.
     *
     * <p>{@code System.err} is redirected into an in-memory buffer for the duration of the test
     * and is always restored afterwards. The loop has no exit condition of its own; the JUnit
     * timeout bounds the wait.
     *
     * @throws Exception if the wait is interrupted or the driver cannot be set up or torn down
     */
    @Test(timeout = 30000)
    public void testJobServerDriver() throws Exception {
        FlinkJobServerDriver driver = null;
        Thread driverThread = null;
        final PrintStream originalErr = System.err;
        ByteArrayOutputStream capturedBytes = new ByteArrayOutputStream();
        // Encode with the same charset that is used for decoding below, instead of relying on
        // the platform default.
        PrintStream capturingErr = new PrintStream(capturedBytes, false, Charsets.UTF_8.name());
        try {
            System.setErr(capturingErr);
            // NOTE(review): port 0 presumably requests dynamically assigned ports - confirm.
            driver = FlinkJobServerDriver.fromParams(
                    new String[] {"--job-port=0", "--artifact-port=0", "--expansion-port=0"});
            driverThread = new Thread(driver);
            driverThread.start();

            while (true) {
                capturingErr.flush();
                if (allServicesStarted(capturedBytes.toString(Charsets.UTF_8.name()))) {
                    break;
                }
                Thread.sleep(STARTUP_POLL_INTERVAL_MILLIS);
            }

            MatcherAssert.assertThat(driverThread.isAlive(), Is.is(true));
        } finally {
            // Restore stderr first so that any failure is reported on the real stream.
            System.setErr(originalErr);
            if (driver != null) {
                driver.stop();
            }
            if (driverThread != null) {
                driverThread.interrupt();
                driverThread.join();
            }
        }
    }

    /**
     * Tells whether the captured output contains the startup message of every service.
     *
     * @param output everything written to the redirected {@code System.err} so far
     * @return {@code true} if all three startup markers are present
     */
    private static boolean allServicesStarted(String output) {
        return output.contains("JobService started on localhost:")
                && output.contains("ArtifactStagingService started on localhost:")
                && output.contains("ExpansionService started on localhost:");
    }
}
