package com.weibo.rill.flow.olympicene.traversal;

import com.google.common.collect.Maps;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.olympicene.core.concurrent.ExecutionRunnable;
import com.weibo.rill.flow.olympicene.core.constant.SystemConfig;
import com.weibo.rill.flow.olympicene.core.model.DAGSettings;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.dag.DAG;
import com.weibo.rill.flow.olympicene.core.model.dag.DAGResult;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.result.DAGResultHandler;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInteraction;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.PluginHelper;
import com.weibo.rill.flow.olympicene.traversal.notify.NotifyType;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/Olympicene.class */
public class Olympicene implements DAGInteraction {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(Olympicene.class);
    private final DAGInfoStorage dagInfoStorage;
    private final DAGOperations dagOperations;
    private final ExecutorService notifyExecutor;
    private final DAGResultHandler dagResultHandler;
    private long dagResultGetTimeoutInMillisecond = 5000;

    public Olympicene(DAGInfoStorage dAGInfoStorage, DAGOperations dAGOperations, ExecutorService executorService, DAGResultHandler dAGResultHandler) {
        this.dagInfoStorage = dAGInfoStorage;
        this.dagOperations = dAGOperations;
        this.notifyExecutor = executorService;
        this.dagResultHandler = dAGResultHandler;
    }

    public void submit(String str, DAG dag, Map<String, Object> map) {
        submit(str, dag, map, DAGSettings.DEFAULT, null);
    }

    public void submit(String str, DAG dag, Map<String, Object> map, DAGSettings dAGSettings, NotifyInfo notifyInfo) {
        runNotify(str, NotifyType.SUBMIT, notifyInfo, () -> {
            this.dagOperations.submitDAG(str, dag, dAGSettings, map, notifyInfo);
        });
    }

    public void runNotify(String str, NotifyType notifyType, NotifyInfo notifyInfo, Runnable runnable) {
        this.notifyExecutor.execute(new ExecutionRunnable(str, () -> {
            doRunNotify(str, notifyType, notifyInfo, runnable);
        }));
    }

    public void doRunNotify(String str, NotifyType notifyType, NotifyInfo notifyInfo, Runnable runnable) {
        try {
            log.info("runNotify start executionId:{}, notifyType:{}, notifyInfo:{}", new Object[]{str, notifyType, notifyInfo});
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put("executionId", str);
            newHashMap.put("notifyType", notifyType);
            newHashMap.put("notifyInfo", notifyInfo);
            PluginHelper.pluginInvokeChain(runnable, newHashMap, (List<BiConsumer<Runnable, Map<String, Object>>>) SystemConfig.NOTIFY_CUSTOMIZED_PLUGINS).run();
        } catch (Exception e) {
            log.error("runNotify fails, executionId:{}, notifyType:{}, notifyInfo:{}", new Object[]{str, notifyType, notifyInfo, e});
            throw e;
        }
    }

    public void finish(String str, DAGSettings dAGSettings, Map<String, Object> map, NotifyInfo notifyInfo) {
        if (notifyInfo == null || StringUtils.isEmpty(notifyInfo.getTaskInfoName())) {
            log.warn("finish can not get dag taskName executionId:{}", str);
            throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), "finish taskName can not be null");
        }
        runNotify(str, NotifyType.FINISH, notifyInfo, () -> {
            this.dagOperations.finishTaskSync(str, "function", notifyInfo, map);
        });
    }

    public void wakeup(String str, Map<String, Object> map, NotifyInfo notifyInfo) {
        runNotify(str, NotifyType.WAKEUP, notifyInfo, () -> {
            Optional.ofNullable(notifyInfo.getTaskInfoName()).filter((v0) -> {
                return StringUtils.isNotEmpty(v0);
            }).ifPresent(str2 -> {
                this.dagOperations.finishTaskSync(str, TaskCategory.SUSPENSE.getValue(), notifyInfo, map);
            });
            Optional.ofNullable(notifyInfo.getParentTaskInfoName()).filter((v0) -> {
                return StringUtils.isNotEmpty(v0);
            }).filter(str3 -> {
                return !str3.equals(notifyInfo.getTaskInfoName());
            }).ifPresent(str4 -> {
                wakeupSubTasks(str, map, str4);
            });
        });
    }

    private void wakeupSubTasks(String str, Map<String, Object> map, String str2) {
        TaskInfo taskInfo = this.dagInfoStorage.getTaskInfo(str, str2);
        if (taskInfo.getTaskStatus().isCompleted()) {
            log.info("wakeupSubTasks parent task is completed, executionId:{}, parentTaskInfoName:{}", str, str2);
        } else {
            ((Set) taskInfo.getChildren().values().stream().filter((v0) -> {
                return Objects.nonNull(v0);
            }).filter(taskInfo2 -> {
                return Objects.equals(taskInfo2.getTask().getCategory(), TaskCategory.SUSPENSE.getValue());
            }).filter(taskInfo3 -> {
                return !taskInfo3.getTaskStatus().isCompleted();
            }).map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toSet())).forEach(str3 -> {
                this.dagOperations.finishTaskSync(str, TaskCategory.SUSPENSE.getValue(), NotifyInfo.builder().taskInfoName(str3).build(), map);
            });
        }
    }

    public void redo(String str, Map<String, Object> map, NotifyInfo notifyInfo) {
        if (StringUtils.isEmpty(str)) {
            log.warn("redo executionId empty");
            throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), "redo executionId can not be empty");
        }
        List list = (List) Optional.ofNullable(notifyInfo).map((v0) -> {
            return v0.getTaskInfoNames();
        }).orElse(Collections.emptyList());
        runNotify(str, NotifyType.REDO, notifyInfo, () -> {
            this.dagOperations.redoTask(str, list, map);
        });
    }

    public DAGResult run(String str, DAG dag, Map<String, Object> map) {
        return run(str, dag, map, DAGSettings.DEFAULT, null);
    }

    public DAGResult run(String str, DAG dag, Map<String, Object> map, DAGSettings dAGSettings, NotifyInfo notifyInfo) {
        return run(str, dag, map, dAGSettings, notifyInfo, this.dagResultGetTimeoutInMillisecond);
    }

    public DAGResult run(String str, DAG dag, Map<String, Object> map, long j) {
        return run(str, dag, map, DAGSettings.DEFAULT, null, j);
    }

    public DAGResult run(String str, DAG dag, Map<String, Object> map, DAGSettings dAGSettings, NotifyInfo notifyInfo, long j) {
        if (this.dagResultHandler == null) {
            log.warn("run nonsupport due to dagResultHandler is null executionId:{}", str);
            throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), "run nonsupport due to dagResultHandler is null");
        }
        this.dagResultHandler.initEnv(str);
        doRunNotify(str, NotifyType.RUN, notifyInfo, () -> {
            this.dagOperations.submitDAG(str, dag, dAGSettings, map, notifyInfo);
        });
        return this.dagResultHandler.getDAGResult(str, j);
    }

    @Generated
    public void setDagResultGetTimeoutInMillisecond(long j) {
        this.dagResultGetTimeoutInMillisecond = j;
    }
}
