package com.weibo.rill.flow.olympicene.traversal.checker;

import com.google.common.collect.Lists;
import com.weibo.rill.flow.olympicene.storage.redis.api.RedisClient;
import com.weibo.rill.flow.olympicene.storage.redis.lock.RedisScriptLoader;
import com.weibo.rill.flow.olympicene.traversal.constant.TraversalErrorCode;
import com.weibo.rill.flow.olympicene.traversal.exception.DAGTraversalException;
import com.weibo.rill.flow.olympicene.traversal.runners.TimeCheckRunner;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/weibo/rill/flow/olympicene/traversal/checker/DefaultTimeChecker.class */
/**
 * Redis-backed {@link TimeChecker}.
 *
 * <p>Members to be checked are stored in Redis sorted sets: each member is added to the set named
 * by {@link #buildTimeCheckRedisKey(String)} with the caller-supplied time as score, and that set's
 * key is in turn registered in the index set named by {@link #timeCheckKey()} with the current
 * time as score. A single scheduled thread periodically walks the index and hands every member
 * returned by the {@code lua/redis_get_timeout.lua} script to the configured
 * {@link TimeCheckRunner}.
 *
 * <p>The collaborators are {@code volatile} because they may be assigned through the setters on
 * one thread while the scheduler thread reads them.
 */
public class DefaultTimeChecker implements TimeChecker {
    private static final Logger log = LoggerFactory.getLogger(DefaultTimeChecker.class);

    /** Source text of the Lua script used by {@link #doCheck(String, Consumer)}. */
    private static final String REDIS_GET_TIMEOUT = loadTimeoutScript();

    private volatile RedisClient redisClient;
    private volatile TimeCheckRunner timeCheckRunner;

    /**
     * Creates a checker without collaborators and without starting the periodic check.
     * Callers are expected to use the setters and {@link #initCheckThread(int)} themselves.
     */
    public DefaultTimeChecker() {
    }

    /**
     * Creates a checker and immediately starts the periodic check.
     *
     * <p>The runner is not set by this constructor; until
     * {@link #setTimeCheckRunner(TimeCheckRunner)} is called, scheduled ticks are skipped.
     *
     * @param checkIntervalSeconds initial delay and period of the check, in seconds
     * @param redisClient client used for all Redis access
     */
    public DefaultTimeChecker(int checkIntervalSeconds, RedisClient redisClient) {
        this.redisClient = redisClient;
        initCheckThread(checkIntervalSeconds);
    }

    /**
     * Returns the key of the index sorted set that lists every per-execution check key.
     *
     * @return the index key
     */
    protected String timeCheckKey() {
        return "all_time_check_redis_key";
    }

    /**
     * Returns the key of the sorted set holding the members to check for an execution.
     *
     * <p>This implementation returns the same key for every execution id; subclasses may override
     * it to derive a key from {@code executionId}.
     *
     * @param executionId id of the execution (only logged here)
     * @return the sorted-set key
     */
    protected String buildTimeCheckRedisKey(String executionId) {
        log.debug("buildTimeCheckRedisKey executionId:{}", executionId);
        return "time_check";
    }

    /**
     * Registers {@code member} for checking at {@code time}.
     *
     * @param executionId id of the execution the member belongs to
     * @param member member to store in the check set
     * @param time score stored with the member
     * @return {@code true} on success, {@code false} if any Redis call failed (the failure is logged)
     */
    @Override // com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker
    public boolean addMemberToCheckPool(String executionId, String member, long time) {
        try {
            log.info("addMemberToCheckPool executionId:{}, member:{}, time:{}", executionId, member, time);
            String checkKey = buildTimeCheckRedisKey(executionId);
            this.redisClient.zadd(checkKey, time, member);
            // Register the check key itself in the index so timeCheck() can find it.
            this.redisClient.zadd(timeCheckKey(), System.currentTimeMillis(), checkKey);
            return true;
        } catch (Exception e) {
            // A trailing Throwable is logged by SLF4J as the exception, not as a placeholder value.
            log.warn("addMemberToCheckPool fails, executionId:{} member:{} time:{}", executionId, member, time, e);
            return false;
        }
    }

    /**
     * Removes {@code member} from the check set of the given execution.
     *
     * @param executionId id of the execution the member belongs to
     * @param member member to remove
     * @return {@code true} on success, {@code false} if the Redis call failed (the failure is logged)
     */
    @Override // com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker
    public boolean remMemberFromCheckPool(String executionId, String member) {
        try {
            log.info("remMemberFromCheckPool executionId:{}, member:{}", executionId, member);
            this.redisClient.zrem(buildTimeCheckRedisKey(executionId), member);
            return true;
        } catch (Exception e) {
            log.warn("remMemberFromCheckPool fails, executionId:{} member:{}", executionId, member, e);
            return false;
        }
    }

    /**
     * Starts a single-threaded scheduler that runs {@link #timeCheck()} at a fixed rate.
     *
     * <p>NOTE: every call creates a new scheduler whose reference is not retained, so it cannot be
     * shut down through this class.
     *
     * @param checkIntervalSeconds initial delay and period, in seconds
     * @throws IllegalArgumentException if {@code checkIntervalSeconds} is not positive (raised by
     *     {@code scheduleAtFixedRate})
     */
    @Override // com.weibo.rill.flow.olympicene.traversal.checker.TimeChecker
    public void initCheckThread(int checkIntervalSeconds) {
        Executors.newScheduledThreadPool(1)
                .scheduleAtFixedRate(this::timeCheck, checkIntervalSeconds, checkIntervalSeconds, TimeUnit.SECONDS);
    }

    /**
     * One scheduler tick: reads the check keys from the index set whose score is not later than
     * now and runs {@link #doCheck(String, Consumer)} for each non-empty key.
     *
     * <p>Never throws an {@link Exception}; failures are logged so the fixed-rate schedule is not
     * cancelled by them.
     */
    protected void timeCheck() {
        try {
            log.info("timeCheck start");
            // Snapshot the volatile field so the null check and the later calls see the same value.
            TimeCheckRunner runner = this.timeCheckRunner;
            if (runner == null) {
                // Without a runner every member would only fail with a NullPointerException
                // after having been fetched by the script, so skip this tick instead.
                log.warn("timeCheck skipped, timeCheckRunner is not set");
                return;
            }

            Set<String> redisKeys = this.redisClient.zrangeByScore(timeCheckKey(), 0.0d, System.currentTimeMillis());
            boolean noKeys = CollectionUtils.isEmpty(redisKeys);
            log.info("timeCheck keys size:{}", noKeys ? 0 : redisKeys.size());
            if (noKeys) {
                return;
            }

            // A failure on one member must not stop the processing of the remaining ones.
            Consumer<String> memberHandler = member -> {
                try {
                    runner.handleTimeCheck(member);
                } catch (Exception e) {
                    log.warn("timeCheck fails, member:{}", member, e);
                }
            };
            redisKeys.stream()
                    .filter(StringUtils::isNotEmpty)
                    .forEach(redisKey -> doCheck(redisKey, memberHandler));
        } catch (Exception e) {
            log.warn("timeCheck fails, ", e);
        }
    }

    /**
     * Repeatedly evaluates the timeout script against {@code redisKey} and passes every returned
     * member to {@code consumer} until the script returns nothing.
     *
     * <p>Exceptions (including those thrown by {@code consumer}) are logged and end the loop.
     *
     * @param redisKey sorted-set key to check; also passed as the second argument of {@code eval}
     * @param consumer callback invoked once per returned member
     */
    protected void doCheck(String redisKey, Consumer<String> consumer) {
        try {
            log.info("doCheck start redisKey:{}", redisKey);
            List<String> keys = Lists.newArrayList(redisKey);
            // NOTE(review): presumably min score, max score (now), offset and batch size for the
            // script - confirm against lua/redis_get_timeout.lua. The timestamp is taken once, so
            // every batch of this run uses the same upper bound.
            List<String> args = Lists.newArrayList("0", String.valueOf(System.currentTimeMillis()), "0", "30");

            // NOTE(review): the loop only terminates if the script stops returning members,
            // i.e. it presumably removes what it returns - verify in the script.
            while (true) {
                // The elements are decoded as byte[] below, matching the original handling.
                @SuppressWarnings("unchecked")
                List<byte[]> rawMembers = (List<byte[]>) this.redisClient.eval(REDIS_GET_TIMEOUT, redisKey, keys, args);
                if (rawMembers == null) {
                    return;
                }
                List<String> members = rawMembers.stream()
                        .filter(Objects::nonNull)
                        .map(bytes -> new String(bytes, StandardCharsets.UTF_8))
                        .collect(Collectors.toList());
                if (members.isEmpty()) {
                    return;
                }
                for (String member : members) {
                    log.info("doCheck begin to check member:{}", member);
                    consumer.accept(member);
                }
            }
        } catch (Exception e) {
            log.warn("doCheck fails, redisKey:{}", redisKey, e);
        }
    }

    /**
     * Sets the Redis client used for all Redis access.
     *
     * @param redisClient client to use
     */
    public void setRedisClient(RedisClient redisClient) {
        this.redisClient = redisClient;
    }

    /**
     * Sets the runner that handles each member found by the periodic check.
     *
     * @param timeCheckRunner runner to use
     */
    public void setTimeCheckRunner(TimeCheckRunner timeCheckRunner) {
        this.timeCheckRunner = timeCheckRunner;
    }

    /**
     * Loads the timeout script during class initialization.
     *
     * @return the script text
     * @throws DAGTraversalException if the resource cannot be read; the underlying
     *     {@link IOException} is logged before the exception is thrown
     */
    private static String loadTimeoutScript() {
        try {
            return RedisScriptLoader.loadResourceAsText("lua/redis_get_timeout.lua");
        } catch (IOException e) {
            // Keep the root cause visible; the thrown exception carries only code and message.
            log.error("cannot load redis_get_timeout.lua", e);
            throw new DAGTraversalException(TraversalErrorCode.OPERATION_UNSUPPORTED.getCode(), "cannot load redis_get_timeout.lua");
        }
    }
}
