package com.weibo.rill.flow.olympicene.traversal.runners;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.resource.BaseResource;
import com.weibo.rill.flow.interfaces.model.task.BaseTask;
import com.weibo.rill.flow.interfaces.model.task.FunctionTask;
import com.weibo.rill.flow.interfaces.model.task.InvokeTimeInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.constant.SystemConfig;
import com.weibo.rill.flow.olympicene.core.helper.DAGInfoMaker;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.olympicene.core.model.DAGSettings;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGInvokeMsg;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGStatus;
import com.weibo.rill.flow.olympicene.core.model.strategy.CallbackConfig;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPathInputOutputMapping;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/DAGRunner.class */
/**
 * Runs the DAG-level lifecycle operations of an execution: submitting a DAG,
 * finishing it, and resetting tasks so they can run again.
 *
 * <p>All persistent state goes through the injected {@link DAGInfoStorage} and
 * {@link DAGContextStorage}. {@link #submitDAG} and {@link #resetTask} run their
 * storage updates inside {@link DAGStorageProcedure#lockAndRun} using the lock name
 * built from the execution id; {@link #finishDAG} does not take that lock itself.
 */
public class DAGRunner {

    /** Context key that receives the execution id of the first entry of the execution route. */
    private static final String ROOT_EXECUTION_ID_KEY = "flow_root_execution_id";

    /** Scheme of a resource name that refers to a DAG-level resource, e.g. {@code resource://name}. */
    private static final String RESOURCE_SCHEME = "resource";

    /** Separator between the scheme and the resource name. */
    private static final String SCHEME_SEPARATOR = "://";

    /** Recursion bound for {@link #resetTaskStatus}; deeper levels are silently left untouched. */
    private static final int MAX_RESET_DEPTH = 1000;

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DAGRunner.class);
    private final DAGContextStorage dagContextStorage;
    private final DAGInfoStorage dagInfoStorage;
    private final DAGStorageProcedure dagStorageProcedure;
    /** Optional collaborator injected through {@link #setStasher}; may be {@code null}. */
    private Stasher stasher;

    public DAGRunner(DAGContextStorage dAGContextStorage, DAGInfoStorage dAGInfoStorage, DAGStorageProcedure dAGStorageProcedure) {
        this.dagContextStorage = dAGContextStorage;
        this.dagInfoStorage = dAGInfoStorage;
        this.dagStorageProcedure = dAGStorageProcedure;
    }

    /**
     * Validates and registers a new DAG execution.
     *
     * <p>Under the DAG-info lock this method builds the initial context (parsed default
     * context of the DAG, overlaid with {@code map}), expands mapping references and
     * resource references on the DAG's tasks, creates the {@link DAGInfo} in
     * {@link DAGStatus#RUNNING} state and stores both context and DAG info.
     *
     * <p>Note that {@code dag} is mutated: its default context, common mapping and
     * resources are set to {@code null} once they have been applied.
     *
     * @param str         execution id of the DAG being submitted
     * @param dag         DAG definition; must not be {@code null}
     * @param dAGSettings submit settings (ignore-exist flag, maximum DAG depth)
     * @param map         caller supplied context entries, may be {@code null}
     * @param notifyInfo  parent DAG / callback information, may be {@code null}
     * @return result carrying the initial context and the created DAG info
     * @throws DAGTraversalException if the execution already exists and must not be ignored,
     *                               if {@code dag} is {@code null}, if a depth limit is exceeded
     *                               or if a referenced resource cannot be found
     */
    public ExecutionResult submitDAG(String str, DAG dag, DAGSettings dAGSettings, Map<String, Object> map, NotifyInfo notifyInfo) {
        ExecutionResult result = ExecutionResult.builder().build();
        this.dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(str), () -> {
            submitValidate(str, dag, dAGSettings.isIgnoreExist(), this.dagInfoStorage.getBasicDAGInfo(str));

            // The context is assembled first: defaults from the DAG, then caller data on top.
            Map<String, Object> context = Maps.newHashMap();
            Optional.ofNullable(dag.getDefaultContext()).ifPresent(defaults ->
                    defaults.forEach((key, source) -> context.put(key, JSONPathInputOutputMapping.parseSource(source))));
            if (map != null) {
                context.putAll(map);
            }
            result.setContext(context);
            dag.setDefaultContext(null);

            handleMappingReference(1, dag.getCommonMapping(), dag.getTasks());
            handleMappingReference(dag.getCommonMapping(), dag.getCallbackConfig());
            dag.setCommonMapping(null);

            List<BaseResource> resources = dag.getResources();
            if (resources != null) {
                Map<String, BaseResource> resourcesByName = resources.stream()
                        .collect(Collectors.toMap(BaseResource::getName, resource -> resource));
                handleResources(1, resourcesByName, dag.getTasks());
            }
            dag.setResources(null);

            DAGInvokeMsg invokeMsg = buildInvokeMsg(str, dAGSettings, notifyInfo);
            DAGInfo dagInfo = new DAGInfoMaker()
                    .dag(dag)
                    .executionId(str)
                    .dagInvokeMsg(invokeMsg)
                    .dagStatus(DAGStatus.RUNNING)
                    .make();
            result.setDagInfo(dagInfo);

            // "context" is the same instance already placed in the result, so this entry is visible there too.
            Optional.ofNullable(invokeMsg)
                    .map(DAGInvokeMsg::getExecutionRoutes)
                    .filter(CollectionUtils::isNotEmpty)
                    .map(routes -> routes.get(0))
                    .map(DAGInvokeMsg.ExecutionInfo::getExecutionId)
                    .filter(StringUtils::isNotBlank)
                    .ifPresent(rootExecutionId -> context.putIfAbsent(ROOT_EXECUTION_ID_KEY, rootExecutionId));

            this.dagContextStorage.updateContext(str, context);
            this.dagInfoStorage.saveDAGInfo(str, dagInfo);
        });
        return result;
    }

    /**
     * Recursively binds DAG-level resources to function tasks that reference them
     * by name ({@code resource://<name>}) and have no resource set yet.
     *
     * @param depth           current nesting level, starting at 1
     * @param resourcesByName DAG-level resources keyed by name
     * @param tasks           tasks of the current level
     * @throws DAGTraversalException if the nesting exceeds the configured maximum task depth
     *                               or a referenced resource does not exist
     */
    private void handleResources(int depth, Map<String, BaseResource> resourcesByName, List<BaseTask> tasks) {
        if (MapUtils.isEmpty(resourcesByName) || CollectionUtils.isEmpty(tasks)) {
            return;
        }
        if (depth > SystemConfig.getTaskMaxDepth()) {
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(), "exceed max depth");
        }
        for (BaseTask task : tasks) {
            // Sub tasks are handled before the task itself.
            handleResources(depth + 1, resourcesByName, task.subTasks());
            if (task instanceof FunctionTask) {
                bindResource((FunctionTask) task, resourcesByName);
            }
        }
    }

    /** Sets the referenced DAG-level resource on a single function task, if it names one. */
    private void bindResource(FunctionTask task, Map<String, BaseResource> resourcesByName) {
        if (task.getResource() != null || StringUtils.isBlank(task.getResourceName())) {
            return;
        }
        String[] parts = task.getResourceName().split(SCHEME_SEPARATOR);
        if (parts.length != 2 || !RESOURCE_SCHEME.equals(parts[0])) {
            // Not a reference to a DAG-level resource; leave the task untouched.
            return;
        }
        BaseResource resource = resourcesByName.get(parts[1]);
        if (resource == null) {
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(),
                    task.getName() + " can not find task resource " + task.getResourceName());
        }
        task.setResource(resource);
    }

    /**
     * Expands mapping references in the input mappings of the callback configuration.
     * Does nothing when there are no common mappings or no input mappings.
     */
    private void handleMappingReference(Map<String, List<Mapping>> commonMappings, CallbackConfig callbackConfig) {
        List<Mapping> inputMappings = callbackConfig == null ? null : callbackConfig.getInputMappings();
        if (MapUtils.isEmpty(commonMappings) || CollectionUtils.isEmpty(inputMappings)) {
            return;
        }
        callbackConfig.setInputMappings(includeReferenceMappings(commonMappings, inputMappings));
    }

    /**
     * Recursively expands mapping references in the input and output mappings of the given tasks.
     *
     * @param depth          current nesting level, starting at 1
     * @param commonMappings shared mappings keyed by reference name
     * @param tasks          tasks of the current level
     * @throws DAGTraversalException if the nesting exceeds the configured maximum task depth
     */
    private void handleMappingReference(int depth, Map<String, List<Mapping>> commonMappings, List<BaseTask> tasks) {
        if (MapUtils.isEmpty(commonMappings) || CollectionUtils.isEmpty(tasks)) {
            return;
        }
        if (depth > SystemConfig.getTaskMaxDepth()) {
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(), "exceed max depth");
        }
        for (BaseTask task : tasks) {
            task.setInputMappings(includeReferenceMappings(commonMappings, task.getInputMappings()));
            task.setOutputMappings(includeReferenceMappings(commonMappings, task.getOutputMappings()));
            handleMappingReference(depth + 1, commonMappings, task.subTasks());
        }
    }

    /**
     * Returns a new list in which every mapping carrying a reference is replaced by the
     * common mappings registered under that reference. Mappings whose reference is not
     * registered are dropped. A {@code null} or empty input is returned as is.
     */
    private List<Mapping> includeReferenceMappings(Map<String, List<Mapping>> commonMappings, List<Mapping> mappings) {
        if (CollectionUtils.isEmpty(mappings)) {
            return mappings;
        }
        List<Mapping> resolved = Lists.newArrayList();
        for (Mapping mapping : mappings) {
            if (StringUtils.isBlank(mapping.getReference())) {
                resolved.add(mapping);
                continue;
            }
            List<Mapping> referenced = commonMappings.get(mapping.getReference());
            if (referenced != null) {
                resolved.addAll(referenced);
            }
        }
        return resolved;
    }

    /**
     * Builds the invoke message of a newly submitted DAG: start time, callback configuration and,
     * when the DAG is invoked from a parent DAG, the execution route leading to it.
     *
     * @throws DAGTraversalException if the route is not shorter than the configured maximum DAG depth
     */
    private DAGInvokeMsg buildInvokeMsg(String executionId, DAGSettings settings, NotifyInfo notifyInfo) {
        CallbackConfig callbackConfig = notifyInfo == null ? null : notifyInfo.getCallbackConfig();
        InvokeTimeInfo startTime = InvokeTimeInfo.builder()
                .startTimeInMillisecond(System.currentTimeMillis())
                .build();
        DAGInvokeMsg invokeMsg = DAGInvokeMsg.builder()
                .invokeTimeInfos(Lists.newArrayList(startTime))
                .callbackConfig(callbackConfig)
                .build();
        if (notifyInfo == null || StringUtils.isBlank(notifyInfo.getParentDAGExecutionId())) {
            return invokeMsg;
        }

        // NOTE(review): the parent DAG info is dereferenced without a null check, as before;
        // confirm the storage never returns null for a parent that is still invoking children.
        List<DAGInvokeMsg.ExecutionInfo> parentRoutes = Optional
                .ofNullable(this.dagInfoStorage.getBasicDAGInfo(notifyInfo.getParentDAGExecutionId()).getDagInvokeMsg())
                .map(DAGInvokeMsg::getExecutionRoutes)
                .orElseGet(ArrayList::new);
        List<DAGInvokeMsg.ExecutionInfo> routes = Lists.newArrayList(parentRoutes);
        routes.add(DAGInvokeMsg.ExecutionInfo.builder()
                .index(parentRoutes.size() + 1)
                .executionId(notifyInfo.getParentDAGExecutionId())
                .taskInfoName(notifyInfo.getParentDAGTaskInfoName())
                .executionType(notifyInfo.getParentDAGTaskExecutionType())
                .build());
        if (routes.size() < settings.getDagMaxDepth()) {
            invokeMsg.setExecutionRoutes(routes);
            return invokeMsg;
        }

        String route = routes.stream()
                .sorted(Comparator.comparingInt(DAGInvokeMsg.ExecutionInfo::getIndex))
                .map(info -> info.getExecutionId() + "#" + info.getTaskInfoName())
                .collect(Collectors.joining("->"));
        log.warn("submitDAG exceed max dag depth, executionId:{}, maxDAGDepth:{}, route:{}",
                executionId, settings.getDagMaxDepth(), route);
        throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), "exceed max depth, dag route: " + route);
    }

    /**
     * Rejects a submission when the execution already exists (unless existing executions
     * are to be ignored) or when no DAG definition was supplied.
     */
    private void submitValidate(String executionId, DAG dag, boolean ignoreExist, DAGInfo existingDagInfo) {
        if (existingDagInfo != null && !ignoreExist) {
            throw new DAGTraversalException(TraversalErrorCode.DAG_ALREADY_EXIST.getCode(), "dag info " + executionId + " already exists");
        }
        if (dag == null) {
            throw new DAGTraversalException(TraversalErrorCode.DAG_EXECUTION_NOT_FOUND.getCode(), "dag not found");
        }
    }

    /**
     * Marks a DAG execution as finished with the given status, persists that state, then reads
     * back the full DAG info and context, clears both from storage and, if a stasher is
     * configured and asks for it, stashes them.
     *
     * @param str          execution id
     * @param dAGInfo      DAG info to finish; when {@code null} the basic DAG info is loaded from storage
     * @param dAGStatus    final status to record
     * @param dAGInvokeMsg invoke message to merge into the DAG info
     * @return the full DAG info and context as read from storage before they were cleared
     * @throws DAGTraversalException if {@code dAGInfo} is {@code null} and storage has no DAG info for {@code str}
     */
    public ExecutionResult finishDAG(String str, DAGInfo dAGInfo, DAGStatus dAGStatus, DAGInvokeMsg dAGInvokeMsg) {
        log.info("finishDAG action start, executionId:{}, dagStatus:{}", str, dAGStatus);
        if (dAGInfo == null) {
            dAGInfo = this.dagInfoStorage.getBasicDAGInfo(str);
        }
        if (dAGInfo == null) {
            // Fail with a domain error instead of a NullPointerException on the next line.
            throw new DAGTraversalException(TraversalErrorCode.DAG_NOT_FOUND.getCode(), "can not find dag info");
        }
        dAGInfo.setDagStatus(dAGStatus);
        dAGInfo.updateInvokeMsg();

        // The task map is swapped for an empty one while saving and restored right after.
        Map<String, TaskInfo> tasks = dAGInfo.getTasks();
        dAGInfo.setTasks(new LinkedHashMap<>());
        dAGInfo.updateInvokeMsg(dAGInvokeMsg);
        updateDAGInvokeEndTime(dAGInfo);
        this.dagInfoStorage.saveDAGInfo(str, dAGInfo);
        dAGInfo.setTasks(tasks);

        DAGInfo storedDagInfo = this.dagInfoStorage.getDAGInfo(str);
        Map<String, Object> context = this.dagContextStorage.getContext(str);
        this.dagInfoStorage.clearDAGInfo(str);
        this.dagContextStorage.clearContext(str);

        // The stasher is injected via setter and may be absent; storage is already cleared here,
        // so a NullPointerException at this point would lose the result.
        if (this.stasher != null && this.stasher.needStashFlow(dAGInfo, dAGStatus)) {
            this.stasher.stashFlow(storedDagInfo, context);
        }
        log.info("finishDAG finish, executionId:{}", str);
        return ExecutionResult.builder().dagInfo(storedDagInfo).context(context).build();
    }

    /** Appends a new invoke-time entry whose start time is the current time. */
    private void updateDAGInvokeStartTime(DAGInfo dagInfo) {
        InvokeTimeInfo started = InvokeTimeInfo.builder()
                .startTimeInMillisecond(System.currentTimeMillis())
                .build();
        getInvokeTimeInfoList(dagInfo).add(started);
    }

    /** Sets the end time of the latest invoke-time entry, if there is one, to the current time. */
    private void updateDAGInvokeEndTime(DAGInfo dagInfo) {
        List<InvokeTimeInfo> invokeTimeInfos = getInvokeTimeInfoList(dagInfo);
        if (CollectionUtils.isNotEmpty(invokeTimeInfos)) {
            invokeTimeInfos.get(invokeTimeInfos.size() - 1).setEndTimeInMillisecond(System.currentTimeMillis());
        }
    }

    /**
     * Returns the invoke-time list of the DAG info, creating the invoke message and/or
     * the list on the DAG info when they are missing.
     */
    private List<InvokeTimeInfo> getInvokeTimeInfoList(DAGInfo dagInfo) {
        DAGInvokeMsg invokeMsg = dagInfo.getDagInvokeMsg();
        if (invokeMsg == null) {
            dagInfo.setDagInvokeMsg(new DAGInvokeMsg());
            invokeMsg = dagInfo.getDagInvokeMsg();
        }
        List<InvokeTimeInfo> invokeTimeInfos = invokeMsg.getInvokeTimeInfos();
        if (invokeTimeInfos == null) {
            invokeMsg.setInvokeTimeInfos(Lists.newArrayList());
            invokeTimeInfos = invokeMsg.getInvokeTimeInfos();
        }
        return invokeTimeInfos;
    }

    /**
     * Resets tasks of an execution to {@link TaskStatus#NOT_STARTED} and puts the DAG back
     * into {@link DAGStatus#RUNNING}, all under the DAG-info lock.
     *
     * <p>When {@code list} is non-empty, the ancestor tasks of the named tasks are reset if they
     * have been started. Otherwise every started task that is neither successful nor skipped is
     * reset. Successors reachable through {@code getNext()} are reset as well. If nothing
     * qualifies, the DAG info is left untouched. Independently of that, a non-empty {@code map}
     * is merged into the stored context.
     *
     * @param str  execution id
     * @param list names of the tasks to reset, may be {@code null} or empty
     * @param map  context entries to update, may be {@code null} or empty
     * @throws DAGTraversalException if no DAG info exists for {@code str}
     */
    public void resetTask(String str, List<String> list, Map<String, Object> map) {
        this.dagStorageProcedure.lockAndRun(LockerKey.buildDagInfoLockName(str), () -> {
            DAGInfo dagInfo = this.dagInfoStorage.getDAGInfo(str);
            if (dagInfo == null) {
                throw new DAGTraversalException(TraversalErrorCode.DAG_NOT_FOUND.getCode(), "can not find dag info");
            }

            List<TaskInfo> tasksToReset = selectTasksToReset(dagInfo, list);
            if (!tasksToReset.isEmpty()) {
                if (dagInfo.getDagStatus().isCompleted()) {
                    // A completed DAG is being restarted, so a new invoke-time entry is opened.
                    updateDAGInvokeStartTime(dagInfo);
                }
                dagInfo.setDagStatus(DAGStatus.RUNNING);
                resetTaskStatus(1, tasksToReset);
                this.dagInfoStorage.clearDAGInfo(str, 0);
                this.dagInfoStorage.saveDAGInfo(str, dagInfo);
            }

            if (MapUtils.isNotEmpty(map)) {
                this.dagContextStorage.updateContext(str, map);
            }
        });
    }

    /** Picks the top-level tasks that {@link #resetTask} has to reset. */
    private List<TaskInfo> selectTasksToReset(DAGInfo dagInfo, List<String> taskNames) {
        if (CollectionUtils.isNotEmpty(taskNames)) {
            return taskNames.stream()
                    .filter(StringUtils::isNotBlank)
                    .map(name -> DAGWalkHelper.getInstance().getAncestorTaskName(name))
                    .distinct()
                    .map(ancestorName -> dagInfo.getTasks().get(ancestorName))
                    .filter(task -> task != null)
                    .filter(task -> task.getTaskStatus() != TaskStatus.NOT_STARTED)
                    .collect(Collectors.toList());
        }
        return dagInfo.getTasks().values().stream()
                .filter(task -> task.getTaskStatus() != TaskStatus.NOT_STARTED)
                .filter(task -> !task.getTaskStatus().isSuccessOrSkip())
                .collect(Collectors.toList());
    }

    /**
     * Resets the given tasks and, recursively, the tasks returned by their {@code getNext()}:
     * status back to not started, children and sub-group bookkeeping cleared.
     * Recursion stops silently beyond {@link #MAX_RESET_DEPTH} levels.
     */
    private void resetTaskStatus(int depth, List<TaskInfo> taskInfos) {
        if (CollectionUtils.isEmpty(taskInfos) || depth > MAX_RESET_DEPTH) {
            return;
        }
        for (TaskInfo taskInfo : taskInfos) {
            taskInfo.setTaskStatus(TaskStatus.NOT_STARTED);
            taskInfo.setChildren(new LinkedHashMap<>());
            taskInfo.setSubGroupIndexToStatus(null);
            taskInfo.setSubGroupIndexToIdentity(null);
            resetTaskStatus(depth + 1, taskInfo.getNext());
        }
    }

    @Generated
    public void setStasher(Stasher stasher) {
        this.stasher = stasher;
    }
}
