package org.apache.pig.impl.builtin;

import com.google.common.base.Charsets;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.pig.EvalFunc;
import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.streaming.InputHandler;
import org.apache.pig.impl.streaming.OutputHandler;
import org.apache.pig.impl.streaming.PigStreamingUDF;
import org.apache.pig.impl.streaming.StreamingCommand;
import org.apache.pig.impl.streaming.StreamingUDFException;
import org.apache.pig.impl.streaming.StreamingUDFInputHandler;
import org.apache.pig.impl.streaming.StreamingUDFOutputHandler;
import org.apache.pig.impl.streaming.StreamingUDFOutputSchemaException;
import org.apache.pig.impl.streaming.StreamingUtil;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.impl.util.Utils;
import org.apache.pig.parser.ParserException;
import org.apache.pig.scripting.ScriptingOutputCapturer;

/* loaded from: input_file:org/apache/pig/impl/builtin/StreamingUDF.class */
public class StreamingUDF extends EvalFunc<Object> {
    // Classpath resource locations of the Python-side scripts. Both are returned by
    // getResourcesForJar() and copied out to temp files by getControllerPath() when the
    // controller is not found next to the job jar.
    private static final String PYTHON_CONTROLLER_JAR_PATH = "/python/streaming/controller.py";
    private static final String PYTHON_PIG_UTIL_PATH = "/python/streaming/pig_util.py";

    // Positions of the arguments in the command array built by constructCommand().
    // The values match the literal indices used there (0 = language ... 9 = illustrate flag).
    private static final int UDF_LANGUAGE = 0;
    private static final int PATH_TO_CONTROLLER_FILE = 1;
    private static final int UDF_FILE_NAME = 2;
    private static final int UDF_FILE_PATH = 3;
    private static final int UDF_NAME = 4;
    private static final int PATH_TO_FILE_CACHE = 5;
    private static final int STD_OUT_OUTPUT_PATH = 6;
    private static final int STD_ERR_OUTPUT_PATH = 7;
    private static final int CONTROLLER_LOG_FILE_PATH = 8;
    private static final int IS_ILLUSTRATE = 9;

    // Values received through the constructor.
    private String language;
    private String filePath;
    private String funcName;
    // Parsed from the constructor's schema string; returned unchanged by outputSchema().
    private Schema schema;
    private ExecType execType;
    // Passed through verbatim as the last command argument.
    private String isIllustrate;

    // Flipped to true by exec() after the first successful initialize().
    private boolean initialized = false;
    private ScriptingOutputCapturer soc;

    // The external controller process and the three daemon threads that service its streams.
    private Process process;
    private ProcessErrorThread stderrThread;
    private ProcessInputThread stdinThread;
    private ProcessOutputThread stdoutThread;
    private InputHandler inputHandler;
    private OutputHandler outputHandler;

    // Hand-off queues between the caller of exec() and the stream threads; created in initialize().
    // getOutput() sets outputQueue to null once an error has been reported.
    private BlockingQueue<Tuple> inputQueue;
    private BlockingQueue<Object> outputQueue;

    // Wrapped views of the process's stdin/stdout/stderr, assigned in setStreams().
    private DataOutputStream stdin;
    private InputStream stdout;
    private InputStream stderr;

    // Error recorded by the stderr/stdout threads and rethrown by getOutput(); volatile because
    // it is written on those threads and read on the thread calling exec().
    private volatile StreamingUDFException outerrThreadsError;

    // Marker value that getOutput() wraps in a tuple and sends to the process when output
    // capturing has to be switched on.
    public static final String TURN_ON_OUTPUT_CAPTURING = "TURN_ON_OUTPUT_CAPTURING";

    // Sleep interval in milliseconds and maximum number of polls; these correspond to the wait
    // ProcessOutputThread performs for the stderr thread before reporting its own error.
    private static final int WAIT_FOR_ERROR_LENGTH = 500;
    private static final int MAX_WAIT_FOR_ERROR_ATTEMPTS = 5;
    private static final Log log = LogFactory.getLog(StreamingUDF.class);

    // Sentinels placed on outputQueue: ERROR_OUTPUT wakes getOutput() so it can throw, and
    // NULL_OBJECT stands in for a null result (the queue is not given null directly).
    private static final Object ERROR_OUTPUT = new Object();
    private static final Object NULL_OBJECT = new Object();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/impl/builtin/StreamingUDF$ProcessErrorThread.class */
    public class ProcessErrorThread extends Thread {
        public ProcessErrorThread() {
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                StreamingUDF.log.debug("Starting PET");
                Integer num = null;
                StringBuffer stringBuffer = new StringBuffer();
                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(StreamingUDF.this.stderr, Charsets.UTF_8));
                while (true) {
                    String readLine = bufferedReader.readLine();
                    if (readLine == null) {
                        break;
                    }
                    if (num == null) {
                        try {
                            num = Integer.valueOf(readLine);
                        } catch (NumberFormatException e) {
                            stringBuffer.append(readLine + "\n");
                        }
                    } else {
                        stringBuffer.append(readLine + "\n");
                    }
                }
                StreamingUDF.this.outerrThreadsError = new StreamingUDFException(StreamingUDF.this.language, stringBuffer.toString(), num);
                if (StreamingUDF.this.outputQueue != null) {
                    StreamingUDF.this.outputQueue.put(StreamingUDF.ERROR_OUTPUT);
                }
                if (StreamingUDF.this.stderr != null) {
                    StreamingUDF.this.stderr.close();
                    StreamingUDF.this.stderr = null;
                }
            } catch (IOException e2) {
                StreamingUDF.log.debug("Process Ended");
            } catch (Exception e3) {
                StreamingUDF.log.error("standard error problem", e3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/impl/builtin/StreamingUDF$ProcessInputThread.class */
    public class ProcessInputThread extends Thread {
        ProcessInputThread() {
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                StreamingUDF.log.debug("Starting PIT");
                while (true) {
                    StreamingUDF.this.inputHandler.putNext((Tuple) StreamingUDF.this.inputQueue.take());
                    try {
                        StreamingUDF.this.stdin.flush();
                    } catch (Exception e) {
                        return;
                    }
                }
            } catch (Exception e2) {
                StreamingUDF.log.error(e2);
            }
        }
    }

    /* loaded from: input_file:org/apache/pig/impl/builtin/StreamingUDF$ProcessKiller.class */
    /**
     * Runnable that destroys the controller process. startUdfController() wraps it in a thread
     * and registers that thread as a JVM shutdown hook.
     */
    public class ProcessKiller implements Runnable {
        public ProcessKiller() {
        }

        /** Destroys the process currently held by the enclosing StreamingUDF. */
        @Override
        public void run() {
            final Process child = StreamingUDF.this.process;
            child.destroy();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pig/impl/builtin/StreamingUDF$ProcessOutputThread.class */
    public class ProcessOutputThread extends Thread {
        ProcessOutputThread() {
            setDaemon(true);
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                StreamingUDF.log.debug("Starting POT");
                Object obj = StreamingUDF.this.outputHandler.getNext().get(0);
                while (obj != OutputHandler.END_OF_OUTPUT) {
                    if (obj != null) {
                        StreamingUDF.this.outputQueue.put(obj);
                    } else {
                        StreamingUDF.this.outputQueue.put(StreamingUDF.NULL_OBJECT);
                    }
                    obj = StreamingUDF.this.outputHandler.getNext().get(0);
                }
            } catch (Exception e) {
                if (StreamingUDF.this.outputQueue != null) {
                    for (int i = 0; StreamingUDF.this.stderrThread.isAlive() && i < 5; i++) {
                        try {
                            Thread.sleep(500L);
                        } catch (InterruptedException e2) {
                            StreamingUDF.log.error(e2);
                            return;
                        }
                    }
                    if (StreamingUDF.this.outerrThreadsError == null) {
                        StreamingUDF.this.outerrThreadsError = new StreamingUDFException(StreamingUDF.this.language, "Error deserializing output.  Please check that the declared outputSchema for function " + StreamingUDF.this.funcName + " matches the data type being returned.", e);
                    }
                    StreamingUDF.this.outputQueue.put(StreamingUDF.ERROR_OUTPUT);
                }
            }
        }
    }

    /**
     * Creates a streaming UDF wrapper. No process is started here; that happens lazily on the
     * first call to {@link #exec(Tuple)}.
     *
     * @param str  language of the UDF (only "python" is accepted later by getControllerPath)
     * @param str2 path of the UDF script file
     * @param str3 name of the UDF function
     * @param str4 output schema string, parsed with {@code Utils.getSchemaFromString}
     * @param str5 integer, as a string, passed to the schema exception when {@code str4}
     *             cannot be parsed (presumably the line number of the schema declaration -
     *             TODO confirm)
     * @param str6 execution type: "local", "mapreduce", or any name understood by
     *             {@code ExecType.fromString}
     * @param str7 illustrate flag, forwarded unchanged to the controller command
     * @throws StreamingUDFOutputSchemaException if the output schema string cannot be parsed
     * @throws ExecException if the execution type cannot be resolved
     */
    public StreamingUDF(String str, String str2, String str3, String str4, String str5, String str6, String str7) throws StreamingUDFOutputSchemaException, ExecException {
        this.language = str;
        this.filePath = str2;
        this.funcName = str3;
        try {
            this.schema = Utils.getSchemaFromString(str4);
            if (str6.equals("local")) {
                this.execType = ExecType.LOCAL;
            } else if (str6.equals("mapreduce")) {
                // Compared against the literal on purpose: the exec type name has nothing to do
                // with Hadoop's shuffle HTTP header constant, which merely has the same value.
                this.execType = ExecType.MAPREDUCE;
            } else {
                this.execType = ExecType.fromString(str6);
            }
            this.isIllustrate = str7;
        } catch (ParserException e) {
            throw new StreamingUDFOutputSchemaException(e.getMessage(), Integer.parseInt(str5));
        } catch (IOException e2) {
            log.error("Invalid exectype passed to StreamingUDF. Should be local or mapreduce", e2);
            throw new ExecException("Invalid exectype passed to StreamingUDF. Should be local or mapreduce", e2);
        }
    }

    /**
     * Evaluates the UDF for one input tuple. The first call runs {@code initialize()} and marks
     * the instance as initialized only if that completed without throwing.
     *
     * @param tuple the input passed on to {@code getOutput}
     * @return whatever {@code getOutput} returns for this input
     * @throws IOException if initialization or retrieving the output fails
     */
    @Override
    public Object exec(Tuple tuple) throws IOException {
        if (this.initialized) {
            return getOutput(tuple);
        }
        initialize();
        this.initialized = true;
        return getOutput(tuple);
    }

    /**
     * One-time setup: creates the hand-off queues (capacity 1 for input, 2 for output) and the
     * output capturer, then starts the controller process, builds the handlers, wires the
     * process streams and starts the stream threads, in that order.
     */
    private void initialize() throws ExecException, IOException {
        final int inputCapacity = 1;
        final int outputCapacity = 2;
        this.inputQueue = new ArrayBlockingQueue<Tuple>(inputCapacity);
        this.outputQueue = new ArrayBlockingQueue<Object>(outputCapacity);
        this.soc = new ScriptingOutputCapturer(this.execType);

        startUdfController();
        createInputHandlers();
        setStreams();
        startThreads();
    }

    /**
     * Builds the controller command, starts it as an external process stored in
     * {@code process}, and registers a shutdown hook that destroys that process.
     *
     * @return the streaming command the process was created from
     */
    private StreamingCommand startUdfController() throws IOException {
        final String[] arguments = constructCommand();
        final StreamingCommand command = new StreamingCommand(null, arguments);
        this.process = StreamingUtil.createProcess(command).start();

        final Thread killOnShutdown = new Thread(new ProcessKiller());
        Runtime.getRuntime().addShutdownHook(killOnShutdown);
        return command;
    }

    /**
     * Assembles the ten command arguments for the controller process, in this order: language,
     * controller script path, UDF file name, UDF file directory, function name, quoted
     * file-cache path, stdout path, stderr path, controller log path, illustrate flag.
     * As a side effect the stdout path is registered with the output capturer under the
     * function name.
     */
    private String[] constructCommand() throws IOException {
        final String jobJar = UDFContext.getUDFContext().getJobConf().get("mapred.jar");
        final String jarDir = (jobJar == null) ? "" : new File(jobJar).getParent();
        final String outputRoot = this.soc.getStandardOutputRootWriteLocation();

        // The controller log location is the same for every exec type.
        final String controllerLog = outputRoot + this.funcName + "_python.log";
        final String stdOutPath;
        final String stdErrPath;
        if (this.execType != ExecType.LOCAL) {
            stdOutPath = outputRoot + this.funcName + ".out";
            stdErrPath = outputRoot + this.funcName + ".err";
        } else {
            // Local runs add a "cpython_" prefix and the capturer's run id to the file names.
            stdOutPath = outputRoot + "cpython_" + this.funcName + "_" + ScriptingOutputCapturer.getRunId() + ".out";
            stdErrPath = outputRoot + "cpython_" + this.funcName + "_" + ScriptingOutputCapturer.getRunId() + ".err";
        }
        this.soc.registerOutputLocation(this.funcName, stdOutPath);

        final String controllerPath = getControllerPath(jarDir);

        // Index of the first character after the last separator; 0 when there is no separator.
        final int nameStart = this.filePath.lastIndexOf(File.separator) + 1;
        final String udfFileName = this.filePath.substring(nameStart);
        final String udfFileDir;
        if (nameStart > 0) {
            udfFileDir = this.filePath.substring(0, nameStart - 1);
        } else {
            udfFileDir = ".";
        }
        final String fileCachePath = "\"" + jarDir + this.filePath.substring(0, nameStart) + "\"";

        return new String[] {
            this.language,
            controllerPath,
            udfFileName,
            udfFileDir,
            this.funcName,
            fileCachePath,
            stdOutPath,
            stdErrPath,
            controllerLog,
            this.isIllustrate
        };
    }

    /**
     * Creates the handler that writes tuples to the process and the handler that reads results
     * back; the latter is configured with field 0 of the declared output schema.
     */
    private void createInputHandlers() throws ExecException, FrontendException {
        final PigStreamingUDF toProcess = new PigStreamingUDF();
        this.inputHandler = new StreamingUDFInputHandler(toProcess);

        final PigStreamingUDF fromProcess = new PigStreamingUDF(this.schema.getField(0));
        this.outputHandler = new StreamingUDFOutputHandler(fromProcess);
    }

    /**
     * Wraps the process's three streams in buffered data streams, stores them in the
     * {@code stdout}, {@code stdin} and {@code stderr} fields, and binds stdout and stdin to the
     * output and input handlers respectively.
     */
    private void setStreams() throws IOException {
        final InputStream processOut = new DataInputStream(new BufferedInputStream(this.process.getInputStream()));
        this.stdout = processOut;
        this.outputHandler.bindTo("", new BufferedPositionedInputStream(processOut), 0L, Long.MAX_VALUE);

        final DataOutputStream processIn = new DataOutputStream(new BufferedOutputStream(this.process.getOutputStream()));
        this.stdin = processIn;
        this.inputHandler.bindTo(processIn);

        this.stderr = new DataInputStream(new BufferedInputStream(this.process.getErrorStream()));
    }

    /**
     * Creates and starts the stdin, stdout and stderr service threads.
     *
     * All three thread fields are assigned before any thread is started. The stdout thread's
     * error path reads {@code stderrThread}, so starting it before that field is set could hit
     * a null reference; assigning first also makes the fields visible to the started threads.
     */
    private void startThreads() {
        this.stdinThread = new ProcessInputThread();
        this.stdoutThread = new ProcessOutputThread();
        this.stderrThread = new ProcessErrorThread();

        this.stdinThread.start();
        this.stdoutThread.start();
        this.stderrThread.start();
    }

    /**
     * Resolves the path of the controller script to run.
     *
     * If {@code <str>/python/streaming/controller.py} exists on disk it is used directly.
     * Otherwise the controller and {@code pig_util.py} are copied from the classpath into the
     * temp directory (both marked delete-on-exit) and the temporary controller's absolute path
     * is returned.
     *
     * @param str directory expected to contain the unpacked scripts
     * @return path of the controller script
     * @throws ExecException if the language is not "python" (compared case-insensitively) or a
     *                       script resource is missing from the classpath
     * @throws IOException   if a script cannot be written to the temp directory
     */
    private String getControllerPath(String str) throws IOException {
        // equalsIgnoreCase is locale-independent, unlike toLowerCase() with the default locale.
        if (!"python".equalsIgnoreCase(this.language)) {
            throw new ExecException("Invalid language: " + this.language);
        }
        String onDiskController = str + PYTHON_CONTROLLER_JAR_PATH;
        if (new File(onDiskController).exists()) {
            return onDiskController;
        }

        File controller = File.createTempFile("controller", ".py");
        // Register cleanup before copying so a failed copy does not leave the file behind.
        controller.deleteOnExit();
        copyClasspathResource(PYTHON_CONTROLLER_JAR_PATH, controller);

        // pig_util.py gets a fixed name next to the controller (the controller name is random).
        File pigUtil = new File(controller.getParent(), "pig_util.py");
        pigUtil.deleteOnExit();
        copyClasspathResource(PYTHON_PIG_UTIL_PATH, pigUtil);

        return controller.getAbsolutePath();
    }

    /** Copies one classpath resource to a file, always closing the resource stream. */
    private static void copyClasspathResource(String resourcePath, File destination) throws IOException {
        InputStream resource = Launcher.class.getResourceAsStream(resourcePath);
        if (resource == null) {
            throw new ExecException("Unable to find resource on classpath: " + resourcePath);
        }
        try {
            FileUtils.copyInputStreamToFile(resource, destination);
        } finally {
            resource.close();
        }
    }

    /**
     * Lists the classpath resources this UDF relies on.
     *
     * @return a new mutable list holding the controller script path followed by the pig_util
     *         script path
     */
    public static List<String> getResourcesForJar() {
        final List<String> resources = new ArrayList<String>();
        resources.add(PYTHON_CONTROLLER_JAR_PATH);
        resources.add(PYTHON_PIG_UTIL_PATH);
        return resources;
    }

    /**
     * Sends one input tuple to the controller process and blocks until its result, or an error
     * marker, arrives on the output queue.
     *
     * The three phases (capture flag, input hand-off, result retrieval) are handled separately
     * so that each failure is reported with its own message and the UDF's own error is thrown
     * as is rather than wrapped.
     *
     * @param tuple the input; replaced by an empty tuple when the UDF has no input schema or an
     *              empty one
     * @return the result, with the {@code NULL_OBJECT} sentinel translated back to null
     * @throws ExecException if the process has already been shut down, a queue operation is
     *                       interrupted or fails, or the process reported an error
     */
    private Object getOutput(Tuple tuple) throws ExecException {
        if (this.outputQueue == null) {
            throw new ExecException("Process has already been shut down.  No way to retrieve output for input: " + tuple);
        }

        if (ScriptingOutputCapturer.isClassCapturingOutput() && !this.soc.isInstanceCapturingOutput()) {
            try {
                this.inputQueue.put(TupleFactory.getInstance().newTuple(TURN_ON_OUTPUT_CAPTURING));
                this.soc.setInstanceCapturingOutput(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new ExecException("Failed adding capture input flag to inputQueue", e);
            }
        }

        try {
            Tuple toSend = tuple;
            if (getInputSchema() == null || getInputSchema().size() == 0) {
                toSend = TupleFactory.getInstance().newTuple(0);
            }
            this.inputQueue.put(toSend);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ExecException("Failed adding input to inputQueue", e);
        } catch (Exception e) {
            throw new ExecException("Failed adding input to inputQueue", e);
        }

        Object result;
        try {
            result = this.outputQueue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ExecException("Problem getting output", e);
        }

        if (result == ERROR_OUTPUT) {
            // Mark the process as unusable; later calls fail fast at the top of this method.
            this.outputQueue = null;
            StreamingUDFException failure = this.outerrThreadsError;
            if (failure == null) {
                failure = new StreamingUDFException(this.language, "Problem with streaming udf.  Can't recreate exception.");
                this.outerrThreadsError = failure;
            }
            throw failure;
        }
        return result == NULL_OBJECT ? null : result;
    }

    /**
     * Returns the output schema parsed in the constructor.
     *
     * @param schema the input schema; not consulted
     * @return the declared output schema of this UDF
     */
    @Override
    public Schema outputSchema(Schema schema) {
        // The parameter shadows the field, so the field must be read through "this".
        final Schema declared = this.schema;
        return declared;
    }
}
