package com.weibo.rill.flow.olympicene.traversal.runners;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
import com.weibo.rill.flow.olympicene.core.helper.TaskInfoMaker;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.helper.ContextHelper;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/ChoiceTaskRunner.class */
public class ChoiceTaskRunner extends AbstractTaskRunner {
    private static final Logger log = LoggerFactory.getLogger(ChoiceTaskRunner.class);

    public ChoiceTaskRunner(InputOutputMapping inputOutputMapping, DAGContextStorage dAGContextStorage, DAGInfoStorage dAGInfoStorage, DAGStorageProcedure dAGStorageProcedure, SwitcherManager switcherManager) {
        super(inputOutputMapping, dAGInfoStorage, dAGContextStorage, dAGStorageProcedure, switcherManager);
    }

    @Override // com.weibo.rill.flow.olympicene.traversal.runners.AbstractTaskRunner
    protected ExecutionResult doRun(String str, TaskInfo taskInfo, Map<String, Object> map) {
        log.info("choice task begin to run executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        List choices = taskInfo.getTask().getChoices();
        if (CollectionUtils.isEmpty(choices)) {
            taskInfo.updateInvokeMsg(TaskInvokeMsg.builder().msg("choices collection empty").build());
            updateTaskInvokeEndTime(taskInfo);
            taskInfo.setTaskStatus(TaskStatus.SUCCEED);
            this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(taskInfo));
            return ExecutionResult.builder().taskStatus(taskInfo.getTaskStatus()).build();
        }
        log.info("choice group size:{} executionId:{}, taskInfoName:{}", new Object[]{Integer.valueOf(choices.size()), str, taskInfo.getName()});
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        taskInfo.setSubGroupIndexToStatus(newConcurrentMap);
        taskInfo.setTaskStatus(TaskStatus.RUNNING);
        AtomicInteger atomicInteger = new AtomicInteger(0);
        ArrayList newArrayList = Lists.newArrayList();
        choices.stream().sorted((choice, choice2) -> {
            return choice.getCondition().compareToIgnoreCase(choice2.getCondition());
        }).forEach(choice3 -> {
            int andIncrement = atomicInteger.getAndIncrement();
            HashSet hashSet = new HashSet(TaskInfoMaker.getMaker().makeTaskInfos(choice3.getTasks(), taskInfo, Integer.valueOf(andIncrement)).values());
            taskInfo.setChildren((Map) Optional.ofNullable(taskInfo.getChildren()).orElse(Maps.newConcurrentMap()));
            taskInfo.getChildren().putAll((Map) hashSet.stream().collect(Collectors.toMap((v0) -> {
                return v0.getName();
            }, taskInfo2 -> {
                return taskInfo2;
            })));
            newConcurrentMap.put(String.valueOf(andIncrement), TaskStatus.READY);
            boolean z = false;
            try {
                z = !((List) JsonPath.using(this.valuePathConf).parse(ImmutableMap.of("input", map)).read(choice3.getCondition(), new Predicate[0])).isEmpty();
            } catch (Exception e) {
                log.warn("choiceTask {} evaluation condition expression {} exception. ", new Object[]{taskInfo.getName(), choice3.getCondition(), e});
            }
            if (!z) {
                hashSet.forEach(taskInfo3 -> {
                    taskInfo3.setTaskStatus(TaskStatus.SKIPPED);
                });
                taskInfo.getSubGroupIndexToStatus().put(String.valueOf(andIncrement), TaskStatus.SKIPPED);
                return;
            }
            ConcurrentMap newConcurrentMap2 = Maps.newConcurrentMap();
            newConcurrentMap2.putAll(map);
            HashMap newHashMap = Maps.newHashMap();
            newHashMap.put(DAGWalkHelper.getInstance().buildSubTaskContextFieldName(((TaskInfo) hashSet.iterator().next()).getRouteName()), newConcurrentMap2);
            newArrayList.add(Pair.of(hashSet, newHashMap));
        });
        Map map2 = (Map) newArrayList.stream().map((v0) -> {
            return v0.getRight();
        }).flatMap(map3 -> {
            return map3.entrySet().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }, (obj, obj2) -> {
            return obj2;
        }));
        TaskStatus calculateParentStatus = DAGWalkHelper.getInstance().calculateParentStatus(taskInfo);
        if (calculateParentStatus.isCompleted()) {
            taskInfo.setTaskStatus(calculateParentStatus);
            updateTaskInvokeEndTime(taskInfo);
        }
        this.dagContextStorage.updateContext(str, map2);
        this.dagInfoStorage.saveTaskInfos(str, ImmutableSet.of(taskInfo));
        log.info("run choice task completed, executionId:{}, taskInfoName:{}", str, taskInfo.getName());
        return ExecutionResult.builder().taskStatus(taskInfo.getTaskStatus()).subTaskInfosAndContext(newArrayList).build();
    }

    @Override // com.weibo.rill.flow.olympicene.traversal.runners.AbstractTaskRunner, com.weibo.rill.flow.olympicene.traversal.runners.TaskRunner
    public ExecutionResult finish(String str, NotifyInfo notifyInfo, Map<String, Object> map) {
        return finishParentTask(str, notifyInfo);
    }

    @Override // com.weibo.rill.flow.olympicene.traversal.runners.AbstractTaskRunner
    protected Map<String, Object> getSubTaskContextMap(String str, TaskInfo taskInfo) {
        List<Map<String, Object>> subContextList = ContextHelper.getInstance().getSubContextList(this.dagContextStorage, str, taskInfo);
        ConcurrentMap newConcurrentMap = Maps.newConcurrentMap();
        Stream<Map<String, Object>> filter = subContextList.stream().filter(MapUtils::isNotEmpty);
        Objects.requireNonNull(newConcurrentMap);
        filter.forEach(newConcurrentMap::putAll);
        return newConcurrentMap;
    }
}
