package com.weibo.rill.flow.olympicene.traversal.runners;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.lock.LockerKey;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.SuspenseTask;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/SuspenseTaskRunner.class */
/**
 * Task runner for tasks of category {@link TaskCategory#SUSPENSE}.
 *
 * <p>When run, the task is put into {@link TaskStatus#RUNNING} and its wake-up conditions and
 * interruptions are evaluated once against the mapped input. If neither matches, the task stays
 * {@code RUNNING} until {@link #finish(String, NotifyInfo, Map)} is called, which optionally maps
 * the supplied output into the context and then re-evaluates the same conditions.
 *
 * <p>All state changes for a task are performed inside
 * {@code dagStorageProcedure.lockAndRun(...)} using the task-info lock name built by
 * {@link LockerKey#buildTaskInfoLockName}.
 */
public class SuspenseTaskRunner extends AbstractTaskRunner {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(SuspenseTaskRunner.class);

    /**
     * Creates a runner; all collaborators are passed straight through to {@link AbstractTaskRunner}.
     */
    public SuspenseTaskRunner(InputOutputMapping inputOutputMapping, DAGInfoStorage dAGInfoStorage, DAGContextStorage dAGContextStorage, DAGStorageProcedure dAGStorageProcedure, SwitcherManager switcherManager) {
        super(inputOutputMapping, dAGInfoStorage, dAGContextStorage, dAGStorageProcedure, switcherManager);
    }

    /**
     * @return {@link TaskCategory#SUSPENSE}
     */
    @Override
    public TaskCategory getCategory() {
        return TaskCategory.SUSPENSE;
    }

    /**
     * @return the icon identifier string associated with this runner
     */
    @Override
    public String getIcon() {
        return "ant-design:pause-circle-outlined";
    }

    /**
     * Starts a suspense task: marks it {@code RUNNING}, evaluates wake-up and interruption
     * conditions against the mapped input, and persists the resulting task info.
     *
     * @param str      the execution id of the DAG run
     * @param taskInfo the suspense task to start; its status is mutated by this call
     * @param map      not read by this implementation
     * @return an execution result carrying the mapped input and the task status after evaluation
     */
    @Override
    protected ExecutionResult doRun(String str, TaskInfo taskInfo, Map<String, Object> map) {
        log.info("suspense task begin to run executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        ExecutionResult result = ExecutionResult.builder().build();
        this.dagStorageProcedure.lockAndRun(LockerKey.buildTaskInfoLockName(str, taskInfo.getName()), () -> {
            Map<String, Object> context = ContextHelper.getInstance().getContext(this.dagContextStorage, str, taskInfo);
            Map<String, Object> input = Maps.newHashMap();
            inputMappings(context, input, new HashMap<>(), taskInfo.getTask().getInputMappings());

            taskInfo.setTaskStatus(TaskStatus.RUNNING);
            // Wake-up is tried first; the interruption check is a no-op once the status has
            // left RUNNING, so a task whose conditions already hold ends as SUCCEED.
            tryWakeup(str, taskInfo, input);
            tryInterruptSuspense(str, taskInfo, input);
            this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(taskInfo));

            result.setInput(input);
            result.setTaskStatus(taskInfo.getTaskStatus());
        });
        log.info("run suspense task completed, executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        return result;
    }

    /**
     * Handles a notification for a suspended task.
     *
     * <p>If the notification carries a completed status, that status (and its invoke message) is
     * written to the task directly. Otherwise, when {@code map} is non-empty and the task declares
     * output mappings, the output is mapped into the context and saved; then, if the task is still
     * {@code RUNNING}, wake-up and interruption conditions are re-evaluated and the task info is
     * saved only when one of them changed the status.
     *
     * @param str        the execution id of the DAG run
     * @param notifyInfo the notification; supplies the task name and optionally a final status
     * @param map        output delivered with the notification, may be empty or {@code null}
     * @return a result with the task info, its status after processing, and the context that was
     *         loaded (the context is {@code null} when it was never loaded)
     * @throws DAGTraversalException if the task is missing, is not a suspense task, or is already
     *                               completed (see {@code taskInfoValid})
     */
    @Override
    public ExecutionResult finish(String str, NotifyInfo notifyInfo, Map<String, Object> map) {
        String taskInfoName = notifyInfo.getTaskInfoName();
        log.info("suspense wakeup begin to run executionId:{}, taskInfoName:{}, output empty:{}", str, taskInfoName, MapUtils.isEmpty(map));

        // Holders let the lambda hand results back to the enclosing method.
        AtomicReference<TaskInfo> taskInfoRef = new AtomicReference<>();
        AtomicReference<Map<String, Object>> contextRef = new AtomicReference<>();

        this.dagStorageProcedure.lockAndRun(LockerKey.buildTaskInfoLockName(str, taskInfoName), () -> {
            validateDAGInfo(str);
            TaskInfo current = this.dagInfoStorage.getBasicTaskInfo(str, taskInfoName);
            taskInfoRef.set(current);
            taskInfoValid(str, taskInfoName, current);

            // An explicit completed status in the notification overrides condition evaluation.
            TaskStatus notifiedStatus = notifyInfo.getTaskStatus();
            if (notifiedStatus != null && notifiedStatus.isCompleted()) {
                current.updateInvokeMsg(notifyInfo.getTaskInvokeMsg());
                updateTaskInvokeEndTime(current);
                current.setTaskStatus(notifiedStatus);
                this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(current));
                return;
            }

            log.info("suspense wakeup taskInfo current taskStatus:{}", current.getTaskStatus());
            boolean hasOutput = MapUtils.isNotEmpty(map);
            if (current.getTaskStatus() != TaskStatus.RUNNING && !hasOutput) {
                // Nothing to map and nothing to wake up.
                return;
            }

            Map<String, Object> context = ContextHelper.getInstance().getContext(this.dagContextStorage, str, current);
            contextRef.set(context);
            if (hasOutput && CollectionUtils.isNotEmpty(current.getTask().getOutputMappings())) {
                outputMappings(context, new HashMap<>(), map, current.getTask().getOutputMappings());
                saveContext(str, context, Sets.newHashSet(current));
            }

            // Output is still recorded for a task that has not started running yet, but
            // conditions are only evaluated for a RUNNING task.
            if (current.getTaskStatus() != TaskStatus.RUNNING) {
                return;
            }

            Map<String, Object> input = Maps.newHashMap();
            inputMappings(context, input, new HashMap<>(), current.getTask().getInputMappings());
            boolean statusChanged = tryWakeup(str, current, input) || tryInterruptSuspense(str, current, input);
            if (statusChanged) {
                this.dagInfoStorage.saveTaskInfos(str, Sets.newHashSet(current));
            }
        });

        TaskInfo taskInfo = taskInfoRef.get();
        log.info("run suspense wakeup completed, executionId:{}, taskInfoName:{}, taskStatus:{}", str, taskInfoName, taskInfo.getTaskStatus());
        return ExecutionResult.builder()
                .taskStatus(taskInfo.getTaskStatus())
                .taskInfo(taskInfo)
                .context(contextRef.get())
                .build();
    }

    /**
     * Marks a {@code RUNNING} task as {@code SUCCEED} when its wake-up conditions match.
     *
     * @return {@code true} if the task status was changed to {@code SUCCEED}
     */
    private boolean tryWakeup(String executionId, TaskInfo taskInfo, Map<String, Object> input) {
        if (taskInfo.getTaskStatus() != TaskStatus.RUNNING) {
            return false;
        }
        boolean needWakeup = isNeedWakeup(taskInfo, input);
        log.info("suspense task need wakeup value:{}, executionId:{}, taskInfoName:{}", needWakeup, executionId, taskInfo.getName());
        if (needWakeup) {
            taskInfo.setTaskStatus(TaskStatus.SUCCEED);
            updateTaskInvokeEndTime(taskInfo);
        }
        return needWakeup;
    }

    /**
     * Marks a {@code RUNNING} task as {@code FAILED} (invoke message {@code "interruption"}) when
     * any of its interruptions match.
     *
     * @return {@code true} if the task status was changed to {@code FAILED}
     */
    private boolean tryInterruptSuspense(String executionId, TaskInfo taskInfo, Map<String, Object> input) {
        if (taskInfo.getTaskStatus() != TaskStatus.RUNNING) {
            return false;
        }
        boolean needInterrupt = isNeedInterrupt(taskInfo, input);
        log.info("suspense task need interrupt value:{}, executionId:{}, taskInfoName:{}", needInterrupt, executionId, taskInfo.getName());
        if (needInterrupt) {
            taskInfo.setTaskStatus(TaskStatus.FAILED);
            taskInfo.updateInvokeMsg(TaskInvokeMsg.builder().msg("interruption").build());
            updateTaskInvokeEndTime(taskInfo);
        }
        return needInterrupt;
    }

    /**
     * @return the result of {@code ConditionsUtil.conditionsAllMatch} over the task's conditions
     */
    private boolean isNeedWakeup(TaskInfo taskInfo, Map<String, Object> input) {
        // NOTE(review): explicit downcast; a no-op if getTask() is already declared to return
        // SuspenseTask, required if it returns a base task type - confirm against TaskInfo.
        SuspenseTask suspenseTask = (SuspenseTask) taskInfo.getTask();
        return ConditionsUtil.conditionsAllMatch(suspenseTask.getConditions(), input, "input");
    }

    /**
     * @return {@code false} when the task declares no interruptions; otherwise the result of
     *         {@code ConditionsUtil.conditionsAnyMatch} over them
     */
    private boolean isNeedInterrupt(TaskInfo taskInfo, Map<String, Object> input) {
        SuspenseTask suspenseTask = (SuspenseTask) taskInfo.getTask();
        return CollectionUtils.isNotEmpty(suspenseTask.getInterruptions())
                && ConditionsUtil.conditionsAnyMatch(suspenseTask.getInterruptions(), input, "input");
    }

    /**
     * Verifies that a notification targets an existing, not yet completed suspense task.
     *
     * @throws DAGTraversalException with code {@code DAG_ILLEGAL_STATE} if {@code taskInfo} is
     *                               {@code null}, its category is not suspense, or its status is
     *                               already completed
     */
    private void taskInfoValid(String executionId, String taskInfoName, TaskInfo taskInfo) {
        if (taskInfo == null) {
            log.info("taskInfoValid taskInfo null, executionId:{}, taskInfoName:{}", executionId, taskInfoName);
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(), String.format("dag %s can not get task %s", executionId, taskInfoName));
        }
        if (!Objects.equals(taskInfo.getTask().getCategory(), TaskCategory.SUSPENSE.getValue())) {
            log.info("taskInfoValid taskInfo type is not suspense, executionId:{}, taskInfoName:{}", executionId, taskInfoName);
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(), String.format("task %s category is not suspense type", taskInfoName));
        }
        if (taskInfo.getTaskStatus().isCompleted()) {
            log.info("taskInfoValid taskInfo is already complete, executionId:{}, taskInfoName:{}", executionId, taskInfoName);
            throw new DAGTraversalException(TraversalErrorCode.DAG_ILLEGAL_STATE.getCode(), String.format("repeated finish task %s", taskInfoName));
        }
    }
}
