package com.redis.spring.batch.item;

import com.redis.spring.batch.JobUtils;
import java.time.Duration;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.job.builder.JobBuilder;
import org.springframework.batch.core.launch.support.TaskExecutorJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.SynchronizedItemReader;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:com/redis/spring/batch/item/AbstractAsyncItemReader.class */
/**
 * Base class for readers that run an inner Spring Batch job in the background.
 *
 * <p>On {@link #doOpen()} a single-step chunk-oriented job is assembled from the
 * {@link #reader()}, the optional {@link #getProcessor() processor} and the
 * {@link #writer()}, then launched on a {@link SimpleAsyncTaskExecutor}. How items
 * produced by that job are handed back to callers of this reader is defined by the
 * superclass and by subclasses, not here.
 *
 * @param <S> type of the items read, processed and written by the inner step
 * @param <T> type of the items exposed by this reader (passed to the superclass)
 */
public abstract class AbstractAsyncItemReader<S, T> extends AbstractPollableItemReader<T> {
    /** Default delay before the first job-state check when awaiting job start. */
    public static final Duration DEFAULT_POLL_DELAY = Duration.ZERO;
    /** Default interval between job-state checks when awaiting job start. */
    public static final Duration DEFAULT_AWAIT_POLL_INTERVAL = Duration.ofMillis(1);
    /** Default maximum time to wait for the job to report running or failed. */
    public static final Duration DEFAULT_AWAIT_TIMEOUT = Duration.ofSeconds(3);
    /** Default number of threads used by the inner step. */
    public static final int DEFAULT_THREADS = 1;
    /** Default chunk size of the inner step. */
    public static final int DEFAULT_CHUNK_SIZE = 50;
    /** Default name used to create a job repository when none was supplied. */
    public static final String DEFAULT_JOB_REPOSITORY_NAME = "redis";

    private final Log log = LogFactory.getLog(getClass());

    private int skipLimit;
    private int retryLimit;
    private JobRepository jobRepository;
    private ItemProcessor<S, S> processor;
    private JobExecution jobExecution;
    private ItemReader<S> reader;
    /** Pool backing a multi-threaded step; kept so it can be shut down on close. */
    private ThreadPoolTaskExecutor stepTaskExecutor;
    private Duration awaitPollDelay = DEFAULT_POLL_DELAY;
    private Duration awaitPollInterval = DEFAULT_AWAIT_POLL_INTERVAL;
    private Duration awaitTimeout = DEFAULT_AWAIT_TIMEOUT;
    private int chunkSize = DEFAULT_CHUNK_SIZE;
    private int threads = DEFAULT_THREADS;
    private String jobRepositoryName = DEFAULT_JOB_REPOSITORY_NAME;
    private PlatformTransactionManager transactionManager = JobUtils.resourcelessTransactionManager();
    private Set<JobExecutionListener> jobExecutionListeners = new LinkedHashSet<>();
    private Set<ItemReadListener<S>> itemReadListeners = new LinkedHashSet<>();
    private Set<ItemWriteListener<S>> itemWriteListeners = new LinkedHashSet<>();

    /**
     * Builds and launches the inner job if none is currently tracked, then waits
     * (bounded by the configured await settings) until it reports running or failed.
     *
     * <p>A job repository is created from {@link #getJobRepositoryName()} when none
     * was set explicitly.
     *
     * @throws JobExecutionException if the job execution recorded a failure
     * @throws Exception if the repository, launcher or job could not be set up
     */
    protected synchronized void doOpen() throws Exception {
        if (this.jobRepository == null) {
            this.jobRepository = JobUtils.jobRepositoryFactoryBean(this.jobRepositoryName).getObject();
        }
        if (this.jobExecution != null) {
            // A job is already tracked; nothing to launch.
            return;
        }
        Job job = jobBuilder().start(step().build()).build();
        TaskExecutorJobLauncher launcher = new TaskExecutorJobLauncher();
        launcher.setJobRepository(this.jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        launcher.afterPropertiesSet();
        this.jobExecution = launcher.run(job, new JobParameters());
        try {
            awaitUntil(() -> jobRunning() || jobFailed());
        } catch (ConditionTimeoutException e) {
            List<Throwable> failures = this.jobExecution.getAllFailureExceptions();
            if (!CollectionUtils.isEmpty(failures)) {
                throw new JobExecutionException("Job execution unsuccessful", failures.get(0));
            }
            // Best effort: a timeout without recorded failures is tolerated, but leave a trace.
            this.log.debug("Timed out waiting for job to report running or failed", e);
        }
        Optional<Throwable> exception = JobUtils.exception(this.jobExecution);
        if (exception.isPresent()) {
            throw new JobExecutionException("Could not run job", exception.get());
        }
    }

    /** Creates the job builder named after this reader and registers the job execution listeners. */
    private JobBuilder jobBuilder() {
        JobBuilder builder = new JobBuilder(getName(), this.jobRepository);
        this.jobExecutionListeners.forEach(builder::listener);
        return builder;
    }

    /** Blocks until {@code callable} returns true, using the configured poll delay, interval and timeout. */
    private void awaitUntil(Callable<Boolean> callable) {
        Awaitility.await()
                .pollDelay(this.awaitPollDelay)
                .pollInterval(this.awaitPollInterval)
                .timeout(this.awaitTimeout)
                .until(callable);
    }

    /**
     * Tells whether the tracked job execution is running, logging at info level when it is.
     *
     * @return {@code true} if the job execution reports running
     */
    protected boolean jobRunning() {
        if (!this.jobExecution.isRunning()) {
            return false;
        }
        this.log.info(String.format("Job %s running", this.jobExecution.getJobInstance().getJobName()));
        return true;
    }

    /**
     * Tells whether the tracked job execution has an unsuccessful status, logging a warning when it does.
     *
     * @return {@code true} if the job status is unsuccessful
     */
    protected boolean jobFailed() {
        if (!this.jobExecution.getStatus().isUnsuccessful()) {
            return false;
        }
        this.log.warn(String.format("Job %s failed: %s", this.jobExecution.getJobInstance().getJobName(), this.jobExecution.getStatus()));
        return true;
    }

    /**
     * Waits for the tracked job execution to stop running (using Awaitility's default
     * wait settings), forgets it, and releases the step thread pool if one was created.
     *
     * <p>If the wait times out the exception propagates and the job execution and thread
     * pool are left untouched, since the job may still be using them.
     *
     * @throws TimeoutException declared for signature compatibility
     * @throws InterruptedException declared for signature compatibility
     */
    protected synchronized void doClose() throws TimeoutException, InterruptedException {
        // Capture once so the polled condition never observes a concurrently cleared field.
        final JobExecution execution = this.jobExecution;
        if (execution != null) {
            Awaitility.await().until(() -> !execution.isRunning());
            this.jobExecution = null;
        }
        shutdownStepTaskExecutor();
    }

    /** Shuts down and forgets the step thread pool, if any, so its threads do not outlive the job. */
    private void shutdownStepTaskExecutor() {
        ThreadPoolTaskExecutor executor = this.stepTaskExecutor;
        if (executor != null) {
            this.stepTaskExecutor = null;
            executor.shutdown();
        }
    }

    /**
     * Assembles the inner step: reader (synchronized and backed by a thread pool when
     * more than one thread is configured), processor, writer, listeners and fault tolerance.
     */
    private SimpleStepBuilder<S, S> step() {
        SimpleStepBuilder<S, S> builder = stepBuilder();
        this.reader = reader();
        if (this.threads > 1) {
            // Release any pool left over from a previous launch before replacing it.
            shutdownStepTaskExecutor();
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setMaxPoolSize(this.threads);
            executor.setCorePoolSize(this.threads);
            executor.setQueueCapacity(this.threads);
            executor.afterPropertiesSet();
            this.stepTaskExecutor = executor;
            builder.taskExecutor(executor);
            // Multiple step threads share one reader, so reads must be serialized.
            builder.reader(new SynchronizedItemReader<>(this.reader));
        } else {
            builder.reader(this.reader);
        }
        builder.processor(this.processor);
        builder.writer(writer());
        this.itemReadListeners.forEach(builder::listener);
        this.itemWriteListeners.forEach(builder::listener);
        return faultTolerant(builder);
    }

    /**
     * Supplies the reader feeding the inner step.
     *
     * @return the step's item reader
     */
    protected abstract ItemReader<S> reader();

    /**
     * Supplies the writer receiving the inner step's items.
     *
     * @return the step's item writer
     */
    protected abstract ItemWriter<S> writer();

    /**
     * Applies the retry and skip limits to the step when either is non-zero.
     *
     * @param simpleStepBuilder step builder to decorate
     * @return the same builder when both limits are zero, otherwise a fault-tolerant builder
     */
    protected SimpleStepBuilder<S, S> faultTolerant(SimpleStepBuilder<S, S> simpleStepBuilder) {
        if (this.retryLimit == 0 && this.skipLimit == 0) {
            return simpleStepBuilder;
        }
        return JobUtils.faultTolerant(simpleStepBuilder).retryLimit(this.retryLimit).skipLimit(this.skipLimit);
    }

    /**
     * Creates the chunk-oriented step builder named after this reader.
     *
     * @return a step builder using the configured chunk size and transaction manager
     */
    protected SimpleStepBuilder<S, S> stepBuilder() {
        return new StepBuilder(getName(), this.jobRepository).chunk(this.chunkSize, this.transactionManager);
    }

    /**
     * Tells whether the inner job is finished.
     *
     * @return {@code true} if no job execution is tracked or it is no longer running
     */
    @Override
    public boolean isComplete() {
        // Read the field once: doClose() may clear it between a null check and a second read.
        JobExecution execution = this.jobExecution;
        return execution == null || !execution.isRunning();
    }

    /**
     * Registers a listener on the inner job; takes effect for jobs built afterwards.
     *
     * @param jobExecutionListener listener to add
     */
    public void addJobExecutionListener(JobExecutionListener jobExecutionListener) {
        this.jobExecutionListeners.add(jobExecutionListener);
    }

    /**
     * Registers a read listener on the inner step; takes effect for steps built afterwards.
     *
     * @param itemReadListener listener to add
     */
    public void addItemReadListener(ItemReadListener<S> itemReadListener) {
        this.itemReadListeners.add(itemReadListener);
    }

    /**
     * Registers a write listener on the inner step; takes effect for steps built afterwards.
     *
     * @param itemWriteListener listener to add
     */
    public void addItemWriteListener(ItemWriteListener<S> itemWriteListener) {
        this.itemWriteListeners.add(itemWriteListener);
    }

    /** @return the reader obtained from {@link #reader()} at the last step build, or {@code null} before that */
    public ItemReader<S> getReader() {
        return this.reader;
    }

    /** @return the name used to create a job repository when none is set */
    public String getJobRepositoryName() {
        return this.jobRepositoryName;
    }

    /** @param str name used to create a job repository when none is set */
    public void setJobRepositoryName(String str) {
        this.jobRepositoryName = str;
    }

    /** @return the job repository, or {@code null} if none was set or created yet */
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    /** @param jobRepository repository to use for the inner job */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /** @return the transaction manager used by the inner step */
    public PlatformTransactionManager getTransactionManager() {
        return this.transactionManager;
    }

    /** @param platformTransactionManager transaction manager used by the inner step */
    public void setTransactionManager(PlatformTransactionManager platformTransactionManager) {
        this.transactionManager = platformTransactionManager;
    }

    /** @return the tracked job execution, or {@code null} when none is tracked */
    public JobExecution getJobExecution() {
        return this.jobExecution;
    }

    /** @return the processor applied in the inner step, possibly {@code null} */
    public ItemProcessor<S, S> getProcessor() {
        return this.processor;
    }

    /** @param itemProcessor processor applied in the inner step */
    public void setProcessor(ItemProcessor<S, S> itemProcessor) {
        this.processor = itemProcessor;
    }

    /** @return the chunk size of the inner step */
    public int getChunkSize() {
        return this.chunkSize;
    }

    /** @param i chunk size of the inner step */
    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    /** @return the number of threads for the inner step */
    public int getThreads() {
        return this.threads;
    }

    /** @param i number of threads for the inner step; values above 1 enable a thread pool */
    public void setThreads(int i) {
        this.threads = i;
    }

    /** @return the skip limit applied by {@link #faultTolerant(SimpleStepBuilder)} */
    public int getSkipLimit() {
        return this.skipLimit;
    }

    /** @param i skip limit applied by {@link #faultTolerant(SimpleStepBuilder)} */
    public void setSkipLimit(int i) {
        this.skipLimit = i;
    }

    /** @return the retry limit applied by {@link #faultTolerant(SimpleStepBuilder)} */
    public int getRetryLimit() {
        return this.retryLimit;
    }

    /** @param i retry limit applied by {@link #faultTolerant(SimpleStepBuilder)} */
    public void setRetryLimit(int i) {
        this.retryLimit = i;
    }

    /** @return the delay before the first job-state check when awaiting job start */
    public Duration getAwaitPollDelay() {
        return this.awaitPollDelay;
    }

    /** @param duration delay before the first job-state check when awaiting job start */
    public void setAwaitPollDelay(Duration duration) {
        this.awaitPollDelay = duration;
    }

    /** @return the interval between job-state checks when awaiting job start */
    public Duration getAwaitPollInterval() {
        return this.awaitPollInterval;
    }

    /** @param duration interval between job-state checks when awaiting job start */
    public void setAwaitPollInterval(Duration duration) {
        this.awaitPollInterval = duration;
    }

    /** @return the maximum time to wait for the job to report running or failed */
    public Duration getAwaitTimeout() {
        return this.awaitTimeout;
    }

    /** @param duration maximum time to wait for the job to report running or failed */
    public void setAwaitTimeout(Duration duration) {
        this.awaitTimeout = duration;
    }
}
