package com.weibo.rill.flow.olympicene.traversal;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.strategy.Timeline;
import com.weibo.rill.flow.interfaces.model.task.FunctionPattern;
import com.weibo.rill.flow.interfaces.model.task.FunctionTask;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.concurrent.ExecutionRunnable;
import com.weibo.rill.flow.olympicene.core.constant.SystemConfig;
import com.weibo.rill.flow.olympicene.core.event.Callback;
import com.weibo.rill.flow.olympicene.core.model.DAGSettings;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInvokeMsg;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGResult;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.result.DAGResultHandler;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.olympicene.traversal.callback.CallbackInvoker;
import com.weibo.rill.flow.olympicene.traversal.callback.DAGCallbackInfo;
import com.weibo.rill.flow.olympicene.traversal.callback.DAGEvent;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.runners.DAGRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner;
import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner;
import io.opentelemetry.javaagent.shaded.io.opentelemetry.api.trace.Span;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/DAGOperations.class */
public class DAGOperations {
    private static final String EXECUTION_ID = "executionId";
    private static final String INVALID_TRACE_ID = "00000000000000000000000000000000";
    private static final int EXPIRE_7DAYS_SECOND = 604800;
    private static final String TRACE_ID_PREFIX = "trace_id_";
    private final ExecutorService runnerExecutor;
    private final Map<String, TaskRunner> taskRunners;
    private final DAGRunner dagRunner;
    private final TimeCheckRunner timeCheckRunner;
    private final DAGTraversal dagTraversal;
    private final Callback<DAGCallbackInfo> callback;
    private final DAGResultHandler dagResultHandler;
    private final RedisClient redisClient;
    private static final Logger log = LoggerFactory.getLogger(DAGOperations.class);
    public static final BiConsumer<Runnable, Integer> OPERATE_WITH_RETRY = (runnable, num) -> {
        int intValue = num.intValue();
        for (int i = 1; i <= intValue; i++) {
            try {
                runnable.run();
                return;
            } catch (Exception e) {
                log.warn("operateWithRetry fails, invokeTimes:{}", Integer.valueOf(i), e);
            }
        }
        runnable.run();
    };

    public DAGOperations(ExecutorService executorService, Map<String, TaskRunner> map, DAGRunner dAGRunner, TimeCheckRunner timeCheckRunner, DAGTraversal dAGTraversal, Callback<DAGCallbackInfo> callback, DAGResultHandler dAGResultHandler, RedisClient redisClient) {
        this.runnerExecutor = executorService;
        this.taskRunners = map;
        this.dagRunner = dAGRunner;
        this.timeCheckRunner = timeCheckRunner;
        this.dagTraversal = dAGTraversal;
        this.callback = callback;
        this.dagResultHandler = dAGResultHandler;
        this.redisClient = redisClient;
    }

    public void runTasks(String str, Collection<Pair<TaskInfo, Map<String, Object>>> collection) {
        log.info("runTasks begin submit task executionId:{}", str);
        collection.forEach(pair -> {
            this.runnerExecutor.execute(new ExecutionRunnable(str, () -> {
                TaskInfo taskInfo = (TaskInfo) pair.getLeft();
                try {
                    log.info("runTasks task begin to execute executionId:{} taskInfoName:{}", str, taskInfo.getName());
                    runTask(str, taskInfo, (Map) pair.getRight());
                } catch (Exception e) {
                    log.error("runTasks fails, executionId:{}, taskName:{}", new Object[]{str, taskInfo.getName(), e});
                }
            }));
        });
    }

    private void runTask(String str, TaskInfo taskInfo, Map<String, Object> map) {
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(EXECUTION_ID, str);
        newHashMap.put("taskInfo", taskInfo);
        newHashMap.put("context", map);
        TaskRunner selectRunner = selectRunner(taskInfo);
        ExecutionResult executionResult = PluginHelper.pluginInvokeChain((Supplier<ExecutionResult>) () -> {
            return selectRunner.run(str, taskInfo, map);
        }, newHashMap, (List<BiFunction<Supplier<ExecutionResult>, Map<String, Object>, ExecutionResult>>) SystemConfig.TASK_RUN_CUSTOMIZED_PLUGINS).get();
        if (isTaskCompleted(executionResult)) {
            this.dagTraversal.submitTraversal(str, taskInfo.getName());
            invokeTaskCallback(str, taskInfo, map);
        }
        if (executionResult.getTaskStatus() == TaskStatus.RUNNING) {
            Optional.ofNullable(getTimeoutSeconds(executionResult.getInput(), new HashMap(), (Timeline) Optional.ofNullable(taskInfo.getTask()).map((v0) -> {
                return v0.getTimeline();
            }).orElse(null))).ifPresent(l -> {
                this.timeCheckRunner.addTaskToTimeoutCheck(str, taskInfo, l.longValue());
            });
        }
        if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
            runTaskWithTimeInterval(str, executionResult.getTaskInfo(), executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
        }
        if (CollectionUtils.isNotEmpty(executionResult.getSubTaskInfosAndContext())) {
            executionResult.getSubTaskInfosAndContext().forEach(pair -> {
                this.dagTraversal.submitTasks(str, (Set) pair.getLeft(), (Map) pair.getRight());
                safeSleep(10L);
            });
        }
    }

    private Long getTimeoutSeconds(Map<String, Object> map, Map<String, Object> map2, Timeline timeline) {
        if (timeline == null) {
            return null;
        }
        try {
            if (StringUtils.isBlank(timeline.getTimeoutInSeconds())) {
                return null;
            }
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.add(new Mapping(timeline.getTimeoutInSeconds(), "$.output.timeout"));
            HashMap newHashMap = Maps.newHashMap();
            this.taskRunners.get("function").inputMappings(map2, map, newHashMap, newArrayList);
            return (Long) Optional.ofNullable(newHashMap.get("timeout")).map(String::valueOf).map(Long::valueOf).filter(l -> {
                return l.longValue() > 0;
            }).orElse(null);
        } catch (Exception e) {
            log.warn("can not get timeout in seconds, source:{} errorMsg:{}", timeline, e.getMessage());
            return null;
        }
    }

    private void runTaskWithTimeInterval(String str, TaskInfo taskInfo, Map<String, Object> map, int i) {
        log.info("runTaskWithTimeInterval task start to check executionId:{} taskInfoName:{} intervalInSeconds:{}", new Object[]{str, taskInfo.getName(), Integer.valueOf(i)});
        if (i > 0) {
            this.timeCheckRunner.addTaskToWaitCheck(str, taskInfo, i);
        } else {
            runTasks(str, Lists.newArrayList(new Pair[]{Pair.of(taskInfo, map)}));
        }
    }

    public void finishTaskAsync(String str, String str2, NotifyInfo notifyInfo, Map<String, Object> map) {
        this.runnerExecutor.execute(new ExecutionRunnable(str, () -> {
            try {
                finishTaskSync(str, str2, notifyInfo, map);
            } catch (Exception e) {
                log.error("finishTaskAsync fails, executionId:{}, taskCategory:{}, notifyInfo:{}", new Object[]{str, str2, notifyInfo, e});
            }
        }));
    }

    public void finishTaskSync(String str, String str2, NotifyInfo notifyInfo, Map<String, Object> map) {
        log.info("finishTask task begin to execute executionId:{} notifyInfo:{}", str, notifyInfo);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(EXECUTION_ID, str);
        newHashMap.put("taskCategory", str2);
        newHashMap.put("notifyInfo", notifyInfo);
        newHashMap.put("output", map);
        TaskRunner selectRunner = selectRunner(str2);
        ExecutionResult executionResult = PluginHelper.pluginInvokeChain((Supplier<ExecutionResult>) () -> {
            return selectRunner.finish(str, notifyInfo, map);
        }, newHashMap, (List<BiFunction<Supplier<ExecutionResult>, Map<String, Object>, ExecutionResult>>) SystemConfig.TASK_FINISH_CUSTOMIZED_PLUGINS).get();
        if (executionResult.getTaskStatus() == TaskStatus.READY && executionResult.isNeedRetry()) {
            this.timeCheckRunner.remTaskFromTimeoutCheck(str, executionResult.getTaskInfo());
            runTaskWithTimeInterval(str, executionResult.getTaskInfo(), executionResult.getContext(), executionResult.getRetryIntervalInSeconds());
        }
        if (isTaskCompleted(executionResult)) {
            this.timeCheckRunner.remTaskFromTimeoutCheck(str, executionResult.getTaskInfo());
            this.dagTraversal.submitTraversal(str, executionResult.getTaskInfo().getName());
            invokeTaskCallback(str, executionResult.getTaskInfo(), executionResult.getContext());
        }
        if (StringUtils.isNotBlank(executionResult.getTaskNameNeedToTraversal())) {
            this.dagTraversal.submitTraversal(str, executionResult.getTaskNameNeedToTraversal());
        }
        if (isForeachTaskKeyCompleted(executionResult, notifyInfo.getCompletedGroupIndex()) || isSubFlowTaskKeyCompleted(executionResult)) {
            this.dagTraversal.submitTraversal(str, executionResult.getTaskInfo().getName());
        }
    }

    public void redoTask(String str, List<String> list, Map<String, Object> map) {
        log.info("redoTask task begin to execute executionId:{} taskNames:{}", str, list);
        this.dagRunner.resetTask(str, list, map);
        this.dagTraversal.submitTraversal(str, null);
    }

    public void submitDAG(String str, DAG dag, DAGSettings dAGSettings, Map<String, Object> map, NotifyInfo notifyInfo) {
        log.info("submitDAG task begin to execute executionId:{} notifyInfo:{}", str, notifyInfo);
        storageTraceIdAndExecutionIdToRedis(str);
        Optional.ofNullable(getTimeoutSeconds(new HashMap(), this.dagRunner.submitDAG(str, dag, dAGSettings, map, notifyInfo).getContext(), dag.getTimeline())).ifPresent(l -> {
            this.timeCheckRunner.addDAGToTimeoutCheck(str, l.longValue());
        });
        this.dagTraversal.submitTraversal(str, null);
    }

    private void storageTraceIdAndExecutionIdToRedis(String str) {
        if (StringUtils.isNotBlank(this.redisClient.get("trace_id_" + str))) {
            return;
        }
        Span current = Span.current();
        if (Objects.isNull(current)) {
            log.warn("submitDAG currentSpan is null executionId:{}", str);
            return;
        }
        String traceId = current.getSpanContext().getTraceId();
        log.info("submitDAG executionId:{} traceId:{}", str, traceId);
        if (StringUtils.isEmpty(traceId) || StringUtils.equals(traceId, INVALID_TRACE_ID)) {
            return;
        }
        this.redisClient.setex("trace_id_" + str, EXPIRE_7DAYS_SECOND, traceId);
    }

    public void finishDAG(String str, DAGInfo dAGInfo, DAGStatus dAGStatus, DAGInvokeMsg dAGInvokeMsg) {
        log.info("finishDAG task begin to execute executionId:{} dagStatus:{}", str, dAGStatus);
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put(EXECUTION_ID, str);
        newHashMap.put("dagInfo", dAGInfo);
        newHashMap.put("dagStatus", dAGStatus);
        newHashMap.put("dagInvokeMsg", dAGInvokeMsg);
        ExecutionResult executionResult = PluginHelper.pluginInvokeChain((Supplier<ExecutionResult>) () -> {
            return this.dagRunner.finishDAG(str, dAGInfo, dAGStatus, dAGInvokeMsg);
        }, newHashMap, (List<BiFunction<Supplier<ExecutionResult>, Map<String, Object>, ExecutionResult>>) SystemConfig.DAG_FINISH_CUSTOMIZED_PLUGINS).get();
        DAGInfo dagInfo = executionResult.getDagInfo();
        Map<String, Object> context = executionResult.getContext();
        this.timeCheckRunner.remDAGFromTimeoutCheck(str, dagInfo.getDag());
        ((List) Optional.ofNullable(dagInfo.getDagInvokeMsg()).map((v0) -> {
            return v0.getExecutionRoutes();
        }).orElse(new ArrayList())).stream().max(Comparator.comparingInt((v0) -> {
            return v0.getIndex();
        })).filter(executionInfo -> {
            return executionInfo.getExecutionType() == FunctionPattern.FLOW_SYNC;
        }).ifPresent(executionInfo2 -> {
            try {
                String executionId = executionInfo2.getExecutionId();
                String taskInfoName = executionInfo2.getTaskInfoName();
                TaskStatus calculateSubFlowTaskStatus = calculateSubFlowTaskStatus(dagInfo);
                finishTaskSync(executionId, TaskCategory.FUNCTION.getValue(), NotifyInfo.builder().taskInfoName(taskInfoName).taskStatus(calculateSubFlowTaskStatus).taskInvokeMsg((TaskInvokeMsg) Optional.ofNullable(dagInfo.getDagInvokeMsg()).map(dAGInvokeMsg2 -> {
                    return TaskInvokeMsg.builder().code(dAGInvokeMsg2.getCode()).msg(dAGInvokeMsg2.getMsg()).ext(dAGInvokeMsg2.getExt()).build();
                }).orElse(null)).build(), context);
            } catch (Exception e) {
                log.warn("finishDAG fails to finish task, executionInfo:{}", executionInfo2, e);
            }
        });
        trialClose(str, dAGStatus, dagInfo, context);
    }

    private TaskStatus calculateSubFlowTaskStatus(DAGInfo dAGInfo) {
        DAGStatus dagStatus = dAGInfo.getDagStatus();
        return dagStatus == DAGStatus.SUCCEED ? TaskStatus.SUCCEED : dagStatus == DAGStatus.KEY_SUCCEED ? TaskStatus.KEY_SUCCEED : TaskStatus.FAILED;
    }

    private void trialClose(String str, DAGStatus dAGStatus, DAGInfo dAGInfo, Map<String, Object> map) {
        invokeCallback(str, dAGStatus == DAGStatus.SUCCEED ? DAGEvent.DAG_SUCCEED : dAGStatus == DAGStatus.KEY_SUCCEED ? DAGEvent.DAG_KEY_SUCCEED : DAGEvent.DAG_FAILED, dAGInfo, null, map);
        setDAGResult(str, dAGInfo, map);
    }

    private void setDAGResult(String str, DAGInfo dAGInfo, Map<String, Object> map) {
        log.info("setDAGResult operations executionId:{}", str);
        if (this.dagResultHandler == null) {
            log.info("setDAGResult dagResultHandler null");
        } else {
            this.dagResultHandler.updateDAGResult(str, DAGResult.builder().dagInfo(dAGInfo).context(map).build());
        }
    }

    private void invokeTaskCallback(String str, TaskInfo taskInfo, Map<String, Object> map) {
        DAGEvent dAGEvent = DAGEvent.TASK_FINISH;
        if (taskInfo.getTaskStatus() == TaskStatus.FAILED) {
            dAGEvent = DAGEvent.TASK_FAILED;
        } else if (taskInfo.getTaskStatus() == TaskStatus.SKIPPED) {
            dAGEvent = DAGEvent.TASK_SKIPPED;
        }
        DAGInfo dAGInfo = new DAGInfo();
        dAGInfo.setExecutionId(str);
        invokeCallback(str, dAGEvent, dAGInfo, taskInfo, map);
    }

    private void invokeCallback(String str, DAGEvent dAGEvent, DAGInfo dAGInfo, TaskInfo taskInfo, Map<String, Object> map) {
        log.info("invokeCallback operations executionId:{} dagEvent:{} taskName:{}", new Object[]{str, dAGEvent, Optional.ofNullable(taskInfo).map((v0) -> {
            return v0.getName();
        }).orElse(null)});
        CallbackInvoker.getInstance().callback(this.callback, dAGEvent, dAGInfo, map, taskInfo);
    }

    private boolean isTaskCompleted(ExecutionResult executionResult) {
        return ((Boolean) Optional.ofNullable(executionResult.getTaskStatus()).map((v0) -> {
            return v0.isCompleted();
        }).orElse(false)).booleanValue();
    }

    private boolean isForeachTaskKeyCompleted(ExecutionResult executionResult, String str) {
        Optional ofNullable = Optional.ofNullable(executionResult.getTaskStatus());
        TaskStatus taskStatus = TaskStatus.KEY_SUCCEED;
        Objects.requireNonNull(taskStatus);
        if (((Boolean) ofNullable.map((v1) -> {
            return r1.equals(v1);
        }).orElse(false)).booleanValue() && StringUtils.isNotEmpty(str)) {
            Optional map = Optional.ofNullable(executionResult.getTaskInfo()).map((v0) -> {
                return v0.getSubGroupKeyJudgementMapping();
            }).map(map2 -> {
                return (Boolean) map2.get(str);
            });
            Boolean bool = Boolean.TRUE;
            Objects.requireNonNull(bool);
            if (((Boolean) map.filter((v1) -> {
                return r1.equals(v1);
            }).orElse(false)).booleanValue()) {
                return true;
            }
        }
        return false;
    }

    private boolean isSubFlowTaskKeyCompleted(ExecutionResult executionResult) {
        Optional ofNullable = Optional.ofNullable(executionResult.getTaskStatus());
        TaskStatus taskStatus = TaskStatus.KEY_SUCCEED;
        Objects.requireNonNull(taskStatus);
        if (((Boolean) ofNullable.map((v1) -> {
            return r1.equals(v1);
        }).orElse(false)).booleanValue()) {
            Optional map = Optional.ofNullable(executionResult.getTaskInfo()).map((v0) -> {
                return v0.getTask();
            });
            Class<FunctionTask> cls = FunctionTask.class;
            Objects.requireNonNull(FunctionTask.class);
            Optional filter = map.filter((v1) -> {
                return r1.isInstance(v1);
            });
            Class<FunctionTask> cls2 = FunctionTask.class;
            Objects.requireNonNull(FunctionTask.class);
            if (((Boolean) filter.map((v1) -> {
                return r1.cast(v1);
            }).map((v0) -> {
                return v0.getPattern();
            }).map(functionPattern -> {
                return Boolean.valueOf(FunctionPattern.FLOW_SYNC.equals(functionPattern) || FunctionPattern.FLOW_ASYNC.equals(functionPattern));
            }).orElse(false)).booleanValue()) {
                return true;
            }
        }
        return false;
    }

    private TaskRunner selectRunner(String str) {
        TaskRunner taskRunner = this.taskRunners.get(str);
        if (taskRunner == null) {
            throw new DAGTraversalException(TraversalErrorCode.TRAVERSAL_FAILED.getCode(), "runner is null.");
        }
        return taskRunner;
    }

    private TaskRunner selectRunner(TaskInfo taskInfo) {
        return selectRunner(taskInfo.getTask().getCategory());
    }

    private void safeSleep(long j) {
        try {
            Thread.sleep(j);
        } catch (Exception e) {
        }
    }
}
