package com.aizuda.snailjob.server.starter.dispatch;

import akka.actor.ActorRef;
import cn.hutool.core.collection.CollUtil;
import com.aizuda.snailjob.common.core.constant.SystemConstants;
import com.aizuda.snailjob.common.log.SnailJobLog;
import com.aizuda.snailjob.server.common.Lifecycle;
import com.aizuda.snailjob.server.common.akka.ActorGenerator;
import com.aizuda.snailjob.server.common.dto.DistributeInstance;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

/**
 * Periodically tells the scan-bucket actor which buckets this instance should consume.
 *
 * <p>{@link #start()} schedules a fixed-rate task on a dedicated single thread named
 * {@code dispatch-service}; {@link #close()} stops that thread so no further dispatches
 * are issued after the component has been closed.
 */
@Component
public class DispatchService implements Lifecycle {
    private static final Logger log = LoggerFactory.getLogger(DispatchService.class);

    /** Interval between two dispatch runs, in seconds (passed to the scheduler with {@code TimeUnit.SECONDS}). */
    public static final Long PERIOD = SystemConstants.SCHEDULE_PERIOD;

    /**
     * Delay before the first dispatch run, in seconds. The same value is used as the
     * pause applied when a rebalance is in progress.
     */
    public static final Long INITIAL_DELAY = SystemConstants.SCHEDULE_INITIAL_DELAY;

    /** Single-threaded scheduler that runs the periodic dispatch task. */
    private final ScheduledExecutorService dispatchService =
            Executors.newSingleThreadScheduledExecutor(runnable -> new Thread(runnable, "dispatch-service"));

    /**
     * Schedules the periodic dispatch task.
     *
     * <p>Each run pauses for {@link #INITIAL_DELAY} seconds if a rebalance is flagged as
     * in progress, then reads the current consumer buckets and, when the set is not empty,
     * sends them to the scan-bucket actor wrapped in a {@code ConsumerBucket} message.
     */
    public void start() {
        ActorRef scanBucketActor = ActorGenerator.scanBucketActor();
        this.dispatchService.scheduleAtFixedRate(() -> {
            try {
                if (DistributeInstance.RE_BALANCE_ING.get()) {
                    SnailJobLog.LOCAL.info("正在rebalance中....", new Object[0]);
                    TimeUnit.SECONDS.sleep(INITIAL_DELAY.longValue());
                }
                Set<Integer> buckets = getConsumerBucket();
                if (CollUtil.isNotEmpty(buckets)) {
                    ConsumerBucket message = new ConsumerBucket();
                    message.setBuckets(buckets);
                    scanBucketActor.tell(message, scanBucketActor);
                }
            } catch (InterruptedException e) {
                // Interrupted while waiting out a rebalance (e.g. by close()): restore the
                // interrupt status and skip this run instead of reporting a dispatch error.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // An exception escaping the task would cancel all later runs of a
                // fixed-rate schedule, so log it and let the next run proceed.
                SnailJobLog.LOCAL.error("分发异常", new Object[]{e});
            }
        }, INITIAL_DELAY.longValue(), PERIOD.longValue(), TimeUnit.SECONDS);
    }

    /**
     * Returns the buckets currently reported by {@code DistributeInstance.INSTANCE}.
     */
    private Set<Integer> getConsumerBucket() {
        return DistributeInstance.INSTANCE.getConsumerBucket();
    }

    /**
     * Stops the dispatch scheduler.
     *
     * <p>Cancels the periodic task and interrupts a run that may be sleeping during a
     * rebalance, so the non-daemon {@code dispatch-service} thread can terminate.
     * Calling this more than once is harmless.
     */
    public void close() {
        this.dispatchService.shutdownNow();
    }
}
