package org.apache.beam.runners.flink;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.flink.FlinkPortableRunnerResult;
import org.apache.beam.runners.fnexecution.environment.ProcessManager;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvoker;
import org.apache.beam.runners.fnexecution.jobsubmission.JobServerDriver;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineResult;
import org.apache.beam.runners.fnexecution.jobsubmission.PortablePipelineRunner;
import org.apache.beam.runners.fnexecution.provisioning.JobInfo;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.flink.api.common.time.Deadline;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.class */
/**
 * Command line entry point that starts an in-process {@link FlinkJobServerDriver}, launches an
 * external driver program pointed at that job service, and then runs the pipeline which the
 * driver program submits.
 *
 * <p>The flow implemented by {@link #main(String[])} is:
 *
 * <ol>
 *   <li>start the job service on a background thread and wait for its URL to become available;
 *   <li>run the driver command through {@code bash -c} with {@code --job_endpoint=<url>} appended;
 *   <li>wait for the driver to hand a pipeline over to {@link DetachedJobInvokerFactory} and run
 *       that pipeline with the runner the job invoker originally supplied;
 *   <li>stop the job service, whether or not the previous steps succeeded.
 * </ol>
 */
public class FlinkPortableClientEntryPoint {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);

    /** Flag appended to the driver command; the command itself must not already contain it. */
    private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";

    /** Maximum time to wait for the driver program to submit a pipeline. */
    private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(30);

    /** Maximum time to wait for the job service to publish its URL. */
    private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(30);

    /** Poll interval, in milliseconds, used while waiting for the job service URL. */
    private static final long JOB_SERVICE_POLL_INTERVAL_MILLIS = 500L;

    private final String driverCmd;
    private FlinkJobServerDriver jobServer;
    private Thread jobServerThread;
    private DetachedJobInvokerFactory jobInvokerFactory;

    // Passed to the job server as --job-port. NOTE(review): 0 presumably lets the server pick a
    // free port, like the --artifact-port=0 / --expansion-port=0 arguments; confirm in the driver.
    private int jobPort = 0;

    /**
     * Job invoker factory that intercepts the pipeline submitted through the job service instead
     * of letting the job service run it.
     *
     * <p>{@link #create()} returns a {@link FlinkJobInvoker} which remembers the runner it was
     * given and substitutes a hand-over runner. The hand-over runner only records the pipeline and
     * job info and releases a latch; {@link #executeDetachedJob()} waits for that latch and then
     * runs the recorded pipeline with the remembered runner on the calling thread.
     */
    public class DetachedJobInvokerFactory implements JobServerDriver.JobInvokerFactory {
        /** Released once the hand-over runner has recorded a submitted pipeline. */
        private final CountDownLatch latch;

        /** Runner the job invoker would have used; captured in {@code createJobInvocation}. */
        private volatile PortablePipelineRunner actualPipelineRunner;

        private volatile RunnerApi.Pipeline pipeline;
        private volatile JobInfo jobInfo;

        /** Runner passed to the job invocation in place of the actual one. */
        private final PortablePipelineRunner handoverPipelineRunner;

        private DetachedJobInvokerFactory() {
            this.latch = new CountDownLatch(1);
            this.handoverPipelineRunner = new PortablePipelineRunner() {
                @Override
                public PortablePipelineResult run(RunnerApi.Pipeline pipeline, JobInfo jobInfo) {
                    // Record the submission before releasing the latch so that the thread blocked
                    // in executeDetachedJob() observes both values once await() returns.
                    DetachedJobInvokerFactory.this.pipeline = pipeline;
                    DetachedJobInvokerFactory.this.jobInfo = jobInfo;
                    LOG.info("Pipeline execution handover for {}", jobInfo.jobId());
                    DetachedJobInvokerFactory.this.latch.countDown();
                    return new FlinkPortableRunnerResult.Detached();
                }
            };
        }

        /**
         * Creates a {@link FlinkJobInvoker}, configured from the enclosing entry point's job
         * server, whose job invocations use the hand-over runner.
         *
         * @return the intercepting job invoker
         */
        @Override
        public JobInvoker create() {
            return new FlinkJobInvoker((FlinkJobServerDriver.FlinkServerConfiguration) FlinkPortableClientEntryPoint.this.jobServer.configuration) {
                @Override
                public JobInvocation createJobInvocation(String str, String str2, ListeningExecutorService listeningExecutorService, RunnerApi.Pipeline pipeline, FlinkPipelineOptions flinkPipelineOptions, PortablePipelineRunner portablePipelineRunner) {
                    // Keep the real runner for executeDetachedJob() and give the invocation the
                    // hand-over runner instead.
                    DetachedJobInvokerFactory.this.actualPipelineRunner = portablePipelineRunner;
                    return super.createJobInvocation(str, str2, listeningExecutorService, pipeline, flinkPipelineOptions, DetachedJobInvokerFactory.this.handoverPipelineRunner);
                }
            };
        }

        /**
         * Waits for a pipeline to be handed over and runs it with the captured runner.
         *
         * @throws TimeoutException if no pipeline is handed over within
         *     {@code JOB_INVOCATION_TIMEOUT}
         * @throws Exception if waiting is interrupted or running the pipeline fails
         */
        public void executeDetachedJob() throws Exception {
            long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds();
            if (!this.latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
                throw new TimeoutException(String.format("Timeout of %s seconds waiting for job submission.", Long.valueOf(timeoutSeconds)));
            }
            this.actualPipelineRunner.run(this.pipeline, this.jobInfo);
        }
    }

    /** Command line options of the entry point, populated by args4j. */
    public static class EntryPointConfiguration {

        @Option(name = "--driver-cmd", required = true, usage = "Command that launches the Python driver program. (The job service endpoint will be appended as --job_endpoint=localhost:<port>.)")
        private String driverCmd;

        private EntryPointConfiguration() {
        }
    }

    /**
     * Creates an entry point for the given driver command.
     *
     * @param str command that launches the driver program; must not contain
     *     {@code --job_endpoint} because that flag is appended by this class
     * @throws IllegalStateException if the command already contains {@code --job_endpoint}
     */
    public FlinkPortableClientEntryPoint(String str) {
        Preconditions.checkState(!str.contains(JOB_ENDPOINT_FLAG), "Driver command must not contain --job_endpoint");
        this.driverCmd = str;
    }

    /**
     * Parses the arguments, starts the job service, runs the driver program and finally stops the
     * job service again.
     *
     * @param strArr command line arguments; {@code --driver-cmd} is required
     * @throws RuntimeException wrapping any exception raised while starting the job service or
     *     running the driver program
     * @throws Exception if stopping the job service fails
     */
    public static void main(String[] strArr) throws Exception {
        LOG.info("entry points args: {}", Arrays.asList(strArr));
        EntryPointConfiguration configuration = parseArgs(strArr);
        FlinkPortableClientEntryPoint entryPoint = new FlinkPortableClientEntryPoint(configuration.driverCmd);
        try {
            entryPoint.startJobService();
            entryPoint.runDriverProgram();
        } catch (Exception e) {
            throw new RuntimeException(String.format("Job %s failed.", configuration.driverCmd), e);
        } finally {
            // Stop exactly once on every path. A failure while stopping is reported as such and
            // is no longer mislabelled as a job failure followed by a second stop attempt.
            LOG.info("Stopping job service");
            entryPoint.stopJobService();
        }
        LOG.info("Job submitted successfully.");
    }

    /**
     * Parses the command line into an {@link EntryPointConfiguration}.
     *
     * @param strArr raw command line arguments
     * @return the populated configuration
     * @throws IllegalArgumentException if the arguments cannot be parsed; usage is printed to
     *     {@code System.err} first
     */
    private static EntryPointConfiguration parseArgs(String[] strArr) {
        EntryPointConfiguration configuration = new EntryPointConfiguration();
        CmdLineParser parser = new CmdLineParser(configuration);
        try {
            parser.parseArgument(strArr);
            return configuration;
        } catch (CmdLineException e) {
            LOG.error("Unable to parse command line arguments.", e);
            parser.printUsage(System.err);
            throw new IllegalArgumentException("Unable to parse command line arguments.", e);
        }
    }

    /**
     * Starts the job service on a new thread and blocks until its URL is available.
     *
     * @throws IllegalStateException if the job service thread is no longer alive
     * @throws TimeoutException if no URL is available within {@code JOB_SERVICE_STARTUP_TIMEOUT}
     * @throws RuntimeException if the calling thread is interrupted while waiting
     * @throws Exception if the job server cannot be configured
     */
    private void startJobService() throws Exception {
        this.jobInvokerFactory = new DetachedJobInvokerFactory();
        String[] serverArgs = {"--job-port=" + this.jobPort, "--artifact-port=0", "--expansion-port=0"};
        this.jobServer = FlinkJobServerDriver.fromConfig(FlinkJobServerDriver.parseArgs(serverArgs), this.jobInvokerFactory);
        this.jobServerThread = new Thread(this.jobServer);
        this.jobServerThread.start();

        Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT);
        // Stop polling as soon as the server thread dies; waiting out the full timeout cannot
        // succeed in that case and only delays the error below.
        while (this.jobServer.getJobServerUrl() == null && this.jobServerThread.isAlive() && deadline.hasTimeLeft()) {
            try {
                Thread.sleep(JOB_SERVICE_POLL_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
        if (!this.jobServerThread.isAlive()) {
            throw new IllegalStateException("Job service thread is not alive");
        }
        if (this.jobServer.getJobServerUrl() == null) {
            // Report the configured timeout rather than the Deadline object's string form.
            throw new TimeoutException(String.format("Timeout of %s seconds waiting for job service to start.", Long.valueOf(JOB_SERVICE_STARTUP_TIMEOUT.getSeconds())));
        }
    }

    /**
     * Launches the driver command via {@code bash -c} with the job endpoint flag appended, then
     * waits for and executes the pipeline it submits.
     *
     * <p>The driver's output is redirected to a temporary file whose content is included in the
     * error message if anything fails. NOTE(review): the command string is interpolated into a
     * shell command line as-is; it is taken from the {@code --driver-cmd} option.
     *
     * @throws RuntimeException wrapping the failure, with the driver output in the message
     * @throws Exception if the temporary output file cannot be created
     */
    private void runDriverProgram() throws Exception {
        ProcessManager processManager = ProcessManager.create();
        String executable = "bash";
        String processId = "client1";
        ImmutableList<String> args = ImmutableList.of("-c", String.format("%s %s=%s", this.driverCmd, JOB_ENDPOINT_FLAG, this.jobServer.getJobServerUrl()));
        File outputFile = File.createTempFile("beam-driver-program", ".log");
        try {
            processManager.startProcess(processId, executable, args, System.getenv(), outputFile).isAliveOrThrow();
            LOG.info("Started driver program");
            this.jobInvokerFactory.executeDetachedJob();
        } catch (Exception e) {
            try {
                processManager.stopProcess(processId);
            } catch (Exception stopFailure) {
                e.addSuppressed(stopFailure);
            }
            String output = readDriverOutput(outputFile, e);
            throw new RuntimeException(String.format("Failed to start job with driver program: %s %s output: %s", executable, args, output), e);
        }
    }

    /**
     * Reads the driver program's captured output for use in an error message.
     *
     * <p>Reading is best effort: if the file cannot be read, the read error is attached to
     * {@code failure} as a suppressed exception so that it never replaces the original failure.
     *
     * @param outputFile file the driver's output was redirected to
     * @param failure the failure currently being reported
     * @return the file content decoded with the platform default charset, or a placeholder
     */
    private static String readDriverOutput(File outputFile, Exception failure) {
        try {
            return new String(Files.readAllBytes(outputFile.toPath()), Charset.defaultCharset());
        } catch (IOException | RuntimeException readFailure) {
            failure.addSuppressed(readFailure);
            return "<unavailable: could not read " + outputFile + ">";
        }
    }

    /**
     * Stops the job server, if one was created, then interrupts the job server thread and waits
     * for it to finish.
     *
     * @throws InterruptedException if interrupted while waiting for the thread to finish
     */
    private void stopJobService() throws InterruptedException {
        if (this.jobServer != null) {
            this.jobServer.stop();
        }
        if (this.jobServerThread != null) {
            this.jobServerThread.interrupt();
            this.jobServerThread.join();
        }
    }
}
