package com.weibo.rill.flow.olympicene.traversal.runners;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.weibo.rill.flow.interfaces.model.mapping.Mapping;
import com.weibo.rill.flow.interfaces.model.task.TaskInfo;
import com.weibo.rill.flow.interfaces.model.task.TaskInvokeMsg;
import com.weibo.rill.flow.interfaces.model.task.TaskStatus;
import com.weibo.rill.flow.olympicene.core.helper.DAGWalkHelper;
import com.weibo.rill.flow.olympicene.core.helper.TaskInfoMaker;
import com.weibo.rill.flow.olympicene.core.model.NotifyInfo;
import com.weibo.rill.flow.olympicene.core.model.mapping.IterationMapping;
import com.weibo.rill.flow.olympicene.core.model.strategy.Synchronization;
import com.weibo.rill.flow.olympicene.core.model.task.ExecutionResult;
import com.weibo.rill.flow.olympicene.core.model.task.ForeachTask;
import com.weibo.rill.flow.olympicene.core.model.task.TaskCategory;
import com.weibo.rill.flow.olympicene.core.runtime.DAGContextStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGInfoStorage;
import com.weibo.rill.flow.olympicene.core.runtime.DAGStorageProcedure;
import com.weibo.rill.flow.olympicene.core.switcher.SwitcherManager;
import com.weibo.rill.flow.olympicene.traversal.helper.Stasher;
import com.weibo.rill.flow.olympicene.traversal.mappings.InputOutputMapping;
import com.weibo.rill.flow.olympicene.traversal.mappings.JSONPath;
import com.weibo.rill.flow.olympicene.traversal.utils.ConditionsUtil;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.Generated;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/runners/ForeachTaskRunner.class */
/**
 * Task runner for {@link TaskCategory#FOREACH} tasks.
 *
 * <p>When run, it reads a collection from the task input, creates one group of sub tasks per
 * element, records a per-group status and context, and returns the groups that may start
 * immediately. The number of groups started immediately can be capped through the task's
 * {@link Synchronization} settings.
 */
public class ForeachTaskRunner extends AbstractTaskRunner {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(ForeachTaskRunner.class);

    /** Key under which the task input is exposed to JSONPath lookups and condition checks. */
    private static final String INPUT_KEY = "input";

    /** Value returned by {@link #maxGroupsToRun} when no concurrency limit applies. */
    private static final int UNLIMITED_GROUPS = -1;

    /** Reads and deletes the iterated collection from the task input. */
    private final JSONPath jsonPath;

    /**
     * Decides whether a sub task needs to be stashed. It is injected through
     * {@link #setStasher(Stasher)} and is dereferenced without a null check whenever the foreach
     * task itself declares a key expression, so it must be set before such a task is run.
     */
    private Stasher stasher;

    public ForeachTaskRunner(InputOutputMapping inputOutputMapping,
                             JSONPath jsonPath,
                             DAGContextStorage dagContextStorage,
                             DAGInfoStorage dagInfoStorage,
                             DAGStorageProcedure dagStorageProcedure,
                             SwitcherManager switcherManager) {
        super(inputOutputMapping, dagInfoStorage, dagContextStorage, dagStorageProcedure, switcherManager);
        this.jsonPath = jsonPath;
    }

    @Override
    public TaskCategory getCategory() {
        return TaskCategory.FOREACH;
    }

    @Override
    public String getIcon() {
        return "ant-design:sync-outlined";
    }

    /**
     * Expands the foreach task into one sub task group per element of the iterated collection.
     *
     * <p>If the collection or the sub task list is empty the task is marked {@code SUCCEED} and
     * saved immediately. Otherwise every group gets its own context (the task input plus the
     * current element stored under the iteration item name), all group contexts are written to
     * the context storage, and the task is saved in {@code RUNNING} state.
     *
     * @param executionId id of the running execution
     * @param taskInfo    the foreach task being run; its status, children and group maps are mutated
     * @param input       the task input; the iterated collection is deleted from it via JSONPath
     * @return the task status plus the groups (sub tasks and their context) to start right away
     */
    @Override
    protected ExecutionResult doRun(String executionId, TaskInfo taskInfo, Map<String, Object> input) {
        log.info("foreach task begin to run executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());

        ForeachTask foreachTask = (ForeachTask) taskInfo.getTask();
        IterationMapping iterationMapping = foreachTask.getIterationMapping();
        Collection<?> collection = (Collection<?>) jsonPath.getValue(
                ImmutableMap.of(INPUT_KEY, input), iterationMapping.getCollection());

        if (CollectionUtils.isEmpty(collection) || CollectionUtils.isEmpty(foreachTask.getTasks())) {
            return succeedWithoutGroups(executionId, taskInfo);
        }
        log.info("foreach group size:{} executionId:{}, taskInfoName:{}",
                collection.size(), executionId, taskInfo.getName());

        int maxGroups = maxGroupsToRun(executionId, taskInfo, input);
        Map<String, TaskStatus> groupStatuses = Maps.newConcurrentMap();
        taskInfo.setSubGroupIndexToStatus(groupStatuses);
        Map<String, Boolean> keyGroups = Maps.newConcurrentMap();
        taskInfo.setSubGroupKeyJudgementMapping(keyGroups);
        taskInfo.setTaskStatus(TaskStatus.RUNNING);
        if (taskInfo.getChildren() == null) {
            taskInfo.setChildren(Maps.newConcurrentMap());
        }

        // The collection has already been read; remove it from the input before the input is
        // copied into every group context below.
        jsonPath.delete(ImmutableMap.of(INPUT_KEY, input), iterationMapping.getCollection());

        boolean parentHasKeyExp = existKeyExp(taskInfo);
        Map<String, Object> contextToUpdate = Maps.newHashMap();
        List<Pair<Set<TaskInfo>, Map<String, Object>>> groupsToRun = Lists.newArrayList();
        int groupIndex = 0;
        for (Object item : collection) {
            String groupKey = String.valueOf(groupIndex);
            Set<TaskInfo> subTaskInfos = new HashSet<>(
                    TaskInfoMaker.getMaker().makeTaskInfos(foreachTask.getTasks(), taskInfo, groupIndex).values());

            // NOTE(review): a ConcurrentMap is expected to reject null keys and values, so a null
            // element or a null value in the input would throw here - confirm upstream guarantees.
            Map<String, Object> groupContext = Maps.newConcurrentMap();
            groupContext.putAll(input);
            groupContext.put(iterationMapping.getItem(), item);

            if (parentHasKeyExp && containsKeySubTask(executionId, groupContext, subTaskInfos)) {
                keyGroups.put(groupKey, true);
            }

            taskInfo.getChildren().putAll(subTaskInfos.stream()
                    .collect(Collectors.toMap(TaskInfo::getName, subTaskInfo -> subTaskInfo)));
            groupStatuses.put(groupKey, TaskStatus.READY);
            updateGroupIdentity(executionId, item, taskInfo, iterationMapping.getIdentity(), groupIndex);

            // All sub tasks of a group share one context entry, named after the route of
            // whichever sub task the set yields first.
            String routeName = subTaskInfos.iterator().next().getRouteName();
            Map<String, Object> groupContextUpdate = Maps.newHashMap();
            groupContextUpdate.put(DAGWalkHelper.getInstance().buildSubTaskContextFieldName(routeName), groupContext);
            contextToUpdate.putAll(groupContextUpdate);

            // A non-positive limit means "no limit"; otherwise only the first maxGroups groups
            // start now and the remaining ones stay READY.
            if (maxGroups <= 0 || groupIndex < maxGroups) {
                groupsToRun.add(Pair.of(subTaskInfos, groupContextUpdate));
                groupStatuses.put(groupKey, TaskStatus.RUNNING);
            }
            groupIndex++;
        }

        dagContextStorage.updateContext(executionId, contextToUpdate);
        dagInfoStorage.saveTaskInfos(executionId, ImmutableSet.of(taskInfo));
        log.info("run foreach task completed, executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());
        return ExecutionResult.builder()
                .taskStatus(taskInfo.getTaskStatus())
                .subTaskInfosAndContext(groupsToRun)
                .build();
    }

    /** Marks the task as succeeded when there is nothing to iterate over, and saves it. */
    private ExecutionResult succeedWithoutGroups(String executionId, TaskInfo taskInfo) {
        taskInfo.updateInvokeMsg(TaskInvokeMsg.builder().msg("loop collection or subTasks empty").build());
        updateTaskInvokeEndTime(taskInfo);
        taskInfo.setTaskStatus(TaskStatus.SUCCEED);
        dagInfoStorage.saveTaskInfos(executionId, ImmutableSet.of(taskInfo));
        return ExecutionResult.builder().taskStatus(taskInfo.getTaskStatus()).build();
    }

    /**
     * Returns whether any sub task of a group both declares a key expression and is a key sub
     * task. The stasher is consulted for every sub task visited, before its key expression is
     * checked, and the scan stops at the first match.
     */
    private boolean containsKeySubTask(String executionId, Map<String, Object> groupContext, Set<TaskInfo> subTaskInfos) {
        for (TaskInfo subTaskInfo : subTaskInfos) {
            boolean keySubTask = isKeySubTask(executionId, groupContext, subTaskInfo);
            if (existKeyExp(subTaskInfo) && keySubTask) {
                return true;
            }
        }
        return false;
    }

    /** A sub task counts as a key sub task when the stasher reports that it needs no stashing. */
    private boolean isKeySubTask(String executionId, Map<String, Object> context, TaskInfo taskInfo) {
        return !stasher.needStash(executionId, taskInfo, context);
    }

    /** Returns whether the task info, its task and the task's key expression are all non-null. */
    private boolean existKeyExp(TaskInfo taskInfo) {
        return Optional.ofNullable(taskInfo)
                .map(info -> info.getTask())
                .map(task -> task.getKeyExp())
                .isPresent();
    }

    /**
     * Resolves how many groups may be started immediately.
     *
     * @return the positive {@code maxConcurrency} resolved from the task's synchronization
     *         settings, or {@code -1} (no limit) when no settings, conditions or limit are
     *         configured, the conditions do not match the input, the resolved value is absent
     *         or not positive, or resolving it fails
     */
    private int maxGroupsToRun(String executionId, TaskInfo taskInfo, Map<String, Object> input) {
        try {
            Synchronization synchronization = ((ForeachTask) taskInfo.getTask()).getSynchronization();
            if (synchronization == null
                    || CollectionUtils.isEmpty(synchronization.getConditions())
                    || StringUtils.isBlank(synchronization.getMaxConcurrency())) {
                return UNLIMITED_GROUPS;
            }
            if (!ConditionsUtil.conditionsAllMatch(synchronization.getConditions(), input, INPUT_KEY)) {
                log.warn("maxGroupsToRun conditions mismatch executionId:{}, taskInfoName:{}", executionId, taskInfo.getName());
                return UNLIMITED_GROUPS;
            }

            // Evaluate the configured expression by mapping it into a scratch output map.
            Map<String, Object> output = Maps.newHashMap();
            List<Mapping> rules = Lists.newArrayList(
                    new Mapping(synchronization.getMaxConcurrency(), "$.output.maxConcurrency"));
            inputMappings(Maps.newHashMap(), input, output, rules);

            int maxConcurrency = Optional.ofNullable(output.get("maxConcurrency"))
                    .map(String::valueOf)
                    .map(Integer::valueOf)
                    .filter(value -> value > 0)
                    .orElse(UNLIMITED_GROUPS);
            log.info("maxGroupsToRun executionId:{}, taskInfoName:{}, maxConcurrency:{}",
                    executionId, taskInfo.getName(), maxConcurrency);
            return maxConcurrency;
        } catch (Exception e) {
            // Best effort: a broken limit configuration falls back to "no limit". The exception
            // is passed as the trailing argument so the stack trace is logged as well.
            log.warn("maxGroupsToRun fails, executionId:{}, taskInfoName:{}, errorMsg:{}",
                    executionId, taskInfo.getName(), e.getMessage(), e);
            return UNLIMITED_GROUPS;
        }
    }

    /**
     * Resolves the identity expression against the current element and, when the result is not
     * blank, stores it in the task's group-index-to-identity map. Failures are logged and ignored.
     *
     * @param str2 identity expression; {@code $.iteration.element} refers to the current element.
     *             Blank means no identity is recorded.
     * @param num  index of the group the element belongs to
     */
    private void updateGroupIdentity(String str, Object obj, TaskInfo taskInfo, String str2, Integer num) {
        if (StringUtils.isBlank(str2)) {
            return;
        }
        try {
            Map<String, Object> elementInput = Maps.newHashMap();
            elementInput.put("element", obj);
            Map<String, Object> output = Maps.newHashMap();
            // The element is supplied as mapping input, so rewrite the expression to read from it.
            String source = str2.replace("$.iteration.element", "$.input.element");
            List<Mapping> rules = Lists.newArrayList(new Mapping(source, "$.output.identity"));
            inputMappings(Maps.newHashMap(), elementInput, output, rules);

            String identity = Optional.ofNullable(output.get("identity")).map(String::valueOf).orElse(null);
            if (StringUtils.isNotBlank(identity)) {
                if (taskInfo.getSubGroupIndexToIdentity() == null) {
                    taskInfo.setSubGroupIndexToIdentity(Maps.newConcurrentMap());
                }
                taskInfo.getSubGroupIndexToIdentity().put(String.valueOf(num), identity);
            }
        } catch (Exception e) {
            // Best effort: the identity is optional. The exception is passed as the trailing
            // argument so the stack trace is logged as well.
            log.warn("updateGroupIdentity fails, executionId:{}, taskInfoName:{}, groupIndex:{}, errorMsg:{}",
                    str, taskInfo.getName(), num, e.getMessage(), e);
        }
    }

    /**
     * Finishes the foreach task by delegating to {@code finishParentTask}; the output argument
     * is not used.
     */
    @Override
    public ExecutionResult finish(String str, NotifyInfo notifyInfo, Map<String, Object> map) {
        return finishParentTask(str, notifyInfo);
    }

    /** Injects the stasher used to decide whether a sub task is a key sub task. */
    @Generated
    public void setStasher(Stasher stasher) {
        this.stasher = stasher;
    }
}
