package com.weibo.rill.flow.olympicene.traversal;

import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.concurrent.ExecutionRunnable;
import com.weibo.rill.flow.olympicene.core.constant.SystemConfig;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import com.weibo.rill.flow.olympicene.core.model.task.ForeachTask;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/DAGTraversal.class */
public class DAGTraversal {
    private static final Logger log = LoggerFactory.getLogger(DAGTraversal.class);
    private final ContextHelper contextHelper = ContextHelper.getInstance();
    private final DAGContextStorage dagContextStorage;
    private final DAGInfoStorage dagInfoStorage;
    private final DAGStorageProcedure dagStorageProcedure;
    private final ExecutorService traversalExecutor;
    private DAGOperations dagOperations;
    private Stasher stasher;

    public DAGTraversal(DAGContextStorage dAGContextStorage, DAGInfoStorage dAGInfoStorage, DAGStorageProcedure dAGStorageProcedure, ExecutorService executorService) {
        this.dagContextStorage = dAGContextStorage;
        this.dagInfoStorage = dAGInfoStorage;
        this.dagStorageProcedure = dAGStorageProcedure;
        this.traversalExecutor = executorService;
    }

    public void submitTraversal(String str, String str2) {
        this.traversalExecutor.execute(new ExecutionRunnable(str, () -> {
            try {
                log.info("submitTraversal begin lock executionId:{}, completedTaskName:{}", str, str2);
                HashMap newHashMap = Maps.newHashMap();
                newHashMap.put("executionId", str);
                newHashMap.put("completedTaskName", str2);
                DAGOperations.OPERATE_WITH_RETRY.accept(PluginHelper.pluginInvokeChain(() -> {
                    this.dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(str), () -> {
                        doTraversal(str, str2);
                    });
                }, newHashMap, (List<BiConsumer<Runnable, Map<String, Object>>>) SystemConfig.TRAVERSAL_CUSTOMIZED_PLUGINS), Integer.valueOf(SystemConfig.getTraversalRetryTimes()));
            } catch (Exception e) {
                log.error("executionId:{} traversal exception with completedTaskName:{}. ", new Object[]{str, str2, e});
            }
        }));
    }

    public void submitTasks(String str, Set<TaskInfo> set, Map<String, Object> map) {
        this.traversalExecutor.execute(new ExecutionRunnable(str, () -> {
            try {
                log.info("submitTasks begin get lock executionId:{}", str);
                DAGOperations.OPERATE_WITH_RETRY.accept(() -> {
                    this.dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(str), () -> {
                        log.info("submitTasks begin execute task executionId:{}", str);
                        Set<TaskInfo> readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(set);
                        if (CollectionUtils.isNotEmpty(readyToRunTasks)) {
                            runTasks(str, this.contextHelper.getContext(readyToRunTasks, map));
                        }
                    });
                }, Integer.valueOf(SystemConfig.getTraversalRetryTimes()));
            } catch (Exception e) {
                log.error("dag {} traversal exception with tasks {}. ", new Object[]{str, Joiner.on(",").join((Iterable) set.stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toList())), e});
            }
        }));
    }

    public void doTraversal(String str, String str2) {
        log.info("doTraversal start, executionId:{}", str);
        if (StringUtils.isEmpty(str2) || DAGWalkHelper.getInstance().isAncestorTask(str2)) {
            traversalAncestorTasks(str);
        } else {
            traversalNestedTasks(str, str2);
        }
    }

    private void traversalAncestorTasks(String str) {
        DAGInfo basicDAGInfo = this.dagInfoStorage.getBasicDAGInfo(str);
        if (basicDAGInfo == null || basicDAGInfo.getDagStatus().isCompleted()) {
            return;
        }
        Set<TaskInfo> readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(basicDAGInfo.getTasks().values());
        if (CollectionUtils.isNotEmpty(readyToRunTasks)) {
            runTasks(str, this.contextHelper.getContext(this.dagContextStorage, str, readyToRunTasks));
            return;
        }
        DAGStatus calculateDAGStatus = DAGWalkHelper.getInstance().calculateDAGStatus(basicDAGInfo);
        if (calculateDAGStatus.isCompleted()) {
            this.dagOperations.finishDAG(str, basicDAGInfo, calculateDAGStatus, null);
        }
        if (DAGStatus.KEY_SUCCEED.equals(calculateDAGStatus)) {
            this.dagOperations.finishDAG(str, basicDAGInfo, calculateDAGStatus, null);
        }
    }

    private void traversalNestedTasks(String str, String str2) {
        TaskInfo parentTaskInfoWithSibling = this.dagInfoStorage.getParentTaskInfoWithSibling(str, str2);
        if (parentTaskInfoWithSibling == null) {
            return;
        }
        Set<TaskInfo> readyToRunTasks = DAGWalkHelper.getInstance().getReadyToRunTasks(parentTaskInfoWithSibling.getChildren().values());
        if (CollectionUtils.isNotEmpty(readyToRunTasks)) {
            runTasks(str, this.contextHelper.getContext(this.dagContextStorage, str, readyToRunTasks));
            return;
        }
        TaskStatus calculateTaskStatus = DAGWalkHelper.getInstance().calculateTaskStatus(parentTaskInfoWithSibling.getChildren().values());
        if (calculateTaskStatus.isCompleted()) {
            this.dagOperations.finishTaskAsync(str, parentTaskInfoWithSibling.getTask().getCategory(), NotifyInfo.builder().taskInfoName(parentTaskInfoWithSibling.getName()).completedGroupIndex(DAGWalkHelper.getInstance().getTaskInfoGroupIndex(str2)).groupTaskStatus(calculateTaskStatus).tasks(parentTaskInfoWithSibling.getChildren()).build(), new HashMap());
        }
    }

    private void runTasks(String str, List<Pair<TaskInfo, Map<String, Object>>> list) {
        list.forEach(pair -> {
            TaskInfo taskInfo = (TaskInfo) pair.getLeft();
            if (!this.stasher.needStash(str, taskInfo, (Map) pair.getRight())) {
                taskInfo.setTaskStatus(TaskStatus.READY);
                return;
            }
            taskInfo.setTaskStatus(TaskStatus.STASHED);
            String taskInfoGroupIndex = DAGWalkHelper.getInstance().getTaskInfoGroupIndex(taskInfo.getName());
            if (taskInfoGroupIndex != null) {
                Optional.ofNullable(taskInfo.getParent()).filter(taskInfo2 -> {
                    return taskInfo2.getTask() instanceof ForeachTask;
                }).ifPresent(taskInfo3 -> {
                    taskInfo3.getSubGroupIndexToStatus().put(taskInfoGroupIndex, TaskStatus.STASHED);
                });
            }
        });
        this.dagInfoStorage.saveTaskInfos(str, (Set) list.stream().map((v0) -> {
            return v0.getLeft();
        }).collect(Collectors.toSet()));
        Map map = (Map) list.stream().collect(Collectors.groupingBy(pair2 -> {
            return ((TaskInfo) pair2.getLeft()).getTaskStatus();
        }));
        ((List) Optional.ofNullable((List) map.get(TaskStatus.STASHED)).orElse(new ArrayList())).forEach(pair3 -> {
            this.stasher.stash(str, pair3);
        });
        Optional.ofNullable((List) map.get(TaskStatus.READY)).filter((v0) -> {
            return CollectionUtils.isNotEmpty(v0);
        }).ifPresent(list2 -> {
            this.dagOperations.runTasks(str, list2);
        });
    }

    public void setDagOperations(DAGOperations dAGOperations) {
        this.dagOperations = dAGOperations;
    }

    public void setStasher(Stasher stasher) {
        this.stasher = stasher;
    }
}
