package com.redis.spring.batch.common;

import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.step.FlushingSimpleStepBuilder;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.awaitility.Awaitility;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/* loaded from: input_file:com/redis/spring/batch/common/JobRunner.class */
/**
 * Convenience facade for building and launching Spring Batch jobs and steps
 * against a single {@link JobRepository} / {@link PlatformTransactionManager}
 * pair.
 *
 * <p>An instance exposes a synchronous and an asynchronous
 * {@link SimpleJobLauncher}, pre-configured job and step builders, and helpers
 * to wait for a {@link JobExecution} to start or terminate. The static methods
 * decorate readers and step builders (synchronization, flushing, fault
 * tolerance, multi-threading).
 */
public class JobRunner {

    /** Default maximum wait used by {@link #awaitRunning(JobExecution)}. */
    public static final Duration DEFAULT_RUNNING_TIMEOUT = Duration.ofSeconds(5);

    /** Default maximum wait used by {@link #awaitTermination(JobExecution)}. */
    public static final Duration DEFAULT_TERMINATION_TIMEOUT = Duration.ofSeconds(5);

    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final SimpleJobLauncher jobLauncher;
    private final SimpleJobLauncher asyncJobLauncher;
    private Duration runningTimeout = DEFAULT_RUNNING_TIMEOUT;
    private Duration terminationTimeout = DEFAULT_TERMINATION_TIMEOUT;

    /**
     * Creates a runner bound to the given repository and transaction manager.
     *
     * @param jobRepository              repository used by the launchers and by
     *                                   the job/step builders
     * @param platformTransactionManager transaction manager applied to steps
     *                                   created through {@link #step(String)}
     */
    public JobRunner(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = platformTransactionManager;
        // The launchers are created here rather than in field initializers:
        // field initializers run before the constructor body, at which point
        // jobRepository would still be null.
        this.jobLauncher = launcher(jobRepository, new SyncTaskExecutor());
        this.asyncJobLauncher = launcher(jobRepository, new SimpleAsyncTaskExecutor());
    }

    /**
     * Creates a runner backed by a {@link MapJobRepositoryFactoryBean}, using
     * the repository and transaction manager that factory provides.
     *
     * @return a new runner
     * @throws RuntimeException if the factory bean cannot be initialized; the
     *                          original failure is preserved as the cause
     */
    public static JobRunner inMemory() {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        try {
            factory.afterPropertiesSet();
            return new JobRunner(factory.getObject(), factory.getTransactionManager());
        } catch (Exception e) {
            throw new RuntimeException("Could not initialize in-memory job runner", e);
        }
    }

    /**
     * @return the job repository this runner was created with
     */
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    /**
     * @return the transaction manager this runner was created with
     */
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /**
     * Builds a launcher that uses the given repository and task executor.
     *
     * @param repository   repository the launcher records executions in
     * @param taskExecutor executor the launcher runs jobs on
     * @return the configured launcher
     */
    private static SimpleJobLauncher launcher(JobRepository repository, TaskExecutor taskExecutor) {
        SimpleJobLauncher simpleJobLauncher = new SimpleJobLauncher();
        simpleJobLauncher.setJobRepository(repository);
        simpleJobLauncher.setTaskExecutor(taskExecutor);
        return simpleJobLauncher;
    }

    /**
     * Sets the timeout used by {@link #awaitRunning(JobExecution)}.
     *
     * @param duration the new timeout
     * @return this runner, for chaining
     */
    public JobRunner runningTimeout(Duration duration) {
        this.runningTimeout = duration;
        return this;
    }

    /**
     * Sets the timeout used by {@link #awaitTermination(JobExecution)}.
     *
     * @param duration the new timeout
     * @return this runner, for chaining
     */
    public JobRunner terminationTimeout(Duration duration) {
        this.terminationTimeout = duration;
        return this;
    }

    /**
     * Starts a job builder wired to this runner's repository.
     *
     * @param str the job name
     * @return a job builder using this runner's repository
     */
    public JobBuilder job(String str) {
        return new JobBuilder(str).repository(this.jobRepository);
    }

    /**
     * Starts a step builder wired to this runner's repository and transaction
     * manager.
     *
     * @param str the step name
     * @return a step builder using this runner's repository and transaction manager
     */
    public StepBuilder step(String str) {
        return new StepBuilder(str).repository(this.jobRepository).transactionManager(this.transactionManager);
    }

    /**
     * Wraps the reader in a synchronized decorator when one is available for
     * its type.
     *
     * <p>{@link PollableItemReader} is checked before {@link ItemStreamReader};
     * a reader that is neither is returned unchanged.
     *
     * @param itemReader the reader to wrap
     * @param <T>        item type
     * @return a synchronized wrapper, or {@code itemReader} itself
     */
    // Raw use is confined to SynchronizedPollableItemReader, whose declaration
    // is not visible from this file.
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static <T> ItemReader<T> synchronize(ItemReader<T> itemReader) {
        if (itemReader instanceof PollableItemReader) {
            return new SynchronizedPollableItemReader((PollableItemReader) itemReader);
        }
        if (itemReader instanceof ItemStreamReader) {
            return synchronizedItemStreamReader((ItemStreamReader<T>) itemReader);
        }
        return itemReader;
    }

    /**
     * Wraps the stream reader in a {@link SynchronizedItemStreamReader}.
     *
     * @param itemStreamReader the delegate
     * @param <T>              item type
     * @return the synchronized wrapper
     */
    private static <T> ItemReader<T> synchronizedItemStreamReader(ItemStreamReader<T> itemStreamReader) {
        SynchronizedItemStreamReader<T> synchronizedReader = new SynchronizedItemStreamReader<>();
        synchronizedReader.setDelegate(itemStreamReader);
        return synchronizedReader;
    }

    /**
     * Converts a simple step builder into a flushing step builder configured
     * with the flushing interval and idle timeout from the given options.
     *
     * @param simpleStepBuilder the builder to convert
     * @param flushingOptions   source of the flushing interval and idle timeout
     * @param <I>               input item type
     * @param <O>               output item type
     * @return the flushing step builder
     */
    public static <I, O> FlushingSimpleStepBuilder<I, O> flushing(SimpleStepBuilder<I, O> simpleStepBuilder, FlushingOptions flushingOptions) {
        FlushingSimpleStepBuilder<I, O> flushingSimpleStepBuilder = new FlushingSimpleStepBuilder<>(simpleStepBuilder);
        flushingSimpleStepBuilder.flushingInterval(flushingOptions.getFlushingInterval());
        flushingSimpleStepBuilder.idleTimeout(flushingOptions.getIdleTimeout());
        return flushingSimpleStepBuilder;
    }

    /**
     * Converts a simple step builder into a fault-tolerant one and applies the
     * skip limit, skippable and non-skippable exception types, and the optional
     * skip policy from the given options.
     *
     * @param simpleStepBuilder     the builder to convert
     * @param faultToleranceOptions source of the skip configuration
     * @param <I>                   input item type
     * @param <O>                   output item type
     * @return the fault-tolerant step builder
     */
    public static <I, O> FaultTolerantStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> simpleStepBuilder, FaultToleranceOptions faultToleranceOptions) {
        FaultTolerantStepBuilder<I, O> faultTolerant = simpleStepBuilder.faultTolerant();
        faultTolerant.skipLimit(faultToleranceOptions.getSkipLimit());
        List<Class<? extends Throwable>> skip = faultToleranceOptions.getSkip();
        skip.forEach(faultTolerant::skip);
        List<Class<? extends Throwable>> noSkip = faultToleranceOptions.getNoSkip();
        noSkip.forEach(faultTolerant::noSkip);
        Optional<SkipPolicy> skipPolicy = faultToleranceOptions.getSkipPolicy();
        skipPolicy.ifPresent(faultTolerant::skipPolicy);
        return faultTolerant;
    }

    /**
     * Configures the step builder for multi-threaded execution when more than
     * one thread is requested; otherwise leaves it untouched.
     *
     * @param simpleStepBuilder the builder to configure
     * @param i                 number of threads; values of 1 or less are a no-op
     */
    public static void multiThreaded(SimpleStepBuilder<?, ?> simpleStepBuilder, int i) {
        if (i > 1) {
            simpleStepBuilder.taskExecutor(taskExecutor(i));
            simpleStepBuilder.throttleLimit(i);
        }
    }

    /**
     * Creates and initializes a {@link ThreadPoolTaskExecutor} whose core pool
     * size, maximum pool size and queue capacity are all set to {@code i}.
     *
     * <p>NOTE(review): nothing in this class shuts the returned executor down.
     *
     * @param i pool size and queue capacity
     * @return the initialized executor
     */
    public static TaskExecutor taskExecutor(int i) {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setMaxPoolSize(i);
        threadPoolTaskExecutor.setCorePoolSize(i);
        threadPoolTaskExecutor.setQueueCapacity(i);
        threadPoolTaskExecutor.afterPropertiesSet();
        return threadPoolTaskExecutor;
    }

    /**
     * Predicate used by {@link #awaitRunning(JobExecution)}.
     *
     * @param jobExecution the execution to inspect
     * @return {@code true} if the execution reports itself as running, has an
     *         unsuccessful status, or has any status other than
     *         {@link BatchStatus#STARTING}
     */
    public static boolean isRunning(JobExecution jobExecution) {
        return jobExecution.isRunning() || jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() != BatchStatus.STARTING;
    }

    /**
     * Predicate used by {@link #awaitTermination(JobExecution)}.
     *
     * @param jobExecution the execution to inspect
     * @return {@code true} if the execution does not report itself as running,
     *         has an unsuccessful status, or has a status that is
     *         {@link BatchStatus#COMPLETED}, {@link BatchStatus#STOPPED} or
     *         greater than {@code STOPPED}
     */
    public static boolean isTerminated(JobExecution jobExecution) {
        return !jobExecution.isRunning() || jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() == BatchStatus.COMPLETED || jobExecution.getStatus() == BatchStatus.STOPPED || jobExecution.getStatus().isGreaterThan(BatchStatus.STOPPED);
    }

    /**
     * @return the launcher configured with a {@link SyncTaskExecutor}
     */
    public SimpleJobLauncher getJobLauncher() {
        return this.jobLauncher;
    }

    /**
     * @return the launcher configured with a {@link SimpleAsyncTaskExecutor}
     */
    public SimpleJobLauncher getAsyncJobLauncher() {
        return this.asyncJobLauncher;
    }

    /**
     * Waits, via Awaitility with the configured running timeout, until
     * {@link #isRunning(JobExecution)} holds for the execution.
     *
     * @param jobExecution the execution to wait on
     */
    public void awaitRunning(JobExecution jobExecution) {
        Awaitility.await().timeout(this.runningTimeout).until(() -> isRunning(jobExecution));
    }

    /**
     * Waits, via Awaitility with the configured termination timeout, until
     * {@link #isTerminated(JobExecution)} holds for the execution.
     *
     * @param jobExecution the execution to wait on
     */
    public void awaitTermination(JobExecution jobExecution) {
        Awaitility.await().timeout(this.terminationTimeout).until(() -> isTerminated(jobExecution));
    }
}
