package com.redis.spring.batch.common;

import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.step.FlushingSimpleStepBuilder;
import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Callable;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionFactory;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.job.builder.JobBuilderException;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.SynchronizedItemStreamReader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;

/* loaded from: input_file:com/redis/spring/batch/common/JobRunner.class */
/**
 * Convenience facade for building and running Spring Batch jobs and steps against a single
 * {@link JobRepository} / {@link PlatformTransactionManager} pair.
 *
 * <p>It offers fluent configuration of the polling interval and timeouts used while waiting
 * for a {@link JobExecution} to change state, helpers to build chunk-oriented steps from
 * {@link StepOptions}, and synchronous / asynchronous job launching.
 */
public class JobRunner {
    private static JobRunner memoryInstance;
    public static final Duration DEFAULT_POLL_INTERVAL = Duration.ofMillis(30);
    public static final Duration DEFAULT_RUNNING_TIMEOUT = Duration.ofSeconds(5);
    public static final Duration DEFAULT_TERMINATION_TIMEOUT = Duration.ofSeconds(5);
    private static final JobParameters DEFAULT_JOB_PARAMETERS = new JobParameters();
    private final JobRepository jobRepository;
    private final PlatformTransactionManager transactionManager;
    private final SimpleJobLauncher jobLauncher;
    private final SimpleJobLauncher asyncJobLauncher;
    private Duration pollInterval = DEFAULT_POLL_INTERVAL;
    private Duration runningTimeout = DEFAULT_RUNNING_TIMEOUT;
    private Duration terminationTimeout = DEFAULT_TERMINATION_TIMEOUT;

    /**
     * Creates a runner bound to the given repository and transaction manager.
     *
     * @param jobRepository repository handed to every job, step and launcher built by this runner
     * @param platformTransactionManager transaction manager handed to every step built by this runner
     */
    public JobRunner(JobRepository jobRepository, PlatformTransactionManager platformTransactionManager) {
        this.jobRepository = jobRepository;
        this.transactionManager = platformTransactionManager;
        // The launchers must be created only after jobRepository is assigned: launcher() reads
        // this.jobRepository, and instance field initializers run before the constructor body,
        // which would wire a null repository into both launchers.
        this.jobLauncher = launcher(new SyncTaskExecutor());
        this.asyncJobLauncher = launcher(new SimpleAsyncTaskExecutor());
    }

    /**
     * Returns the shared in-memory runner, creating it on first use.
     *
     * <p>The method is synchronized so that concurrent first calls cannot create two instances.
     *
     * @return the lazily created shared instance
     * @throws JobBuilderException if the in-memory repository cannot be created
     */
    public static synchronized JobRunner getInMemoryInstance() {
        if (memoryInstance == null) {
            try {
                memoryInstance = inMemory();
            } catch (Exception e) {
                throw new JobBuilderException(e);
            }
        }
        return memoryInstance;
    }

    /**
     * Creates a new runner backed by a {@link MapJobRepositoryFactoryBean}.
     *
     * @return a new runner using the factory bean's repository and transaction manager
     * @throws Exception if the factory bean fails to initialize or to produce a repository
     */
    public static JobRunner inMemory() throws Exception {
        MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
        factory.afterPropertiesSet();
        return new JobRunner(factory.getObject(), factory.getTransactionManager());
    }

    /** @return the job repository this runner was created with */
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    /** @return the transaction manager this runner was created with */
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /** Builds a job launcher that uses this runner's repository and the given task executor. */
    private SimpleJobLauncher launcher(TaskExecutor taskExecutor) {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(this.jobRepository);
        launcher.setTaskExecutor(taskExecutor);
        return launcher;
    }

    /**
     * Sets the interval at which awaited conditions are polled.
     *
     * @param duration polling interval
     * @return this runner, for chaining
     */
    public JobRunner pollInterval(Duration duration) {
        this.pollInterval = duration;
        return this;
    }

    /**
     * Sets the timeout used by the {@code awaitRunning} methods.
     *
     * @param duration maximum time to wait
     * @return this runner, for chaining
     */
    public JobRunner runningTimeout(Duration duration) {
        this.runningTimeout = duration;
        return this;
    }

    /**
     * Sets the timeout used by the {@code awaitTermination} and {@code awaitNotRunning} methods.
     *
     * @param duration maximum time to wait
     * @return this runner, for chaining
     */
    public JobRunner terminationTimeout(Duration duration) {
        this.terminationTimeout = duration;
        return this;
    }

    /**
     * Starts a job builder attached to this runner's repository.
     *
     * @param str job name
     * @return a job builder with the repository already set
     */
    public JobBuilder job(String str) {
        return new JobBuilder(str).repository(this.jobRepository);
    }

    /**
     * Wraps the reader in a synchronized decorator when a suitable one exists.
     *
     * <p>{@link PollableItemReader}s are wrapped in a {@code SynchronizedPollableItemReader},
     * other {@link ItemStreamReader}s in a {@link SynchronizedItemStreamReader}; any other
     * reader is returned unchanged.
     *
     * @param itemReader reader to wrap
     * @param <T> item type
     * @return the wrapped reader, or {@code itemReader} itself when no wrapper applies
     */
    public <T> ItemReader<T> synchronize(ItemReader<T> itemReader) {
        // PollableItemReader is tested first, matching the original precedence.
        if (itemReader instanceof PollableItemReader) {
            return new SynchronizedPollableItemReader<>((PollableItemReader<T>) itemReader);
        }
        if (itemReader instanceof ItemStreamReader) {
            return synchronizedItemStreamReader((ItemStreamReader<T>) itemReader);
        }
        return itemReader;
    }

    /** Wraps the given stream reader in a {@link SynchronizedItemStreamReader}. */
    private <T> ItemReader<T> synchronizedItemStreamReader(ItemStreamReader<T> itemStreamReader) {
        SynchronizedItemStreamReader<T> synchronizedReader = new SynchronizedItemStreamReader<>();
        synchronizedReader.setDelegate(itemStreamReader);
        return synchronizedReader;
    }

    /**
     * Tells whether the execution is considered started for the purposes of
     * {@link #awaitRunning(JobExecution)}.
     *
     * <p>NOTE(review): besides {@code jobExecution.isRunning()}, this is also true for any
     * status other than {@link BatchStatus#STARTING} (including unsuccessful ones), so it reads
     * as "has left STARTING" rather than "is currently running" - confirm intent before
     * tightening.
     *
     * @param jobExecution execution to inspect
     * @return {@code true} if the execution reports running, is unsuccessful, or is not STARTING
     */
    public static boolean isRunning(JobExecution jobExecution) {
        return jobExecution.isRunning() || jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() != BatchStatus.STARTING;
    }

    /**
     * Tells whether the execution is considered finished.
     *
     * @param jobExecution execution to inspect
     * @return {@code true} if the execution does not report running, is unsuccessful, or its
     *         status is COMPLETED, STOPPED or greater than STOPPED
     */
    public static boolean isTerminated(JobExecution jobExecution) {
        return !jobExecution.isRunning() || jobExecution.getStatus().isUnsuccessful() || jobExecution.getStatus() == BatchStatus.COMPLETED || jobExecution.getStatus() == BatchStatus.STOPPED || jobExecution.getStatus().isGreaterThan(BatchStatus.STOPPED);
    }

    /**
     * Polls the condition until it returns {@code true} or the running timeout elapses.
     *
     * @param callable condition to poll
     */
    public void awaitRunning(Callable<Boolean> callable) {
        await().timeout(this.runningTimeout).until(callable);
    }

    /**
     * Waits until {@link #isRunning(JobExecution)} holds for the execution, bounded by the
     * running timeout.
     *
     * @param jobExecution execution to wait on
     * @return the same execution, for chaining
     */
    public JobExecution awaitRunning(JobExecution jobExecution) {
        awaitRunning(() -> isRunning(jobExecution));
        return jobExecution;
    }

    /** Creates an Awaitility condition factory preconfigured with this runner's poll interval. */
    private ConditionFactory await() {
        return Awaitility.await().pollInterval(this.pollInterval);
    }

    /**
     * Polls the condition until it returns {@code true} or the termination timeout elapses.
     *
     * @param callable condition to poll
     */
    public void awaitTermination(Callable<Boolean> callable) {
        await().timeout(this.terminationTimeout).until(callable);
    }

    /**
     * Waits until {@link #isTerminated(JobExecution)} holds for the execution, bounded by the
     * termination timeout.
     *
     * @param jobExecution execution to wait on
     * @return the same execution, for chaining
     */
    public JobExecution awaitTermination(JobExecution jobExecution) {
        awaitTermination(() -> isTerminated(jobExecution));
        return jobExecution;
    }

    /**
     * Starts a step builder attached to this runner's repository and transaction manager.
     *
     * @param str step name
     * @return a step builder with repository and transaction manager already set
     */
    public StepBuilder step(String str) {
        return new StepBuilder(str).repository(this.jobRepository).transactionManager(this.transactionManager);
    }

    /**
     * Builds a chunk-oriented step without an item processor.
     *
     * @param str step name
     * @param itemReader reader for the step
     * @param itemWriter writer for the step
     * @param stepOptions chunking, threading, fault-tolerance and flushing options
     * @param <I> input item type
     * @param <O> output item type
     * @return the configured step builder
     */
    public <I, O> SimpleStepBuilder<I, O> step(String str, ItemReader<I> itemReader, ItemWriter<O> itemWriter, StepOptions stepOptions) {
        return step(str, itemReader, null, itemWriter, stepOptions);
    }

    /**
     * Builds a chunk-oriented step from the given components and options.
     *
     * <p>Components that extend {@link ItemStreamSupport} are named {@code <step>-reader},
     * {@code <step>-processor} and {@code <step>-writer}. When more than one thread is
     * requested, the reader is passed through {@link #synchronize(ItemReader)} and the step
     * runs on a thread pool sized to the thread count. Fault tolerance and flushing are
     * applied when enabled in the options.
     *
     * @param str step name
     * @param itemReader reader for the step
     * @param itemProcessor processor for the step, may be {@code null}
     * @param itemWriter writer for the step
     * @param stepOptions chunking, threading, fault-tolerance and flushing options
     * @param <I> input item type
     * @param <O> output item type
     * @return the configured step builder
     */
    public <I, O> SimpleStepBuilder<I, O> step(String str, ItemReader<I> itemReader, ItemProcessor<I, O> itemProcessor, ItemWriter<O> itemWriter, StepOptions stepOptions) {
        setName(itemReader, str + "-reader");
        setName(itemProcessor, str + "-processor");
        setName(itemWriter, str + "-writer");
        boolean multiThreaded = stepOptions.getThreads() > 1;
        SimpleStepBuilder<I, O> chunk = step(str).chunk(stepOptions.getChunkSize());
        // Several worker threads pull from the same reader, so guard it when multi-threaded.
        chunk.reader(multiThreaded ? synchronize(itemReader) : itemReader);
        chunk.processor(itemProcessor);
        chunk.writer(itemWriter);
        if (multiThreaded) {
            chunk.taskExecutor(threadPool(stepOptions.getThreads()));
            chunk.throttleLimit(stepOptions.getThreads());
        }
        if (stepOptions.isFaultTolerant()) {
            chunk = faultTolerant(chunk, stepOptions);
        }
        if (!stepOptions.getFlushingInterval().isPresent()) {
            return chunk;
        }
        FlushingSimpleStepBuilder<I, O> flushing = new FlushingSimpleStepBuilder<>(chunk);
        flushing.flushingInterval(stepOptions.getFlushingInterval().get());
        flushing.idleTimeout(stepOptions.getIdleTimeout());
        return flushing;
    }

    /** Creates an initialized thread pool whose core size, max size and queue capacity all equal {@code threads}. */
    private ThreadPoolTaskExecutor threadPool(int threads) {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setMaxPoolSize(threads);
        executor.setCorePoolSize(threads);
        executor.setQueueCapacity(threads);
        executor.afterPropertiesSet();
        return executor;
    }

    /** Assigns the name to the component if it extends {@link ItemStreamSupport}; otherwise does nothing. */
    private void setName(Object obj, String str) {
        if (obj instanceof ItemStreamSupport) {
            ((ItemStreamSupport) obj).setName(str);
        }
    }

    /** Switches the builder to fault-tolerant mode and applies the retry/skip settings from the options. */
    private <I, O> SimpleStepBuilder<I, O> faultTolerant(SimpleStepBuilder<I, O> simpleStepBuilder, StepOptions stepOptions) {
        FaultTolerantStepBuilder<I, O> faultTolerant = simpleStepBuilder.faultTolerant();
        faultTolerant.retryLimit(stepOptions.getRetryLimit());
        faultTolerant.skipLimit(stepOptions.getSkipLimit());
        List<Class<? extends Throwable>> skip = stepOptions.getSkip();
        skip.forEach(faultTolerant::skip);
        List<Class<? extends Throwable>> noSkip = stepOptions.getNoSkip();
        noSkip.forEach(faultTolerant::noSkip);
        Optional<SkipPolicy> skipPolicy = stepOptions.getSkipPolicy();
        skipPolicy.ifPresent(faultTolerant::skipPolicy);
        Optional<BackOffPolicy> backOffPolicy = stepOptions.getBackOffPolicy();
        backOffPolicy.ifPresent(faultTolerant::backOffPolicy);
        Optional<RetryPolicy> retryPolicy = stepOptions.getRetryPolicy();
        retryPolicy.ifPresent(faultTolerant::retryPolicy);
        return faultTolerant;
    }

    /**
     * Waits until {@code jobExecution.isRunning()} is {@code false}, bounded by the termination
     * timeout.
     *
     * @param jobExecution execution to wait on
     * @return the same execution, for chaining
     */
    public JobExecution awaitNotRunning(JobExecution jobExecution) {
        awaitTermination(() -> !jobExecution.isRunning());
        return jobExecution;
    }

    /**
     * Launches the job on the asynchronous launcher with empty job parameters and waits until
     * {@link #isRunning(JobExecution)} holds for the resulting execution.
     *
     * @param job job to launch
     * @return the job execution
     * @throws JobExecutionException if the launcher rejects or fails to start the job
     */
    public JobExecution runAsync(Job job) throws JobExecutionException {
        return awaitRunning(this.asyncJobLauncher.run(job, DEFAULT_JOB_PARAMETERS));
    }

    /**
     * Launches the job on the synchronous launcher with empty job parameters and waits until
     * {@link #isTerminated(JobExecution)} holds for the resulting execution.
     *
     * @param job job to launch
     * @return the job execution
     * @throws JobExecutionException if the launcher rejects or fails to start the job
     */
    public JobExecution run(Job job) throws JobExecutionException {
        return awaitTermination(this.jobLauncher.run(job, DEFAULT_JOB_PARAMETERS));
    }
}
