package com.weibo.rill.flow.olympicene.traversal.runners;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.DispatchInfo;
import com.weibo.rill.flow.interfaces.model.task.FunctionPattern;
import com.weibo.rill.flow.interfaces.model.task.FunctionTask;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.strategy.RetryContext;
import com.weibo.rill.flow.olympicene.core.model.strategy.RetryPolicy;
import com.weibo.rill.flow.olympicene.core.model.strategy.SimpleRetryPolicy;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.dispatcher.DAGDispatcher;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.serialize.DAGTraversalSerializer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner.class */
public class FunctionTaskRunner extends AbstractTaskRunner {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(FunctionTaskRunner.class);
    /** Dispatcher used to invoke the function task; its String return value is parsed as JSON. */
    private final DAGDispatcher dagDispatcher;
    /** Policy consulted to decide whether an invocation is retried and after which interval. */
    private final RetryPolicy retryPolicy;

    /* renamed from: com.weibo.rill.flow.olympicene.traversal.runners.FunctionTaskRunner$4, reason: invalid class name */
    /* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/FunctionTaskRunner$4.class */
    /**
     * Lookup table for switching over {@link FunctionPattern}.
     *
     * <p>The array is indexed by {@code FunctionPattern.ordinal()} and holds a stable case index:
     * TASK_SYNC = 1, TASK_SCHEDULER = 2, TASK_ASYNC = 3, FLOW_SYNC = 4, FLOW_ASYNC = 5. Any ordinal
     * that is never assigned keeps the int default of 0, which matches no case index.
     */
    static /* synthetic */ class AnonymousClass4 {
        // Sized from the enum as it exists at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern = new int[FunctionPattern.values().length];

        static {
            // Each constant is resolved inside its own try block: a NoSuchFieldError raised while
            // resolving one constant is deliberately ignored and leaves that slot at 0 without
            // affecting the remaining assignments.
            try {
                $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern[FunctionPattern.TASK_SYNC.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern[FunctionPattern.TASK_SCHEDULER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern[FunctionPattern.TASK_ASYNC.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern[FunctionPattern.FLOW_SYNC.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$weibo$rill$flow$interfaces$model$task$FunctionPattern[FunctionPattern.FLOW_ASYNC.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /**
     * Creates a runner that uses a freshly constructed {@link SimpleRetryPolicy}.
     *
     * <p>Delegates to the seven-argument constructor; all other arguments are forwarded unchanged.
     *
     * @param dAGDispatcher       dispatcher used to invoke function tasks
     * @param inputOutputMapping  forwarded to the superclass constructor
     * @param dAGContextStorage   forwarded to the superclass constructor
     * @param dAGInfoStorage      forwarded to the superclass constructor
     * @param dAGStorageProcedure forwarded to the superclass constructor
     * @param switcherManager     forwarded to the superclass constructor
     */
    public FunctionTaskRunner(DAGDispatcher dAGDispatcher, InputOutputMapping inputOutputMapping, DAGContextStorage dAGContextStorage, DAGInfoStorage dAGInfoStorage, DAGStorageProcedure dAGStorageProcedure, SwitcherManager switcherManager) {
        this(dAGDispatcher, inputOutputMapping, dAGContextStorage, dAGInfoStorage, dAGStorageProcedure, new SimpleRetryPolicy(), switcherManager);
    }

    /**
     * Creates a runner with an explicit retry policy.
     *
     * @param dAGDispatcher       dispatcher used to invoke function tasks
     * @param inputOutputMapping  forwarded to the superclass constructor
     * @param dAGContextStorage   forwarded to the superclass constructor
     * @param dAGInfoStorage      forwarded to the superclass constructor
     * @param dAGStorageProcedure forwarded to the superclass constructor
     * @param retryPolicy         policy consulted after each invocation to decide on retries
     * @param switcherManager     forwarded to the superclass constructor
     */
    public FunctionTaskRunner(
            DAGDispatcher dAGDispatcher,
            InputOutputMapping inputOutputMapping,
            DAGContextStorage dAGContextStorage,
            DAGInfoStorage dAGInfoStorage,
            DAGStorageProcedure dAGStorageProcedure,
            RetryPolicy retryPolicy,
            SwitcherManager switcherManager) {
        // Note the superclass takes the info storage before the context storage.
        super(inputOutputMapping, dAGInfoStorage, dAGContextStorage, dAGStorageProcedure, switcherManager);
        this.retryPolicy = retryPolicy;
        this.dagDispatcher = dAGDispatcher;
    }

    /**
     * Dispatches a function task while holding the per-task lock and returns the dispatch outcome.
     *
     * <p>The pattern of the task selects the predicate handed to {@code dispatchTask}:
     * <ul>
     *   <li>TASK_SYNC, TASK_SCHEDULER, TASK_ASYNC: the status must be success-or-skip;</li>
     *   <li>FLOW_SYNC, FLOW_ASYNC: the status must not be failed.</li>
     * </ul>
     *
     * @param str      execution id of the DAG the task belongs to
     * @param taskInfo task to run; its task definition is cast to {@link FunctionTask}, as the other
     *                 methods of this class already do
     * @param map      input handed to the dispatcher
     * @return the result stored by the locked action, or {@code null} if the action stored none
     * @throws DAGTraversalException raised inside the locked action with code OPERATION_UNSUPPORTED
     *                               when the pattern is none of the five handled values
     */
    @Override // com.weibo.rill.flow.olympicene.traversal.runners.AbstractTaskRunner
    protected ExecutionResult doRun(String str, TaskInfo taskInfo, Map<String, Object> map) {
        log.info("function task begin to run executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        AtomicReference<ExecutionResult> resultHolder = new AtomicReference<>();
        this.dagStorageProcedure.lockAndRun(LockerKey.buildTaskInfoLockName(str, taskInfo.getName()), () -> {
            FunctionPattern pattern = ((FunctionTask) taskInfo.getTask()).getPattern();
            // Switch on the enum itself rather than on a hand-maintained ordinal lookup table.
            switch (pattern) {
                case TASK_SYNC:
                case TASK_SCHEDULER:
                case TASK_ASYNC:
                    resultHolder.set(dispatchTask(str, taskInfo, map, pattern, TaskStatus::isSuccessOrSkip));
                    break;
                case FLOW_SYNC:
                case FLOW_ASYNC:
                    resultHolder.set(dispatchTask(str, taskInfo, map, pattern, status -> !status.isFailed()));
                    break;
                default:
                    throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), String.format("%s not supported", pattern));
            }
        });
        log.info("run function task completed, executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        return resultHolder.get();
    }

    /**
     * Invokes the dispatcher for one task and turns its response into an {@link ExecutionResult}.
     *
     * <p>On the normal path the dispatcher's return value is parsed as JSON, converted into an invoke
     * message and an output map, and handed to {@code executionCallback}. Any exception raised on that
     * path is logged with its stack trace; the retry policy then decides whether the task is put back
     * for a retry or the failure is rethrown.
     *
     * @param str             execution id of the DAG the task belongs to
     * @param taskInfo        task being dispatched
     * @param map             input handed to the dispatcher
     * @param functionPattern pattern of the task, used only for logging here
     * @param function        predicate on the final status, forwarded to {@code executionCallback}
     * @return the result of the normal callback or of the retry callback
     * @throws DAGTraversalException with code TRAVERSAL_FAILED when the dispatch path failed and the
     *                               retry policy declines a retry
     */
    private ExecutionResult dispatchTask(String str, TaskInfo taskInfo, Map<String, Object> map, FunctionPattern functionPattern, Function<TaskStatus, Boolean> function) {
        try {
            DispatchInfo dispatchInfo = DispatchInfo.builder().taskInfo(taskInfo).input(map).executionId(str).build();
            JsonNode retJson = getRetJson(this.dagDispatcher.dispatch(dispatchInfo));
            TaskInvokeMsg invokeMsg = buildInvokeMsg(retJson);
            Map<String, Object> output = buildOutput(retJson);
            NotifyInfo notifyInfo = NotifyInfo.builder()
                    .taskInfoName(taskInfo.getName())
                    .taskStatus(buildTaskStatus(output, taskInfo, invokeMsg))
                    .taskInvokeMsg(invokeMsg)
                    .build();
            return executionCallback(str, taskInfo, notifyInfo, output, function);
        } catch (Exception e) {
            // The trailing throwable is not bound to a placeholder; SLF4J logs it as the exception,
            // so the stack trace is kept instead of only the message text.
            log.error("dispatchTask fails, executionId:{}, taskName:{}, functionPattern:{}, errorMsg:{}", str, taskInfo.getName(), functionPattern, e.getMessage(), e);
            RetryContext retryContext = RetryContext.builder().retryConfig(taskInfo.getTask().getRetry()).taskStatus(TaskStatus.FAILED).taskInfo(taskInfo).build();
            if (!this.retryPolicy.needRetry(retryContext)) {
                // NOTE(review): only the message is carried over; attach e as the cause if
                // DAGTraversalException offers a constructor for it.
                throw new DAGTraversalException(TraversalErrorCode.TRAVERSAL_FAILED.getCode(), e.getMessage());
            }
            NotifyInfo retryNotify = NotifyInfo.builder().retryContext(retryContext).build();
            // Record the failure message only when the task does not already carry one.
            boolean hasNoMsg = Optional.ofNullable(taskInfo.getTaskInvokeMsg()).map(msg -> msg.getMsg()).isEmpty();
            if (hasNoMsg) {
                retryNotify.setTaskInvokeMsg(TaskInvokeMsg.builder().msg(e.getMessage()).build());
            }
            return handleRetryCallback(str, taskInfo, retryNotify);
        }
    }

    /**
     * Converts the dispatcher response into the task's output map.
     *
     * @param jsonNode parsed dispatcher response, may be {@code null}
     * @return {@code null} for a {@code null} node; otherwise a new mutable map holding the fields of an
     *         object node, or the elements of an array node under the key {@code "data"}; the map is
     *         empty for any other node kind
     */
    private Map<String, Object> buildOutput(JsonNode jsonNode) {
        if (jsonNode == null) {
            return null;
        }
        Map<String, Object> output = new HashMap<>();
        if (jsonNode.isArray()) {
            List<Object> elements = DAGTraversalSerializer.MAPPER.convertValue(jsonNode, new TypeReference<List<Object>>() {
            });
            output.put("data", elements);
        } else if (jsonNode.isObject()) {
            Map<String, Object> fields = DAGTraversalSerializer.MAPPER.convertValue(jsonNode, new TypeReference<Map<String, Object>>() {
            });
            output.putAll(fields);
        }
        return output;
    }

    /**
     * Derives the task status right after dispatch, depending on the task's pattern.
     *
     * @param map           output built from the dispatcher response, may be {@code null}
     * @param taskInfo      task whose definition is cast to {@link FunctionTask}
     * @param taskInvokeMsg invoke message built from the dispatcher response
     * @return RUNNING for TASK_SCHEDULER and TASK_ASYNC, the output-based status for TASK_SYNC, the
     *         reference-based status for the two flow patterns, and RUNNING for anything else
     */
    private TaskStatus buildTaskStatus(Map<String, Object> map, TaskInfo taskInfo, TaskInvokeMsg taskInvokeMsg) {
        FunctionTask functionTask = (FunctionTask) taskInfo.getTask();
        FunctionPattern pattern = functionTask.getPattern();
        if (pattern == FunctionPattern.TASK_SYNC) {
            return taskTypeStatus(map, functionTask, null);
        }
        if (pattern == FunctionPattern.FLOW_ASYNC || pattern == FunctionPattern.FLOW_SYNC) {
            return flowTypeStatus(taskInvokeMsg, pattern);
        }
        // TASK_SCHEDULER, TASK_ASYNC and any unrecognised (or null) pattern.
        return TaskStatus.RUNNING;
    }

    /**
     * Decides the status of a task-pattern invocation from its output.
     *
     * <p>Precedence: a {@code null} output fails; otherwise non-empty success conditions decide;
     * otherwise non-empty fail conditions decide; otherwise a non-null {@code taskStatus} is returned
     * as is; otherwise the {@code "result_type"} entry of the output decides.
     *
     * @param map          output of the invocation, may be {@code null}
     * @param functionTask task definition supplying the success and fail conditions
     * @param taskStatus   status reported by the caller, used only when no conditions are configured;
     *                     may be {@code null}
     * @return the resolved status
     */
    private TaskStatus taskTypeStatus(Map<String, Object> map, FunctionTask functionTask, TaskStatus taskStatus) {
        if (map == null) {
            log.warn("output is null");
            return TaskStatus.FAILED;
        }
        if (CollectionUtils.isNotEmpty(functionTask.getSuccessConditions())) {
            boolean succeeded = conditionsAllMatch(functionTask.getSuccessConditions(), map, "output");
            return succeeded ? TaskStatus.SUCCEED : TaskStatus.FAILED;
        }
        if (CollectionUtils.isNotEmpty(functionTask.getFailConditions())) {
            boolean failed = conditionsAllMatch(functionTask.getFailConditions(), map, "output");
            return failed ? TaskStatus.FAILED : TaskStatus.SUCCEED;
        }
        if (taskStatus != null) {
            return taskStatus;
        }
        // No conditions and no reported status: a missing result_type counts as success, as does
        // any value whose text equals "SUCCESS" ignoring case.
        Object resultType = map.get("result_type");
        if (resultType == null) {
            return TaskStatus.SUCCEED;
        }
        String resultText = String.valueOf(resultType);
        boolean nonSuccess = resultText != null && !"SUCCESS".equalsIgnoreCase(resultText);
        return nonSuccess ? TaskStatus.FAILED : TaskStatus.SUCCEED;
    }

    /**
     * Decides the status of a flow-pattern invocation from the referenced DAG execution id.
     *
     * @param taskInvokeMsg   invoke message built from the dispatcher response, may be {@code null}
     * @param functionPattern pattern of the task
     * @return FAILED when no non-blank referenced execution id is available; otherwise SUCCEED for
     *         FLOW_ASYNC and RUNNING for every other pattern
     */
    private TaskStatus flowTypeStatus(TaskInvokeMsg taskInvokeMsg, FunctionPattern functionPattern) {
        boolean hasReferencedExecution = Optional.ofNullable(taskInvokeMsg)
                .map(TaskInvokeMsg::getReferencedDAGExecutionId)
                .filter(StringUtils::isNotBlank)
                .isPresent();
        if (!hasReferencedExecution) {
            return TaskStatus.FAILED;
        }
        if (functionPattern == FunctionPattern.FLOW_ASYNC) {
            return TaskStatus.SUCCEED;
        }
        return TaskStatus.RUNNING;
    }

    /**
     * Builds the invoke message for a dispatcher response.
     *
     * <p>For an object node the message is filled from the node itself and then, when its
     * {@code "data"} field is also an object, from that nested object, so nested values are applied
     * last.
     *
     * @param jsonNode parsed dispatcher response, may be {@code null}
     * @return a new message; left untouched when the node is {@code null} or not an object
     */
    private TaskInvokeMsg buildInvokeMsg(JsonNode jsonNode) {
        TaskInvokeMsg invokeMsg = new TaskInvokeMsg();
        if (jsonNode == null || !jsonNode.isObject()) {
            return invokeMsg;
        }
        appendInvokeInfo(jsonNode, invokeMsg);
        JsonNode dataNode = jsonNode.get("data");
        if (dataNode != null && dataNode.isObject()) {
            appendInvokeInfo(dataNode, invokeMsg);
        }
        return invokeMsg;
    }

    /**
     * Default-configured mapper used to copy a response node into the invoke message's output.
     * Kept as one shared instance so that no mapper is built per call; Jackson documents a fully
     * configured ObjectMapper as safe for concurrent use.
     */
    private static final ObjectMapper INVOKE_OUTPUT_MAPPER = new ObjectMapper();

    /**
     * Copies invocation details from a response node into the given invoke message.
     *
     * <p>Fields read, each applied only when present in the node:
     * <ul>
     *   <li>code: {@code "error_code"} if the node has that field, otherwise {@code "code"};</li>
     *   <li>msg: {@code "error_msg"} if the node has that field, otherwise {@code "msg"};</li>
     *   <li>invoke id: {@code "invoke_id"};</li>
     *   <li>referenced DAG execution id: {@code "execution_id"}.</li>
     * </ul>
     * When the {@code ENABLE_SET_INPUT_OUTPUT} switch is on, the whole node is additionally stored as
     * the message's output map.
     *
     * @param jsonNode      response node to read from
     * @param taskInvokeMsg message to update; must not be {@code null}
     */
    private void appendInvokeInfo(JsonNode jsonNode, TaskInvokeMsg taskInvokeMsg) {
        JsonNode codeNode = jsonNode.has("error_code") ? jsonNode.get("error_code") : jsonNode.get("code");
        optionalText(codeNode).ifPresent(taskInvokeMsg::setCode);

        JsonNode msgNode = jsonNode.has("error_msg") ? jsonNode.get("error_msg") : jsonNode.get("msg");
        optionalText(msgNode).ifPresent(taskInvokeMsg::setMsg);

        optionalText(jsonNode.get("invoke_id")).ifPresent(taskInvokeMsg::setInvokeId);
        optionalText(jsonNode.get("execution_id")).ifPresent(taskInvokeMsg::setReferencedDAGExecutionId);

        if (this.switcherManager.getSwitcherState("ENABLE_SET_INPUT_OUTPUT")) {
            Map<String, Object> output = INVOKE_OUTPUT_MAPPER.convertValue(jsonNode, new TypeReference<Map<String, Object>>() {
            });
            taskInvokeMsg.setOutput(output);
        }
    }

    /** Returns the text form of the node, or an empty Optional when the node is absent. */
    private static Optional<String> optionalText(JsonNode node) {
        return Optional.ofNullable(node).map(JsonNode::asText);
    }

    /**
     * Parses the raw dispatcher return value as JSON on a best-effort basis.
     *
     * @param str raw dispatcher return value, may be blank or {@code null}
     * @return the parsed tree, or {@code null} when the input is blank or cannot be parsed (a parse
     *         failure is logged at warn level and not propagated)
     */
    private JsonNode getRetJson(String str) {
        if (StringUtils.isBlank(str)) {
            return null;
        }
        try {
            return DAGTraversalSerializer.MAPPER.readTree(str);
        } catch (Exception e) {
            log.warn("getRetJson fails, dispatchRet:{}, errorMsg:{}", str, e.getMessage());
        }
        return null;
    }

    /**
     * Completes a function task from a notification while holding the per-task lock.
     *
     * <p>Inside the lock the DAG and the stored task are validated, the notified status is re-evaluated
     * against the task's success/fail conditions and the output, and the result is handed to
     * {@code executionCallback}. The predicate passed along holds unless the final status is failed and
     * the task has neither success nor fail conditions.
     *
     * @param str        execution id of the DAG the task belongs to
     * @param notifyInfo notification naming the task and carrying its reported status; its status is
     *                   overwritten with the re-evaluated one
     * @param map        output of the invocation, may be {@code null}
     * @return the result stored by the locked action, or {@code null} if the action stored none
     */
    @Override // com.weibo.rill.flow.olympicene.traversal.runners.AbstractTaskRunner, com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner
    public ExecutionResult finish(String str, NotifyInfo notifyInfo, Map<String, Object> map) {
        log.info("function task begin to finish executionId:{}, notifyInfo:{}", str, notifyInfo);
        AtomicReference<ExecutionResult> resultHolder = new AtomicReference<>();
        this.dagStorageProcedure.lockAndRun(LockerKey.buildTaskInfoLockName(str, notifyInfo.getTaskInfoName()), () -> {
            validateDAGInfo(str);
            TaskInfo storedTask = this.dagInfoStorage.getBasicTaskInfo(str, notifyInfo.getTaskInfoName());
            finishActionValidateTaskInfo(str, notifyInfo.getTaskInfoName(), storedTask);
            FunctionTask functionTask = (FunctionTask) storedTask.getTask();

            TaskStatus resolvedStatus = taskTypeStatus(map, functionTask, notifyInfo.getTaskStatus());
            notifyInfo.setTaskStatus(resolvedStatus);

            Function<TaskStatus, Boolean> statusAccepted = status ->
                    !(status.isFailed()
                            && CollectionUtils.isEmpty(functionTask.getSuccessConditions())
                            && CollectionUtils.isEmpty(functionTask.getFailConditions()));
            resultHolder.set(executionCallback(str, storedTask, notifyInfo, map, statusAccepted));
        });
        return resultHolder.get();
    }

    /**
     * Routes a finished invocation to the retry path or the normal completion path.
     *
     * @param str        execution id of the DAG the task belongs to
     * @param taskInfo   task the notification refers to
     * @param notifyInfo notification carrying the status; receives the retry context when a retry is due
     * @param map        output of the invocation
     * @param function   predicate on the final status, forwarded to the normal completion path
     * @return the result of whichever path was taken
     */
    private ExecutionResult executionCallback(String str, TaskInfo taskInfo, NotifyInfo notifyInfo, Map<String, Object> map, Function<TaskStatus, Boolean> function) {
        RetryContext retryContext = RetryContext.builder()
                .retryConfig(taskInfo.getTask().getRetry())
                .taskStatus(notifyInfo.getTaskStatus())
                .taskInfo(taskInfo)
                .build();
        boolean retryDue = this.retryPolicy.needRetry(retryContext);
        log.info("executionCallback start executionId:{}, taskInfoName:{}, needRetry:{}", str, taskInfo.getName(), retryDue);
        if (retryDue) {
            notifyInfo.setRetryContext(retryContext);
            return handleRetryCallback(str, taskInfo, notifyInfo);
        }
        return handleNormalCallback(str, taskInfo, notifyInfo, map, function);
    }

    /**
     * Records the final outcome of an invocation that is not retried.
     *
     * <p>A FAILED status on a tolerant task is stored as SKIPPED. Output mappings are applied to the
     * context, and the context saved, only when the predicate accepts the stored status, the output is
     * non-empty and the task defines output mappings. The task info is saved in every case.
     *
     * @param str        execution id of the DAG the task belongs to
     * @param taskInfo   task to update and persist
     * @param notifyInfo notification carrying the status and invoke message
     * @param map        output of the invocation
     * @param function   predicate on the stored status that gates the output mappings
     * @return a result holding the stored status, the task, and the updated context ({@code null} when
     *         no mappings were applied)
     */
    private ExecutionResult handleNormalCallback(String str, TaskInfo taskInfo, NotifyInfo notifyInfo, Map<String, Object> map, Function<TaskStatus, Boolean> function) {
        log.info("handleNormalCallback executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        TaskStatus reportedStatus = notifyInfo.getTaskStatus();
        taskInfo.updateInvokeMsg(notifyInfo.getTaskInvokeMsg());

        boolean toleratedFailure = reportedStatus == TaskStatus.FAILED && taskInfo.getTask().isTolerance();
        taskInfo.setTaskStatus(toleratedFailure ? TaskStatus.SKIPPED : reportedStatus);
        if (taskInfo.getTaskStatus().isCompleted()) {
            updateTaskInvokeEndTime(taskInfo);
        }

        Map<String, Object> context = null;
        List<Mapping> mappings = taskInfo.getTask().getOutputMappings();
        boolean applyMappings = function.apply(taskInfo.getTaskStatus())
                && MapUtils.isNotEmpty(map)
                && CollectionUtils.isNotEmpty(mappings);
        if (applyMappings) {
            context = ContextHelper.getInstance().getContext(this.dagContextStorage, str, taskInfo);
            outputMappings(context, new HashMap<>(), map, mappings);
            saveContext(str, context, Sets.newHashSet(taskInfo));
        }

        this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(taskInfo));
        return ExecutionResult.builder()
                .taskStatus(taskInfo.getTaskStatus())
                .taskInfo(taskInfo)
                .context(context)
                .build();
    }

    /**
     * Puts a task back into the READY state so that it is retried.
     *
     * <p>The task's invoke message and invoke end time are updated and the task is persisted before
     * the retry interval is computed. The context is loaded into the result only when that interval
     * is zero.
     *
     * @param str        execution id of the DAG the task belongs to
     * @param taskInfo   task to reset and persist
     * @param notifyInfo notification carrying the invoke message and the retry context
     * @return a result flagged as needing a retry, with the computed interval in seconds
     */
    private ExecutionResult handleRetryCallback(String str, TaskInfo taskInfo, NotifyInfo notifyInfo) {
        log.info("handleRetryCallback executionId:{} taskInfoName:{}", str, taskInfo.getName());
        taskInfo.setTaskStatus(TaskStatus.READY);
        taskInfo.updateInvokeMsg(notifyInfo.getTaskInvokeMsg());
        updateTaskInvokeEndTime(taskInfo);
        this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(taskInfo));

        int retryInterval = this.retryPolicy.calculateRetryInterval(notifyInfo.getRetryContext());
        Map<String, Object> context = null;
        if (retryInterval == 0) {
            context = ContextHelper.getInstance().getContext(this.dagContextStorage, str, taskInfo);
        }
        return ExecutionResult.builder()
                .needRetry(true)
                .retryIntervalInSeconds(retryInterval)
                .taskStatus(taskInfo.getTaskStatus())
                .taskInfo(taskInfo)
                .context(context)
                .build();
    }
}
