package org.apache.kylin.job.impl.threadpool;

import java.io.FileNotFoundException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.job.ErrorTestExecutable;
import org.apache.kylin.job.FailedTestExecutable;
import org.apache.kylin.job.FiveSecondSucceedTestExecutable;
import org.apache.kylin.job.NoErrorStatusExecutable;
import org.apache.kylin.job.RunningTestExecutable;
import org.apache.kylin.job.SelfStopExecutable;
import org.apache.kylin.job.SucceedTestExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableManager;
import org.apache.kylin.job.execution.ExecutableState;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.class */
public class DefaultSchedulerTest extends BaseSchedulerTest {
    private static final Logger logger = LoggerFactory.getLogger(DefaultSchedulerTest.class);
    private static final int MAX_WAIT_TIME = 20000;

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Override // org.apache.kylin.job.impl.threadpool.BaseSchedulerTest
    public void after() throws Exception {
        super.after();
        System.clearProperty("kylin.job.retry");
        System.clearProperty("kylin.job.retry-exception-classes");
    }

    @Test
    public void testSingleTaskJob() throws Exception {
        logger.info("testSingleTaskJob");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        defaultChainedExecutable.addTask(succeedTestExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(succeedTestExecutable.getId()).getState());
    }

    @Test
    public void testSucceed() throws Exception {
        logger.info("testSucceed");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        SucceedTestExecutable succeedTestExecutable2 = new SucceedTestExecutable();
        defaultChainedExecutable.addTask(succeedTestExecutable);
        defaultChainedExecutable.addTask(succeedTestExecutable2);
        this.execMgr.addJob(defaultChainedExecutable);
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(succeedTestExecutable2.getId()).getState());
    }

    @Test
    public void testSucceedAndFailed() throws Exception {
        logger.info("testSucceedAndFailed");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        FailedTestExecutable failedTestExecutable = new FailedTestExecutable();
        defaultChainedExecutable.addTask(succeedTestExecutable);
        defaultChainedExecutable.addTask(failedTestExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(failedTestExecutable.getId()).getState());
    }

    @Test
    public void testSucceedAndError() throws Exception {
        logger.info("testSucceedAndError");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        defaultChainedExecutable.addTask(errorTestExecutable);
        defaultChainedExecutable.addTask(succeedTestExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(errorTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.READY, this.execMgr.getOutput(succeedTestExecutable.getId()).getState());
    }

    @Test
    public void testDiscard() throws Exception {
        logger.info("testDiscard");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        SelfStopExecutable selfStopExecutable = new SelfStopExecutable();
        defaultChainedExecutable.addTask(selfStopExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        Thread.sleep(1100L);
        waitForJobStatus(defaultChainedExecutable.getId(), ExecutableState.RUNNING, 500L);
        this.execMgr.discardJob(defaultChainedExecutable.getId());
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.DISCARDED, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.DISCARDED, this.execMgr.getOutput(selfStopExecutable.getId()).getState());
        selfStopExecutable.waitForDoWork();
    }

    @Test
    public void testIllegalState() throws Exception {
        logger.info("testIllegalState");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        RunningTestExecutable runningTestExecutable = new RunningTestExecutable();
        defaultChainedExecutable.addTask(succeedTestExecutable);
        defaultChainedExecutable.addTask(runningTestExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()).updateJobOutput(runningTestExecutable.getId(), ExecutableState.RUNNING, (Map) null, (String) null);
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(runningTestExecutable.getId()).getState());
    }

    @Test
    @Ignore("why test JDK feature?")
    public void testSchedulerPool() throws InterruptedException {
        logger.info("testSchedulerPool");
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(1);
        CountDownLatch countDownLatch = new CountDownLatch(3);
        countDownLatch.getClass();
        ScheduledFuture<?> scheduleAtFixedRate = newScheduledThreadPool.scheduleAtFixedRate(countDownLatch::countDown, 0L, 1L, TimeUnit.SECONDS);
        Assert.assertTrue("countDownLatch should reach zero in 15 secs", countDownLatch.await(7L, TimeUnit.SECONDS));
        Assert.assertTrue("future should still running", scheduleAtFixedRate.cancel(true));
        final CountDownLatch countDownLatch2 = new CountDownLatch(3);
        ScheduledFuture<?> scheduleAtFixedRate2 = newScheduledThreadPool.scheduleAtFixedRate(new Runnable() { // from class: org.apache.kylin.job.impl.threadpool.DefaultSchedulerTest.1
            @Override // java.lang.Runnable
            public void run() {
                countDownLatch2.countDown();
                throw new RuntimeException();
            }
        }, 0L, 1L, TimeUnit.SECONDS);
        Assert.assertFalse("countDownLatch2 should NOT reach zero in 15 secs", countDownLatch2.await(7L, TimeUnit.SECONDS));
        Assert.assertFalse("future2 should has been stopped", scheduleAtFixedRate2.cancel(true));
    }

    @Test
    public void testMetaStoreRecover() throws Exception {
        logger.info("testMetaStoreRecover");
        NoErrorStatusExecutable noErrorStatusExecutable = new NoErrorStatusExecutable();
        noErrorStatusExecutable.addTask(new ErrorTestExecutable());
        this.execMgr.addJob(noErrorStatusExecutable);
        Thread.sleep(2500L);
        runningJobToError(noErrorStatusExecutable.getId());
        waitForJobFinish(noErrorStatusExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.ERROR, this.execMgr.getOutput(noErrorStatusExecutable.getId()).getState());
    }

    @Test
    public void testSchedulerStop() throws Exception {
        logger.info("testSchedulerStop");
        this.thrown.expect(RuntimeException.class);
        this.thrown.expectMessage("too long wait time");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        defaultChainedExecutable.addTask(new FiveSecondSucceedTestExecutable());
        this.execMgr.addJob(defaultChainedExecutable);
        Thread.sleep(3000L);
        this.scheduler.shutdown();
        waitForJobFinish(defaultChainedExecutable.getId(), 6000);
    }

    @Test
    public void testSchedulerRestart() throws Exception {
        logger.info("testSchedulerRestart");
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        defaultChainedExecutable.addTask(fiveSecondSucceedTestExecutable);
        this.execMgr.addJob(defaultChainedExecutable);
        Thread.sleep(3000L);
        this.scheduler.shutdown();
        startScheduler();
        waitForJobFinish(defaultChainedExecutable.getId(), MAX_WAIT_TIME);
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(defaultChainedExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, this.execMgr.getOutput(fiveSecondSucceedTestExecutable.getId()).getState());
    }

    @Test
    public void testRetryableException() throws Exception {
        DefaultChainedExecutable defaultChainedExecutable = new DefaultChainedExecutable();
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        defaultChainedExecutable.addTask(errorTestExecutable);
        System.setProperty("kylin.job.retry", "3");
        Assert.assertFalse(defaultChainedExecutable.needRetry(1, new Exception("")));
        Assert.assertTrue(errorTestExecutable.needRetry(1, new Exception("")));
        Assert.assertFalse(errorTestExecutable.needRetry(1, null));
        Assert.assertFalse(errorTestExecutable.needRetry(4, new Exception("")));
        System.setProperty("kylin.job.retry-exception-classes", "java.io.FileNotFoundException");
        Assert.assertTrue(errorTestExecutable.needRetry(1, new FileNotFoundException()));
        Assert.assertFalse(errorTestExecutable.needRetry(1, new Exception("")));
    }
}
