package org.apache.pulsar.functions.worker.rest.api;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.Base64;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.Source;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.gson.Gson;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.ByteBufUtil;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.javax.ws.rs.core.StreamingOutput;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.StorageClient;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.Table;
import org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.KeyValue;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.StorageClientBuilder;
import org.apache.pulsar.shade.org.apache.bookkeeper.clients.config.StorageClientSettings;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.commons.io.IOUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.shade.org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.class */
public class FunctionsImpl {
    // Class-wide SLF4J logger used by every REST operation below.
    private static final Logger log = LoggerFactory.getLogger((Class<?>) FunctionsImpl.class);
    // Supplies the WorkerService on demand; it is assigned once in the constructor and queried
    // on every request by worker() and isWorkerServiceAvailable().
    private final Supplier<WorkerService> workerServiceSupplier;

    /**
     * Creates the REST implementation on top of a worker-service supplier.
     *
     * @param supplier source of the {@link WorkerService}; it is stored as-is and only
     *                 invoked later, once per lookup
     */
    public FunctionsImpl(Supplier<WorkerService> supplier) {
        this.workerServiceSupplier = supplier;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Fetches the worker service from the supplier and rejects a {@code null} result.
     *
     * @return the non-null worker service
     * @throws NullPointerException presumably raised by {@code Preconditions.checkNotNull}
     *                              when the supplier yields {@code null} (Guava semantics — confirm)
     */
    public WorkerService worker() {
        WorkerService service;
        try {
            service = (WorkerService) Preconditions.checkNotNull(this.workerServiceSupplier.get());
        } catch (Throwable failure) {
            // Record the failure, then let the very same throwable propagate to the caller.
            log.info("Failed to get worker service", failure);
            throw failure;
        }
        return service;
    }

    /**
     * Tells whether requests can be served right now.
     *
     * @return {@code true} only when the supplier yields a non-null service that reports
     *         itself as initialized
     */
    private boolean isWorkerServiceAvailable() {
        WorkerService candidate = this.workerServiceSupplier.get();
        if (candidate == null) {
            return false;
        }
        return candidate.isInitialized();
    }

    /**
     * Registers a function that does not exist yet.
     *
     * <p>The function package comes either from an uploaded stream (dumped to a temporary file)
     * or from a package URL; built-in functions need neither.
     *
     * @param str                        first path component of the function id; it is the value
     *                                   checked by {@code isAuthorizedRole} (presumably the tenant — confirm)
     * @param str2                       second path component (presumably the namespace — confirm)
     * @param str3                       function name
     * @param inputStream                uploaded package content, or {@code null} when nothing was uploaded
     * @param formDataContentDisposition upload metadata; its file name is used for the package path
     *                                   when the package is uploaded
     * @param str4                       package URL; when non-blank it replaces the uploaded package
     * @param str5                       forwarded untouched to the validation helpers
     *                                   (presumably the function details JSON — confirm)
     * @param str6                       forwarded untouched to the validation helpers
     *                                   (presumably the function config JSON — confirm)
     * @param str7                       client role used for the authorization check
     * @return 401 when the role is not authorized, 400 when the function already exists or the
     *         request is invalid/not admitted, 500 when authorization itself fails, otherwise the
     *         result of the metadata update
     */
    public Response registerFunction(String str, String str2, String str3, InputStream inputStream, FormDataContentDisposition formDataContentDisposition, String str4, String str5, String str6, String str7) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            if (!isAuthorizedRole(str, str7)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", str, str2, str3, str7);
                return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
            }
            if (worker().getFunctionMetaDataManager().containsFunction(str, str2, str3)) {
                log.error("Function {}/{}/{} already exists", str, str2, str3);
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(String.format("Function %s already exists", str3))).build();
            }
            boolean hasPackageUrl = StringUtils.isNotBlank(str4);
            // NOTE(review): the file returned by dumpToTmpFile is never deleted in this method —
            // confirm whether cleanup happens elsewhere.
            File uploadedPackage = null;
            if (inputStream != null) {
                uploadedPackage = dumpToTmpFile(inputStream);
            }
            try {
                Function.FunctionDetails functionDetails = hasPackageUrl
                        ? validateUpdateRequestParamsWithPkgUrl(str, str2, str3, str4, str5, str6)
                        : validateUpdateRequestParams(str, str2, str3, uploadedPackage, formDataContentDisposition, str5, str6);
                try {
                    worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
                    Function.FunctionMetaData.Builder metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
                    Function.PackageLocationMetaData.Builder packageLocation = Function.PackageLocationMetaData.newBuilder();
                    boolean builtin = isFunctionCodeBuiltin(functionDetails);
                    if (builtin) {
                        packageLocation.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
                    } else if (hasPackageUrl) {
                        packageLocation.setPackagePath(str4);
                    } else {
                        packageLocation.setPackagePath(createPackagePath(str, str2, str3, formDataContentDisposition.getFileName()));
                        packageLocation.setOriginalFileName(formDataContentDisposition.getFileName());
                    }
                    metaData.setPackageLocation(packageLocation);
                    // Only an uploaded, non-builtin package has to be pushed to storage first.
                    return (hasPackageUrl || builtin) ? updateRequest(metaData.build()) : updateRequest(metaData.build(), uploadedPackage);
                } catch (Exception e) {
                    // Keep the cause in the log; previously only the function id was recorded.
                    log.error("Function {}/{}/{} cannot be admitted by the runtime factory", str, str2, str3, e);
                    return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", str3, e.getMessage()))).build();
                }
            } catch (Exception e2) {
                log.error("Invalid register function request @ /{}/{}/{}", str, str2, str3, e2);
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e2.getMessage())).build();
            }
        } catch (PulsarAdminException e3) {
            // The fourth placeholder is the client role; the trailing exception supplies the stack trace.
            log.error("{}/{}/{} Failed to authorize [{}]", str, str2, str3, str7, e3);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e3.getMessage())).build();
        }
    }

    /**
     * Updates a function that is already registered.
     *
     * <p>The function package comes either from an uploaded stream (dumped to a temporary file)
     * or from a package URL; built-in functions need neither.
     *
     * @param str                        first path component of the function id; it is the value
     *                                   checked by {@code isAuthorizedRole} (presumably the tenant — confirm)
     * @param str2                       second path component (presumably the namespace — confirm)
     * @param str3                       function name
     * @param inputStream                uploaded package content, or {@code null} when nothing was uploaded
     * @param formDataContentDisposition upload metadata; its file name is used for the package path
     *                                   when the package is uploaded
     * @param str4                       package URL; when non-blank it replaces the uploaded package
     * @param str5                       forwarded untouched to the validation helpers
     *                                   (presumably the function details JSON — confirm)
     * @param str6                       forwarded untouched to the validation helpers
     *                                   (presumably the function config JSON — confirm)
     * @param str7                       client role used for the authorization check
     * @return 401 when the role is not authorized, 400 when the function is unknown or the
     *         request is invalid/not admitted, 500 when authorization itself fails, otherwise the
     *         result of the metadata update
     */
    public Response updateFunction(String str, String str2, String str3, InputStream inputStream, FormDataContentDisposition formDataContentDisposition, String str4, String str5, String str6, String str7) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            if (!isAuthorizedRole(str, str7)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", str, str2, str3, str7);
                return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
            }
            if (!worker().getFunctionMetaDataManager().containsFunction(str, str2, str3)) {
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", str3))).build();
            }
            boolean hasPackageUrl = StringUtils.isNotBlank(str4);
            // NOTE(review): the file returned by dumpToTmpFile is never deleted in this method —
            // confirm whether cleanup happens elsewhere.
            File uploadedPackage = null;
            if (inputStream != null) {
                uploadedPackage = dumpToTmpFile(inputStream);
            }
            try {
                Function.FunctionDetails functionDetails = hasPackageUrl
                        ? validateUpdateRequestParamsWithPkgUrl(str, str2, str3, str4, str5, str6)
                        : validateUpdateRequestParams(str, str2, str3, uploadedPackage, formDataContentDisposition, str5, str6);
                try {
                    worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
                    Function.FunctionMetaData.Builder metaData = Function.FunctionMetaData.newBuilder().setFunctionDetails(functionDetails).setCreateTime(System.currentTimeMillis()).setVersion(0L);
                    Function.PackageLocationMetaData.Builder packageLocation = Function.PackageLocationMetaData.newBuilder();
                    boolean builtin = isFunctionCodeBuiltin(functionDetails);
                    if (builtin) {
                        packageLocation.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
                    } else if (hasPackageUrl) {
                        packageLocation.setPackagePath(str4);
                    } else {
                        packageLocation.setPackagePath(createPackagePath(str, str2, str3, formDataContentDisposition.getFileName()));
                        packageLocation.setOriginalFileName(formDataContentDisposition.getFileName());
                    }
                    metaData.setPackageLocation(packageLocation);
                    // Only an uploaded, non-builtin package has to be pushed to storage first.
                    return (hasPackageUrl || builtin) ? updateRequest(metaData.build()) : updateRequest(metaData.build(), uploadedPackage);
                } catch (Exception e) {
                    // Keep the cause in the log; previously only the function id was recorded.
                    log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", str, str2, str3, e);
                    return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", str3, e.getMessage()))).build();
                }
            } catch (Exception e2) {
                // This is the update path; the message used to say "register" by mistake.
                log.error("Invalid update function request @ /{}/{}/{}", str, str2, str3, e2);
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e2.getMessage())).build();
            }
        } catch (PulsarAdminException e3) {
            // The fourth placeholder is the client role; the trailing exception supplies the stack trace.
            log.error("{}/{}/{} Failed to authorize [{}]", str, str2, str3, str7, e3);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e3.getMessage())).build();
        }
    }

    /**
     * Removes a registered function.
     *
     * @param str  first path component of the function id; it is the value checked by
     *             {@code isAuthorizedRole} (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @param str4 client role used for the authorization check
     * @return 401 when the role is not authorized, 400 for invalid parameters or a rejected
     *         request, 404 when the function is unknown, 408 when the wait is interrupted,
     *         500 when authorization or the deregistration itself fails, otherwise 200 with the
     *         request result as JSON
     */
    public Response deregisterFunction(String str, String str2, String str3, String str4) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            if (!isAuthorizedRole(str, str4)) {
                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", str, str2, str3, str4);
                return Response.status(Response.Status.UNAUTHORIZED).type("application/json").entity(new ErrorData("client is not authorize to perform operation")).build();
            }
            try {
                validateDeregisterRequestParams(str, str2, str3);
                FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
                if (!metaDataManager.containsFunction(str, str2, str3)) {
                    log.error("Function to deregister does not exist @ /{}/{}/{}", str, str2, str3);
                    return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", str3))).build();
                }
                try {
                    RequestResult result = metaDataManager.deregisterFunction(str, str2, str3).get();
                    if (!result.isSuccess()) {
                        return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(result.getMessage())).build();
                    }
                    return Response.status(Response.Status.OK).entity(result.toJson()).build();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the code that owns this thread can still see it.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", str, str2, str3, e);
                    return Response.status(Response.Status.REQUEST_TIMEOUT).type("application/json").build();
                } catch (ExecutionException e2) {
                    log.error("Execution Exception while deregistering function @ /{}/{}/{}", str, str2, str3, e2);
                    // An ExecutionException may be built without a cause; fall back to the wrapper itself.
                    Throwable cause = e2.getCause() != null ? e2.getCause() : e2;
                    return Response.serverError().type("application/json").entity(new ErrorData(cause.getMessage())).build();
                }
            } catch (IllegalArgumentException e3) {
                log.error("Invalid deregister function request @ /{}/{}/{}", str, str2, str3, e3);
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e3.getMessage())).build();
            }
        } catch (PulsarAdminException e4) {
            // The fourth placeholder is the client role; the trailing exception supplies the stack trace.
            log.error("{}/{}/{} Failed to authorize [{}]", str, str2, str3, str4, e4);
            return Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e4.getMessage())).build();
        }
    }

    /**
     * Returns the stored details of one function as JSON.
     *
     * @param str  first path component of the function id (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @return 200 with the function details, 404 when the function is unknown, 400 when the
     *         parameters are rejected, or the "unavailable" response when the worker is not ready
     * @throws IOException declared because of the JSON printing helper
     */
    public Response getFunctionInfo(String str, String str2, String str3) throws IOException {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateGetFunctionRequestParams(str, str2, str3);
            FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
            if (!metaDataManager.containsFunction(str, str2, str3)) {
                log.error("Function in getFunction does not exist @ /{}/{}/{}", str, str2, str3);
                ErrorData notFound = new ErrorData(String.format("Function %s doesn't exist", str3));
                return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(notFound).build();
            }
            Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(str, str2, str3);
            return Response.status(Response.Status.OK).entity(Utils.printJson(metaData.getFunctionDetails())).build();
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunction request @ /{}/{}/{}", str, str2, str3, invalid);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(invalid.getMessage())).build();
        }
    }

    /**
     * Returns the status of a single function instance as JSON.
     *
     * @param str  first path component of the function id (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @param str4 instance id as decimal text; must lie in {@code [0, parallelism)}
     * @param uri  passed through to the runtime manager
     * @return 200 with the status, 404 for an unknown function, 400 for invalid parameters or an
     *         out-of-range instance id, 500 when the status lookup fails
     * @throws IOException declared because of the JSON printing helper
     */
    public Response getFunctionInstanceStatus(String str, String str2, String str3, String str4, URI uri) throws IOException {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateGetFunctionInstanceRequestParams(str, str2, str3, str4);
            FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
            if (!metaDataManager.containsFunction(str, str2, str3)) {
                log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", str, str2, str3);
                ErrorData notFound = new ErrorData(String.format("Function %s doesn't exist", str3));
                return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(notFound).build();
            }
            Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(str, str2, str3);
            // A NumberFormatException raised here is an IllegalArgumentException and ends up as a 400 below.
            int instanceId = Integer.parseInt(str4);
            boolean inRange = instanceId >= 0 && instanceId < metaData.getFunctionDetails().getParallelism();
            if (!inRange) {
                log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", str, str2, str3);
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData("Invalid InstanceId")).build();
            }
            try {
                return Response.status(Response.Status.OK).entity(Utils.printJson(worker().getFunctionRuntimeManager().getFunctionInstanceStatus(str, str2, str3, instanceId, uri))).build();
            } catch (WebApplicationException passThrough) {
                // Already carries its own HTTP response; let it propagate unchanged.
                throw passThrough;
            } catch (Exception failure) {
                log.error("{}/{}/{} Got Exception Getting Status", str, str2, str3, failure);
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), failure.getMessage()).build();
            }
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", str, str2, str3, invalid);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(invalid.getMessage())).build();
        }
    }

    /**
     * Stops one function instance by delegating to the six-argument overload with the
     * boolean flag set to {@code false}.
     */
    public Response stopFunctionInstance(String str, String str2, String str3, String str4, URI uri) {
        final boolean restart = false;
        return stopFunctionInstance(str, str2, str3, str4, restart, uri);
    }

    /**
     * Restarts one function instance by delegating to the six-argument
     * {@code stopFunctionInstance} overload with the boolean flag set to {@code true}.
     */
    public Response restartFunctionInstance(String str, String str2, String str3, String str4, URI uri) {
        final boolean restart = true;
        return stopFunctionInstance(str, str2, str3, str4, restart, uri);
    }

    /**
     * Stops or restarts a single function instance.
     *
     * @param str  first path component of the function id (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @param str4 instance id as decimal text
     * @param z    {@code true} when called through {@code restartFunctionInstance},
     *             {@code false} when called through the five-argument stop overload
     * @param uri  passed through to the runtime manager
     * @return the runtime manager's response, 404 for an unknown function, 400 for invalid
     *         parameters (including a non-numeric instance id), 500 when the operation fails
     */
    public Response stopFunctionInstance(String str, String str2, String str3, String str4, boolean z, URI uri) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        // Used only in log messages so they name the operation that was actually requested.
        final String action = z ? "restart" : "stop";
        try {
            validateGetFunctionInstanceRequestParams(str, str2, str3, str4);
            if (!worker().getFunctionMetaDataManager().containsFunction(str, str2, str3)) {
                log.error("Function does not exist @ /{}/{}/{}", str, str2, str3);
                return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", str3))).build();
            }
            // Parse outside the catch-all below: a malformed id is a client error (400 via the
            // IllegalArgumentException handler), not a server failure.
            int instanceId = Integer.parseInt(str4);
            try {
                return worker().getFunctionRuntimeManager().stopFunctionInstance(str, str2, str3, instanceId, z, uri);
            } catch (WebApplicationException e) {
                // Already carries its own HTTP response; let it propagate unchanged.
                throw e;
            } catch (Exception e2) {
                log.error("Failed to {} function: {}/{}/{}/{}", action, str, str2, str3, str4, e2);
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e2.getMessage()).build();
            }
        } catch (IllegalArgumentException e3) {
            log.error("Invalid {}-function request @ /{}/{}/{}", action, str, str2, str3, e3);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e3.getMessage())).build();
        }
    }

    /**
     * Stops every instance of a function by delegating to the four-argument overload with
     * the boolean flag set to {@code false}.
     */
    public Response stopFunctionInstances(String str, String str2, String str3) {
        final boolean restart = false;
        return stopFunctionInstances(str, str2, str3, restart);
    }

    /**
     * Restarts every instance of a function by delegating to the four-argument
     * {@code stopFunctionInstances} overload with the boolean flag set to {@code true}.
     */
    public Response restartFunctionInstances(String str, String str2, String str3) {
        final boolean restart = true;
        return stopFunctionInstances(str, str2, str3, restart);
    }

    /**
     * Stops or restarts all instances of a function.
     *
     * @param str  first path component of the function id (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @param z    {@code true} when called through {@code restartFunctionInstances},
     *             {@code false} when called through the three-argument stop overload
     * @return the runtime manager's response, 404 for an unknown function, 400 for invalid
     *         parameters, 500 when the operation fails
     */
    public Response stopFunctionInstances(String str, String str2, String str3, boolean z) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        // Used only in log messages so they name the operation that was actually requested.
        final String action = z ? "restart" : "stop";
        try {
            validateGetFunctionRequestParams(str, str2, str3);
            if (!worker().getFunctionMetaDataManager().containsFunction(str, str2, str3)) {
                // Previously logged as "Function in getFunctionStatus ...", a copy-paste leftover.
                log.error("Function to {} does not exist @ /{}/{}/{}", action, str, str2, str3);
                return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", str3))).build();
            }
            try {
                return worker().getFunctionRuntimeManager().stopFunctionInstances(str, str2, str3, z);
            } catch (WebApplicationException e) {
                // Already carries its own HTTP response; let it propagate unchanged.
                throw e;
            } catch (Exception e2) {
                log.error("Failed to {} function: {}/{}/{}", action, str, str2, str3, e2);
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), e2.getMessage()).build();
            }
        } catch (IllegalArgumentException e3) {
            log.error("Invalid {}-Function request @ /{}/{}/{}", action, str, str2, str3, e3);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e3.getMessage())).build();
        }
    }

    /**
     * Returns the aggregated status of all instances of a function as JSON.
     *
     * <p>When the status lookup fails with anything other than a
     * {@code WebApplicationException}, the failure is logged and a status with
     * {@code running = false} is returned with HTTP 200 instead of an error.
     *
     * @param str  first path component of the function id (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @param str3 function name
     * @param uri  passed through to the runtime manager
     * @return 200 with the status, 404 for an unknown function, 400 for invalid parameters
     * @throws IOException declared because of the JSON printing helper
     */
    public Response getFunctionStatus(String str, String str2, String str3, URI uri) throws IOException {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateGetFunctionRequestParams(str, str2, str3);
            boolean registered = worker().getFunctionMetaDataManager().containsFunction(str, str2, str3);
            if (registered) {
                try {
                    return Response.status(Response.Status.OK).entity(Utils.printJson(worker().getFunctionRuntimeManager().getAllFunctionStatus(str, str2, str3, uri))).build();
                } catch (WebApplicationException passThrough) {
                    throw passThrough;
                } catch (Exception failure) {
                    log.error("Got Exception Getting Status", failure);
                    // Fall back to a minimal "not running" status rather than an error response.
                    InstanceCommunication.FunctionStatus.Builder fallback = InstanceCommunication.FunctionStatus.newBuilder();
                    fallback.setRunning(false);
                    return Response.status(Response.Status.OK).entity(Utils.printJson(fallback.build())).build();
                }
            }
            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", str, str2, str3);
            ErrorData notFound = new ErrorData(String.format("Function %s doesn't exist", str3));
            return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(notFound).build();
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", str, str2, str3, invalid);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(invalid.getMessage())).build();
        }
    }

    /**
     * Lists the functions known for the given pair of path components.
     *
     * @param str  first path component (presumably the tenant — confirm)
     * @param str2 second path component (presumably the namespace — confirm)
     * @return 200 with a JSON array built from the metadata manager's listing, or 400 when the
     *         parameters are rejected
     */
    public Response listFunctions(String str, String str2) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateListFunctionRequestParams(str, str2);
            Object[] functions = worker().getFunctionMetaDataManager().listFunctions(str, str2).toArray();
            String json = new Gson().toJson(functions);
            return Response.status(Response.Status.OK).entity(json).build();
        } catch (IllegalArgumentException invalid) {
            log.error("Invalid listFunctions request @ /{}/{}", str, str2, invalid);
            ErrorData error = new ErrorData(invalid.getMessage());
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(error).build();
        }
    }

    /**
     * Uploads the function package and then submits the metadata update.
     *
     * @param functionMetaData metadata whose package location names the upload destination
     * @param file             local file holding the package to upload
     * @return a server-error response when the upload fails with an {@link IOException},
     *         otherwise the result of {@code updateRequest(functionMetaData)}
     */
    private Response updateRequest(Function.FunctionMetaData functionMetaData, File file) {
        log.info("Uploading function package to {}", functionMetaData.getPackageLocation());
        // try-with-resources guarantees the file handle is released even when the upload throws;
        // the stream used to be left open.
        try (FileInputStream packageStream = new FileInputStream(file)) {
            org.apache.pulsar.functions.worker.Utils.uploadToBookeeper(worker().getDlogNamespace(), packageStream, functionMetaData.getPackageLocation().getPackagePath());
        } catch (IOException e) {
            log.error("Error uploading file {}", functionMetaData.getPackageLocation(), e);
            return Response.serverError().type("application/json").entity(new ErrorData(e.getMessage())).build();
        }
        return updateRequest(functionMetaData);
    }

    /**
     * Submits a metadata update and waits for its outcome.
     *
     * @param functionMetaData metadata to hand to the function metadata manager
     * @return 200 on success, 400 carrying the manager's message when the request is rejected,
     *         408 when the wait is interrupted, 500 when the update fails with an exception
     */
    private Response updateRequest(Function.FunctionMetaData functionMetaData) {
        try {
            RequestResult result = worker().getFunctionMetaDataManager().updateFunction(functionMetaData).get();
            if (!result.isSuccess()) {
                return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(result.getMessage())).build();
            }
            return Response.status(Response.Status.OK).build();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the code that owns this thread can still see it.
            Thread.currentThread().interrupt();
            log.error("Interrupted while waiting for the function metadata update", e);
            // An InterruptedException normally has no cause, so e.getCause().getMessage() would
            // throw a NullPointerException; report the exception's own message instead.
            String message = e.getMessage() != null ? e.getMessage() : "Interrupted while waiting for the function metadata update";
            return Response.status(Response.Status.REQUEST_TIMEOUT).type("application/json").entity(new ErrorData(message)).build();
        } catch (ExecutionException e2) {
            log.error("Function metadata update failed", e2);
            // An ExecutionException may be built without a cause; fall back to the wrapper itself.
            Throwable cause = e2.getCause() != null ? e2.getCause() : e2;
            return Response.serverError().type("application/json").entity(new ErrorData(cause.getMessage())).build();
        }
    }

    /**
     * Returns the connectors reported by the worker's connectors manager.
     *
     * @return the connector definitions
     * @throws WebApplicationException wrapping a 503 response when the worker service is not
     *                                 available
     */
    public List<ConnectorDefinition> getListOfConnectors() {
        if (!isWorkerServiceAvailable()) {
            ErrorData unavailable = new ErrorData("Function worker service is not avaialable");
            Response response = Response.status(Response.Status.SERVICE_UNAVAILABLE).type("application/json").entity(unavailable).build();
            throw new WebApplicationException(response);
        }
        return worker().getConnectorsManager().getConnectors();
    }

    /**
     * Publishes one message to an input topic of a function and, when the function has a sink
     * topic, waits briefly for the matching output message.
     *
     * @param str         first path component of the function id (presumably the tenant — confirm)
     * @param str2        second path component (presumably the namespace — confirm)
     * @param str3        function name
     * @param str4        message payload as text; used only when {@code inputStream} is {@code null}
     * @param inputStream message payload as a stream; takes precedence over {@code str4}
     * @param str5        input topic to publish to, or {@code null} to use the function's only
     *                    input topic
     * @return 200 (with the output payload when a matching output message was read), 404 for an
     *         unknown function, 400 for invalid parameters or an unusable topic, 408 when no
     *         matching output arrived in time, 500 when publishing or reading fails
     */
    public Response triggerFunction(String str, String str2, String str3, String str4, InputStream inputStream, String str5) {
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateTriggerRequestParams(str, str2, str3, str5, str4, inputStream);
            FunctionMetaDataManager metaDataManager = worker().getFunctionMetaDataManager();
            if (!metaDataManager.containsFunction(str, str2, str3)) {
                log.error("Function in trigger function does not exist @ /{}/{}/{}", str, str2, str3);
                return Response.status(Response.Status.NOT_FOUND).type("application/json").entity(new ErrorData(String.format("Function %s doesn't exist", str3))).build();
            }
            Function.FunctionDetails details = metaDataManager.getFunctionMetaData(str, str2, str3).getFunctionDetails();

            // Resolve the topic to publish to: the explicit one, or the function's single input topic.
            String inputTopic;
            if (str5 != null) {
                inputTopic = str5;
            } else {
                if (details.getSource().getInputSpecsCount() != 1) {
                    log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", str, str2, str3);
                    return Response.status(Response.Status.BAD_REQUEST).build();
                }
                inputTopic = details.getSource().getInputSpecsMap().keySet().iterator().next();
            }
            if (details.getSource().getInputSpecsCount() == 0 || !details.getSource().getInputSpecsMap().containsKey(inputTopic)) {
                log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", str, str2, str3, inputTopic);
                return Response.status(Response.Status.BAD_REQUEST).build();
            }

            String outputTopic = details.getSink().getTopic();
            Reader<byte[]> reader = null;
            Producer<byte[]> producer = null;
            try {
                // Open the reader before publishing so the output produced for our message is not missed.
                if (outputTopic != null && !outputTopic.isEmpty()) {
                    reader = worker().getClient().newReader().topic(outputTopic).startMessageId(MessageId.latest).create();
                }
                producer = worker().getClient().newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(inputTopic).create();

                // Drain the whole stream: available() followed by a single read() may deliver
                // only part of the payload.
                byte[] payload = inputStream != null ? IOUtils.toByteArray(inputStream) : str4.getBytes();
                MessageId sentId = producer.send(payload);
                if (reader == null) {
                    // No sink topic, so there is nothing to wait for.
                    return Response.status(Response.Status.OK).build();
                }

                // NOTE(review): each read may block for up to 10 s although the overall deadline
                // is 1 s after sending — kept as in the original, confirm that this is intended.
                long now = System.currentTimeMillis();
                long deadline = now + 1000;
                while (now < deadline) {
                    Message<byte[]> output = reader.readNext(10000, TimeUnit.MILLISECONDS);
                    if (output == null) {
                        break;
                    }
                    if (output.getProperties().containsKey("__pfn_input_msg_id__")
                            && output.getProperties().containsKey("__pfn_input_topic__")
                            && sentId.equals(MessageId.fromByteArray(Base64.getDecoder().decode(output.getProperties().get("__pfn_input_msg_id__"))))
                            && output.getProperties().get("__pfn_input_topic__").equals(inputTopic)) {
                        return Response.status(Response.Status.OK).entity(output.getData()).build();
                    }
                    now = System.currentTimeMillis();
                }
                return Response.status(Response.Status.REQUEST_TIMEOUT).build();
            } catch (Exception e) {
                // The failure used to be dropped silently; record it before answering with 500.
                log.error("Failed to trigger function @ /{}/{}/{}", str, str2, str3, e);
                return Response.status(Response.Status.INTERNAL_SERVER_ERROR).build();
            } finally {
                // Release client resources on every path, including failures.
                if (reader != null) {
                    reader.closeAsync();
                }
                if (producer != null) {
                    producer.closeAsync();
                }
            }
        } catch (IllegalArgumentException e2) {
            log.error("Invalid trigger function request @ /{}/{}/{}", str, str2, str3, e2);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e2.getMessage())).build();
        }
    }

    /* JADX WARN: Failed to calculate best type for var: r17v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r17v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r18v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r19v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v0 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 17, insn: 0x02ce: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r17 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:118:0x02ce */
    /* JADX WARN: Not initialized variable reg: 18, insn: 0x02d3: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r18 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:120:0x02d3 */
    /* JADX WARN: Not initialized variable reg: 19, insn: 0x0299: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r19 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:92:0x0299 */
    /* JADX WARN: Not initialized variable reg: 20, insn: 0x029e: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r20 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:94:0x029e */
    /* JADX WARN: Type inference failed for: r17v1, types: [org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.Table] */
    /* JADX WARN: Type inference failed for: r18v0, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r19v1, types: [org.apache.pulsar.shade.org.apache.bookkeeper.api.kv.result.KeyValue] */
    /* JADX WARN: Type inference failed for: r20v0, types: [java.lang.Throwable] */
    public Response getFunctionState(String str, String str2, String str3, String str4) {
        ?? r17;
        ?? r18;
        ?? r19;
        ?? r20;
        if (!isWorkerServiceAvailable()) {
            return getUnavailableResponse();
        }
        try {
            validateGetFunctionStateParams(str, str2, str3, str4);
            StorageClient build = StorageClientBuilder.newBuilder().withSettings(StorageClientSettings.newBuilder().serviceUri(worker().getWorkerConfig().getStateStorageServiceUrl()).clientName("functions-admin").build()).withNamespace(String.format("%s_%s", str, str2).replace('-', '_')).build();
            Throwable th = null;
            try {
                try {
                    try {
                        Table table = (Table) FutureUtils.result(build.openTable(str3));
                        Throwable th2 = null;
                        try {
                            KeyValue keyValue = (KeyValue) FutureUtils.result(table.getKv(Unpooled.wrappedBuffer(str4.getBytes(StandardCharsets.UTF_8))));
                            Throwable th3 = null;
                            if (null == keyValue) {
                                Response build2 = Response.status(Response.Status.NOT_FOUND).entity(new String("key '" + str4 + "' doesn't exist.")).build();
                                if (keyValue != null) {
                                    if (0 != 0) {
                                        try {
                                            keyValue.close();
                                        } catch (Throwable th4) {
                                            th3.addSuppressed(th4);
                                        }
                                    } else {
                                        keyValue.close();
                                    }
                                }
                                if (table != null) {
                                    if (0 != 0) {
                                        try {
                                            table.close();
                                        } catch (Throwable th5) {
                                            th2.addSuppressed(th5);
                                        }
                                    } else {
                                        table.close();
                                    }
                                }
                                return build2;
                            }
                            Response build3 = Response.status(Response.Status.OK).entity(new String(keyValue.isNumber() ? "value : " + keyValue.numberValue() + ", version : " + keyValue.version() : "value : " + new String(ByteBufUtil.getBytes((ByteBuf) keyValue.value()), StandardCharsets.UTF_8) + ", version : " + keyValue.version())).build();
                            if (keyValue != null) {
                                if (0 != 0) {
                                    try {
                                        keyValue.close();
                                    } catch (Throwable th6) {
                                        th3.addSuppressed(th6);
                                    }
                                } else {
                                    keyValue.close();
                                }
                            }
                            if (table != null) {
                                if (0 != 0) {
                                    try {
                                        table.close();
                                    } catch (Throwable th7) {
                                        th2.addSuppressed(th7);
                                    }
                                } else {
                                    table.close();
                                }
                            }
                            if (build != null) {
                                if (0 != 0) {
                                    try {
                                        build.close();
                                    } catch (Throwable th8) {
                                        th.addSuppressed(th8);
                                    }
                                } else {
                                    build.close();
                                }
                            }
                            return build3;
                        } catch (Throwable th9) {
                            if (r19 != 0) {
                                if (r20 != 0) {
                                    try {
                                        r19.close();
                                    } catch (Throwable th10) {
                                        r20.addSuppressed(th10);
                                    }
                                } else {
                                    r19.close();
                                }
                            }
                            throw th9;
                        }
                    } catch (Exception e) {
                        log.error("Error while getFunctionState request @ /{}/{}/{}/{}", str, str2, str3, str4, e);
                        Response build4 = Response.status(Response.Status.INTERNAL_SERVER_ERROR).type("application/json").entity(new ErrorData(e.getMessage())).build();
                        if (build != null) {
                            if (0 != 0) {
                                try {
                                    build.close();
                                } catch (Throwable th11) {
                                    th.addSuppressed(th11);
                                }
                            } else {
                                build.close();
                            }
                        }
                        return build4;
                    }
                } catch (Throwable th12) {
                    if (r17 != 0) {
                        if (r18 != 0) {
                            try {
                                r17.close();
                            } catch (Throwable th13) {
                                r18.addSuppressed(th13);
                            }
                        } else {
                            r17.close();
                        }
                    }
                    throw th12;
                }
            } finally {
                if (build != null) {
                    if (0 != 0) {
                        try {
                            build.close();
                        } catch (Throwable th14) {
                            th.addSuppressed(th14);
                        }
                    } else {
                        build.close();
                    }
                }
            }
        } catch (IllegalArgumentException e2) {
            log.error("Invalid getFunctionState request @ /{}/{}/{}/{}", str, str2, str3, str4, e2);
            return Response.status(Response.Status.BAD_REQUEST).type("application/json").entity(new ErrorData(e2.getMessage())).build();
        }
    }

    /**
     * Stores an uploaded function package under the given path.
     *
     * @param inputStream content of the package
     * @param str         destination path of the package
     * @return {@code 400} when either argument is {@code null}, a server error when the upload
     *         raises an {@link IOException}, {@code 200} otherwise
     */
    public Response uploadFunction(InputStream inputStream, String str) {
        // Validate first; the exception is raised and handled locally so that the logged
        // error carries a stack trace and the response carries its message.
        try {
            if (inputStream == null || str == null) {
                throw new IllegalArgumentException("Function Package is not provided " + str);
            }
        } catch (IllegalArgumentException invalidRequest) {
            log.error("Invalid upload function request @ /{}", str, invalidRequest);
            return Response.status(Response.Status.BAD_REQUEST)
                    .type("application/json")
                    .entity(new ErrorData(invalidRequest.getMessage()))
                    .build();
        }

        try {
            log.info("Uploading function package to {}", str);
            org.apache.pulsar.functions.worker.Utils.uploadToBookeeper(worker().getDlogNamespace(), inputStream, str);
        } catch (IOException uploadFailure) {
            log.error("Error uploading file {}", str, uploadFailure);
            return Response.serverError()
                    .type("application/json")
                    .entity(new ErrorData(uploadFailure.getMessage()))
                    .build();
        }
        return Response.status(Response.Status.OK).build();
    }

    /**
     * Builds a {@code 200} response whose entity streams the function package found at the
     * given location.
     *
     * <p>The location is resolved when the entity is written: values starting with
     * {@code Utils.HTTP} are fetched through {@link URL#openStream()}, values starting with
     * {@code Utils.FILE} are read from the local file the URL points at, and anything else is
     * downloaded through {@code downloadFromBookkeeper}.
     *
     * @param str location of the package
     * @return a response wrapping a {@code StreamingOutput}; I/O happens lazily in
     *         {@code write}, which throws {@link IllegalArgumentException} when a file URL
     *         cannot be converted to a URI
     */
    public Response downloadFunction(final String str) {
        // NOTE(review): the location appears to come straight from the request and may point at
        // arbitrary HTTP endpoints or local files - confirm that callers restrict it.
        return Response.status(Response.Status.OK).entity(new StreamingOutput() { // from class: org.apache.pulsar.functions.worker.rest.api.FunctionsImpl.1
            @Override // org.apache.pulsar.shade.javax.ws.rs.core.StreamingOutput
            public void write(OutputStream outputStream) throws IOException {
                if (str.startsWith(Utils.HTTP)) {
                    // The input stream is ours to release, so close it once the copy is done.
                    try (InputStream remote = new URL(str).openStream()) {
                        IOUtils.copy(remote, outputStream);
                    }
                } else if (str.startsWith(Utils.FILE)) {
                    final File localPackage;
                    try {
                        localPackage = new File(new URL(str).toURI());
                    } catch (URISyntaxException e) {
                        // Keep the cause so the malformed part of the URL stays diagnosable.
                        throw new IllegalArgumentException("invalid file url path: " + str, e);
                    }
                    try (InputStream local = new FileInputStream(localPackage)) {
                        IOUtils.copy(local, outputStream);
                    }
                } else {
                    org.apache.pulsar.functions.worker.Utils.downloadFromBookkeeper(FunctionsImpl.this.worker().getDlogNamespace(), outputStream, str);
                }
            }
        }).build();
    }

    /**
     * Checks that a list-functions request names both a tenant and a namespace.
     *
     * @param str  tenant
     * @param str2 namespace
     * @throws IllegalArgumentException when an argument is {@code null}; the tenant is checked first
     */
    private void validateListFunctionRequestParams(String str, String str2) throws IllegalArgumentException {
        String problem = null;
        if (str == null) {
            problem = "Tenant is not provided";
        } else if (str2 == null) {
            problem = "Namespace is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Checks the parameters of a request that addresses one instance of a function.
     *
     * <p>Tenant, namespace and function name are checked by
     * {@code validateGetFunctionRequestParams} before the instance id is looked at.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param str4 function instance id
     * @throws IllegalArgumentException when any argument is {@code null}
     */
    private void validateGetFunctionInstanceRequestParams(String str, String str2, String str3, String str4) throws IllegalArgumentException {
        validateGetFunctionRequestParams(str, str2, str3);
        final boolean instanceIdMissing = (str4 == null);
        if (instanceIdMissing) {
            throw new IllegalArgumentException("Function Instance Id is not provided");
        }
    }

    /**
     * Checks that a get-function request names a tenant, a namespace and a function.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @throws IllegalArgumentException for the first {@code null} argument, in parameter order
     */
    private void validateGetFunctionRequestParams(String str, String str2, String str3) throws IllegalArgumentException {
        String problem = null;
        if (str == null) {
            problem = "Tenant is not provided";
        } else if (str2 == null) {
            problem = "Namespace is not provided";
        } else if (str3 == null) {
            problem = "Function Name is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Checks that a deregister request names a tenant, a namespace and a function.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @throws IllegalArgumentException for the first {@code null} argument, in parameter order
     */
    private void validateDeregisterRequestParams(String str, String str2, String str3) throws IllegalArgumentException {
        final String missing;
        if (str == null) {
            missing = "Tenant is not provided";
        } else if (str2 == null) {
            missing = "Namespace is not provided";
        } else if (str3 == null) {
            missing = "Function Name is not provided";
        } else {
            // Every required value is present.
            return;
        }
        throw new IllegalArgumentException(missing);
    }

    /**
     * Validates an update request whose package is referenced by URL instead of being uploaded.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param str4 package URL; must be accepted by {@code Utils.isFunctionPackageUrlSupported}
     * @param str5 function details JSON, forwarded to the core validation
     * @param str6 function config JSON, forwarded to the core validation
     * @return the validated function details
     * @throws IllegalArgumentException when the URL is not supported or the core validation rejects the request
     */
    private Function.FunctionDetails validateUpdateRequestParamsWithPkgUrl(String str, String str2, String str3, String str4, String str5, String str6) throws IllegalArgumentException, IOException, URISyntaxException {
        final boolean urlSupported = Utils.isFunctionPackageUrlSupported(str4);
        if (!urlSupported) {
            throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
        }
        // No uploaded file in this flow, hence the explicit null File argument.
        return validateUpdateRequestParams(str, str2, str3, str5, str6, str4, (File) null);
    }

    /**
     * Validates an update request whose package is uploaded with the request.
     *
     * <p>After the core validation succeeds, the request must either refer to a builtin
     * source/sink or carry both the uploaded file and its content disposition.
     *
     * @param str                        tenant
     * @param str2                       namespace
     * @param str3                       function name
     * @param file                       uploaded package, may be {@code null}
     * @param formDataContentDisposition metadata of the uploaded package, may be {@code null}
     * @param str4                       function details JSON
     * @param str5                       function config JSON
     * @return the validated function details
     * @throws IllegalArgumentException when validation fails or no package is available
     */
    private Function.FunctionDetails validateUpdateRequestParams(String str, String str2, String str3, File file, FormDataContentDisposition formDataContentDisposition, String str4, String str5) throws IllegalArgumentException, IOException, URISyntaxException {
        final Function.FunctionDetails details = validateUpdateRequestParams(str, str2, str3, str4, str5, (String) null, file);
        final boolean packageUploaded = file != null && formDataContentDisposition != null;
        if (!isFunctionCodeBuiltin(details) && !packageUploaded) {
            throw new IllegalArgumentException("Function Package is not provided");
        }
        return details;
    }

    /**
     * Copies the whole stream into a newly created temporary file.
     *
     * <p>The file name starts with {@code "functions"} and the file is registered for deletion
     * when the JVM exits.
     *
     * @param inputStream data to persist
     * @return the temporary file holding the stream's content
     * @throws RuntimeException wrapping the {@link IOException} when the file cannot be created or written
     */
    private static File dumpToTmpFile(InputStream inputStream) {
        final File tmp;
        try {
            tmp = File.createTempFile("functions", null);
            tmp.deleteOnExit();
            Files.copy(inputStream, tmp.toPath(), StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException ioFailure) {
            throw new RuntimeException("Cannot create a temporary file", ioFailure);
        }
        return tmp;
    }

    /**
     * Checks that a get-state request names a tenant, a namespace, a function and a key.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param str4 state key
     * @throws IllegalArgumentException for the first {@code null} argument, in parameter order
     */
    private void validateGetFunctionStateParams(String str, String str2, String str3, String str4) throws IllegalArgumentException {
        String problem = null;
        if (str == null) {
            problem = "Tenant is not provided";
        } else if (str2 == null) {
            problem = "Namespace is not provided";
        } else if (str3 == null) {
            problem = "Function Name is not provided";
        } else if (str4 == null) {
            problem = "Key is not provided";
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Tells whether the function details name a builtin source or a builtin sink.
     *
     * @param functionDetails details to inspect
     * @return {@code true} when the source is present with a non-empty builtin name, or
     *         failing that, when the sink is present with a non-empty builtin name
     */
    private boolean isFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        final boolean builtinSource = functionDetails.hasSource()
                && !StringUtils.isEmpty(functionDetails.getSource().getBuiltin());
        if (builtinSource) {
            return true;
        }
        return functionDetails.hasSink() && !StringUtils.isEmpty(functionDetails.getSink().getBuiltin());
    }

    /**
     * Returns the builtin name referenced by the function details, preferring the source.
     *
     * @param functionDetails details to inspect
     * @return the source's builtin name when the source is present and the name is non-empty;
     *         otherwise the sink's builtin name under the same conditions; otherwise {@code null}
     */
    private String getFunctionCodeBuiltin(Function.FunctionDetails functionDetails) {
        if (functionDetails.hasSource()) {
            final String sourceBuiltin = functionDetails.getSource().getBuiltin();
            if (!StringUtils.isEmpty(sourceBuiltin)) {
                return sourceBuiltin;
            }
        }
        if (functionDetails.hasSink()) {
            final String sinkBuiltin = functionDetails.getSink().getBuiltin();
            if (!StringUtils.isEmpty(sinkBuiltin)) {
                return sinkBuiltin;
            }
        }
        return null;
    }

    /**
     * Core validation of a register/update request; produces the function details to store.
     *
     * <p>Exactly one of the two JSON payloads must be supplied. A function config is parsed
     * with Gson, validated through {@code ConfigValidation} and converted; function details
     * are merged into a builder, type-checked against the user classes and then checked for
     * required fields and a positive parallelism. A class loader is only extracted for the
     * JAVA runtime.
     *
     * @param str  tenant
     * @param str2 namespace
     * @param str3 function name
     * @param str4 function details JSON, may be empty when {@code str5} is given
     * @param str5 function config JSON, may be empty when {@code str4} is given
     * @param str6 package URL, may be blank
     * @param file uploaded package, may be {@code null}
     * @return the validated function details
     * @throws IllegalArgumentException when a required value is missing or invalid
     * @throws IOException              propagated from class loader extraction
     * @throws URISyntaxException       propagated from class loader extraction
     */
    private Function.FunctionDetails validateUpdateRequestParams(String str, String str2, String str3, String str4, String str5, String str6, File file) throws IOException, URISyntaxException {
        if (str == null) {
            throw new IllegalArgumentException("Tenant is not provided");
        }
        if (str2 == null) {
            throw new IllegalArgumentException("Namespace is not provided");
        }
        if (str3 == null) {
            throw new IllegalArgumentException("Function Name is not provided");
        }
        if (StringUtils.isEmpty(str4) && StringUtils.isEmpty(str5)) {
            throw new IllegalArgumentException("FunctionConfig is not provided");
        }
        if (!StringUtils.isEmpty(str4) && !StringUtils.isEmpty(str5)) {
            throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided");
        }

        if (!StringUtils.isEmpty(str5)) {
            FunctionConfig functionConfig = new Gson().fromJson(str5, FunctionConfig.class);
            // Gson yields null for payloads such as "null" or whitespace; report that as a bad
            // request instead of letting it surface as a NullPointerException below.
            if (functionConfig == null) {
                throw new IllegalArgumentException("FunctionConfig is not provided");
            }
            FunctionConfig.Runtime runtime = functionConfig.getRuntime();
            if (runtime == null) {
                throw new IllegalArgumentException("Function Runtime no specified");
            }
            ClassLoader configClassLoader = null;
            if (runtime == FunctionConfig.Runtime.JAVA) {
                configClassLoader = extractClassLoader(str6, file);
            }
            ConfigValidation.validateConfig(functionConfig, runtime.name(), configClassLoader);
            return FunctionConfigUtils.convert(functionConfig, configClassLoader);
        }

        Function.FunctionDetails.Builder detailsBuilder = Function.FunctionDetails.newBuilder();
        Utils.mergeJson(str4, detailsBuilder);
        if (StringUtils.isNotBlank(str6)) {
            detailsBuilder.setPackageUrl(str6);
        }
        ClassLoader detailsClassLoader = null;
        if (detailsBuilder.getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
            detailsClassLoader = extractClassLoader(str6, file);
        }
        validateFunctionClassTypes(detailsClassLoader, detailsBuilder);
        Function.FunctionDetails details = detailsBuilder.build();

        // Collect every missing field so the caller gets them all in one message.
        List<String> missingFields = new LinkedList<>();
        if (details.getTenant() == null || details.getTenant().isEmpty()) {
            missingFields.add("Tenant");
        }
        if (details.getNamespace() == null || details.getNamespace().isEmpty()) {
            missingFields.add("Namespace");
        }
        if (details.getName() == null || details.getName().isEmpty()) {
            missingFields.add("Name");
        }
        if (details.getClassName() == null || details.getClassName().isEmpty()) {
            missingFields.add("ClassName");
        }
        if (!details.getSource().isInitialized()) {
            missingFields.add("Source");
        }
        if (!details.getSink().isInitialized()) {
            missingFields.add("Sink");
        }
        if (!missingFields.isEmpty()) {
            throw new IllegalArgumentException(StringUtils.join(missingFields, ",") + " is not provided");
        }
        if (details.getParallelism() <= 0) {
            throw new IllegalArgumentException("Parallelism needs to be set to a positive number");
        }
        return details;
    }

    /**
     * Obtains a class loader for the function package.
     *
     * <p>A non-blank package URL takes precedence and is handed to {@code validateFileUrl}
     * together with the configured download directory; otherwise the uploaded file, when
     * present, is loaded as a jar.
     *
     * @param str  package URL, may be blank
     * @param file uploaded package, may be {@code null}
     * @return the class loader, or {@code null} when neither a URL nor a file is available
     * @throws IllegalArgumentException when loading the jar raises a {@link MalformedURLException}
     */
    private ClassLoader extractClassLoader(String str, File file) throws URISyntaxException, IOException {
        if (StringUtils.isNotBlank(str)) {
            return org.apache.pulsar.functions.worker.Utils.validateFileUrl(
                    str, this.workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
        }
        if (file != null) {
            try {
                return Reflections.loadJar(file);
            } catch (MalformedURLException badJar) {
                throw new IllegalArgumentException("Corrupted Jar File", badJar);
            }
        }
        return null;
    }

    /**
     * Checks the user function class and fills in the source/sink type class names on the builder.
     *
     * <p>Nothing is done without a class loader. Otherwise the function class is instantiated
     * and must be a Pulsar {@code Function} or a {@code java.util.function.Function}. For the
     * source and then the sink: when a custom class name is set, its type argument is resolved
     * via {@code getTypeArg} and written to that side (and to the other side when the other
     * side has no custom class name); otherwise a blank type class name is filled from the
     * function's own type arguments (index 0 for the source, index 1 for the sink).
     *
     * @param classLoader class loader for the user code, may be {@code null}
     * @param builder     function details builder, updated in place
     * @throws IllegalArgumentException when the class name is blank or a source/sink class fails validation
     * @throws RuntimeException         when the user class is neither kind of function
     */
    private void validateFunctionClassTypes(ClassLoader classLoader, Function.FunctionDetails.Builder builder) {
        if (classLoader == null) {
            return;
        }
        if (StringUtils.isBlank(builder.getClassName())) {
            throw new IllegalArgumentException("function class-name can't be empty");
        }

        final Object userFunction = Reflections.createInstance(builder.getClassName(), classLoader);
        final Class<?>[] typeArgs = Utils.getFunctionTypes(userFunction, false);
        final boolean isPulsarFunction = userFunction instanceof org.apache.pulsar.functions.api.Function;
        final boolean isJavaFunction = userFunction instanceof java.util.function.Function;
        if (!isPulsarFunction && !isJavaFunction) {
            throw new RuntimeException("User class must either be Function or java.util.Function");
        }

        // Source side.
        final boolean customSource = builder.hasSource()
                && builder.getSource() != null
                && StringUtils.isNotBlank(builder.getSource().getClassName());
        if (customSource) {
            try {
                String sourceType = getTypeArg(builder.getSource().getClassName(), Source.class, classLoader).getName();
                builder.setSource(builder.getSourceBuilder().setTypeClassName(sourceType));
                if (!builder.hasSink() || StringUtils.isBlank(builder.getSink().getClassName())) {
                    builder.setSink(builder.getSinkBuilder().setTypeClassName(sourceType));
                }
            } catch (IllegalArgumentException rejected) {
                throw rejected;
            } catch (Exception failure) {
                log.error("Failed to validate source class", (Throwable) failure);
                throw new IllegalArgumentException("Failed to validate source class-name", failure);
            }
        } else if (StringUtils.isBlank(builder.getSourceBuilder().getTypeClassName())) {
            builder.setSource(builder.getSourceBuilder().setTypeClassName(typeArgs[0].getName()));
        }

        // Sink side; evaluated only after the source side has updated the builder.
        final boolean customSink = builder.hasSink()
                && builder.getSink() != null
                && StringUtils.isNotBlank(builder.getSink().getClassName());
        if (customSink) {
            try {
                String sinkType = getTypeArg(builder.getSink().getClassName(), Sink.class, classLoader).getName();
                builder.setSink(builder.getSinkBuilder().setTypeClassName(sinkType));
                if (!builder.hasSource() || StringUtils.isBlank(builder.getSource().getClassName())) {
                    builder.setSource(builder.getSourceBuilder().setTypeClassName(sinkType));
                }
            } catch (IllegalArgumentException rejected) {
                throw rejected;
            } catch (Exception failure) {
                log.error("Failed to validate sink class", (Throwable) failure);
                throw new IllegalArgumentException("Failed to validate sink class-name", failure);
            }
        } else if (StringUtils.isBlank(builder.getSinkBuilder().getTypeClassName())) {
            builder.setSink(builder.getSinkBuilder().setTypeClassName(typeArgs[1].getName()));
        }
    }

    /**
     * Loads the class named {@code str} and resolves the raw type argument it supplies for
     * {@code cls}.
     *
     * @param str         name of the class to load through {@code classLoader}
     * @param cls         the type the loaded class is required to be assignable to
     * @param classLoader loader used to look up {@code str}
     * @return the result of {@code TypeResolver.resolveRawArgument} for {@code cls} against the loaded class
     * @throws ClassNotFoundException   if {@code classLoader} cannot load {@code str}
     * @throws IllegalArgumentException if the loaded class is not assignable to {@code cls}
     */
    private Class<?> getTypeArg(String str, Class<?> cls, ClassLoader classLoader) throws ClassNotFoundException {
        final Class<?> candidate = classLoader.loadClass(str);
        if (!cls.isAssignableFrom(candidate)) {
            final String message = String.format("class %s is not type of %s", str, cls.getName());
            throw new IllegalArgumentException(message);
        }
        return TypeResolver.resolveRawArgument((Type) cls, candidate);
    }

    /**
     * Checks that the mandatory parameters of a trigger request are present.
     *
     * <p>The checks run in order (tenant, namespace, function name, trigger data) and the first
     * missing one is reported. Trigger data counts as provided when either {@code str5} or
     * {@code inputStream} is non-null. {@code str4} is not inspected.
     *
     * @param str         tenant
     * @param str2        namespace
     * @param str3        function name
     * @param str4        not validated by this method
     * @param str5        trigger data given as a string; may be {@code null} if {@code inputStream} is set
     * @param inputStream trigger data given as a stream; may be {@code null} if {@code str5} is set
     * @throws IllegalArgumentException if any mandatory parameter is missing
     */
    private void validateTriggerRequestParams(String str, String str2, String str3, String str4, String str5, InputStream inputStream) {
        final String problem;
        if (str == null) {
            problem = "Tenant is not provided";
        } else if (str2 == null) {
            problem = "Namespace is not provided";
        } else if (str3 == null) {
            problem = "Function Name is not provided";
        } else if (str5 == null && inputStream == null) {
            problem = "Trigger Data is not provided";
        } else {
            problem = null;
        }
        if (problem != null) {
            throw new IllegalArgumentException(problem);
        }
    }

    /**
     * Builds the response returned while the function worker service is still initializing.
     *
     * @return a {@code SERVICE_UNAVAILABLE} response with an {@code application/json} body carrying
     *         an {@code ErrorData} message asking the caller to retry later
     */
    private Response getUnavailableResponse() {
        final ErrorData notReady = new ErrorData("Function worker service is not done initializing. Please try again in a little while.");
        return Response.status(Response.Status.SERVICE_UNAVAILABLE)
                .type("application/json")
                .entity(notReady)
                .build();
    }

    /**
     * Builds a four-segment, slash-separated package path.
     *
     * <p>The first two segments are used verbatim. The third is {@code str3} passed through
     * {@code Codec.encode}. The fourth is the result of the worker
     * {@code Utils.getUniquePackageName} helper applied to the encoded {@code str4}.
     *
     * @param str  first path segment (presumably the tenant; confirm against callers)
     * @param str2 second path segment (presumably the namespace; confirm against callers)
     * @param str3 value encoded into the third segment (presumably the function name)
     * @param str4 value encoded and made unique for the fourth segment (presumably the package file name)
     * @return the formatted path {@code str/str2/encoded(str3)/uniquePackageName(encoded(str4))}
     */
    public static String createPackagePath(String str, String str2, String str3, String str4) {
        final String encodedThird = Codec.encode(str3);
        final String uniquePackageName =
                org.apache.pulsar.functions.worker.Utils.getUniquePackageName(Codec.encode(str4));
        return String.format("%s/%s/%s/%s", str, str2, encodedThird, uniquePackageName);
    }

    /**
     * Decides whether a client role may act on a tenant.
     *
     * <p>Access is granted when authorization is disabled in the worker config or when the role is
     * a super user. Otherwise the tenant info is fetched through the broker admin client, and a
     * non-null role is accepted if the tenant has no admin roles configured (null or empty) or if
     * the tenant's admin roles contain it. A {@code null} role is rejected at that point.
     *
     * @param str  tenant whose info is looked up
     * @param str2 client role to check; may be {@code null}
     * @return {@code true} if the role is authorized for the tenant
     * @throws PulsarAdminException if fetching the tenant info fails
     */
    public boolean isAuthorizedRole(String str, String str2) throws PulsarAdminException {
        if (!worker().getWorkerConfig().isAuthorizationEnabled()) {
            return true;
        }
        if (isSuperUser(str2)) {
            return true;
        }
        // The tenant lookup happens before the null-role check so lookup failures always surface.
        final TenantInfo tenant = worker().getBrokerAdmin().tenants().getTenantInfo(str);
        if (str2 == null) {
            return false;
        }
        if (tenant.getAdminRoles() == null || tenant.getAdminRoles().isEmpty()) {
            // NOTE(review): a tenant without admin roles accepts any non-null role — confirm intended.
            return true;
        }
        return tenant.getAdminRoles().contains(str2);
    }

    /**
     * Tells whether the given role is one of the super-user roles in the worker config.
     *
     * @param str client role to check; may be {@code null}
     * @return {@code true} if {@code str} is non-null and contained in the configured super-user roles
     */
    public boolean isSuperUser(String str) {
        if (str == null) {
            return false;
        }
        return worker().getWorkerConfig().getSuperUserRoles().contains(str);
    }
}
