package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.BatchOperation;
import com.redis.spring.batch.common.Openable;
import com.redis.spring.batch.common.OperationItemStreamSupport;
import com.redis.spring.batch.common.ProcessingItemWriter;
import com.redis.spring.batch.common.QueueItemWriter;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/AbstractRedisItemReader.class */
/**
 * Base item reader that runs an internal Spring Batch job to feed a blocking queue, and serves
 * {@link #read()}, {@link #read(int)} and {@link #poll(long, TimeUnit)} from that queue.
 *
 * <p>On {@link #open(ExecutionContext)} a single-step job is built: keys come from
 * {@link #keyReader()}, pass through the optional key processor, and are handed to a
 * {@code ProcessingItemWriter} that combines the {@link #operationProcessor()} with a
 * {@code QueueItemWriter} backed by the internal queue. The job is launched through a
 * {@link SimpleJobLauncher} configured with a {@link SimpleAsyncTaskExecutor}.
 *
 * <p>{@code open}, {@code close}, {@code poll} and both {@code read} variants are
 * {@code synchronized} on this instance; the timed polls therefore hold the monitor while waiting,
 * so a concurrent {@code close()} waits for the current poll to return.
 *
 * @param <K> key type
 * @param <V> value type of the Redis codec
 * @param <T> type of the items produced by this reader
 */
public abstract class AbstractRedisItemReader<K, V, T> extends AbstractItemStreamItemReader<T> implements PollableItemReader<T>, Openable {
    protected final AbstractRedisClient client;
    protected final RedisCodec<K, V> codec;
    private final BatchOperation<K, V, K, T> operation;
    private ItemProcessor<K, K> processor;
    private JobRepository jobRepository;
    protected ReaderOptions options = ReaderOptions.builder().build();
    private String name;
    private JobExecution jobExecution;
    private BlockingQueue<T> queue;

    /**
     * Creates a reader named after the short name of the concrete class.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was changed
     * from {@code protected}; it is kept {@code public} here to match the visible signature.
     *
     * @param abstractRedisClient client passed to the operation processor
     * @param redisCodec codec passed to the operation processor
     * @param batchOperation operation passed to the operation processor
     */
    public AbstractRedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, BatchOperation<K, V, K, T> batchOperation) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.operation = batchOperation;
    }

    /**
     * Replaces the reader options (threads, chunk size, queue and pool options, read-from).
     *
     * @param readerOptions options used the next time the reader is opened and on every poll
     */
    public void setOptions(ReaderOptions readerOptions) {
        this.options = readerOptions;
    }

    /**
     * Sets the job repository used for the internal job. When left unset, {@code open} obtains
     * one from {@code Utils.inMemoryJobRepository()}.
     *
     * @param jobRepository repository to use
     */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /**
     * Sets the processor applied to keys in the internal step.
     *
     * @param itemProcessor key processor; may be left unset
     */
    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.processor = itemProcessor;
    }

    /**
     * Sets the name on the superclass and remembers it locally; the local copy names the internal
     * job ({@code name}) and step ({@code name + "-step"}).
     *
     * @param str reader name
     */
    @Override
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * Opens the reader: creates the queue, launches the internal job and waits for it to start.
     * Calling this method while the reader is already open only delegates to the superclass.
     *
     * <p>If anything fails after the queue has been created, the queue and job references are
     * rolled back so that {@link #isOpen()} reports {@code false} and a later {@code open} call
     * starts from scratch instead of silently doing nothing.
     *
     * @param executionContext context forwarded to the superclass
     * @throws ItemStreamException if the job repository cannot be created, the job cannot be
     *         launched, the wait is interrupted, or the job ends up in an unsuccessful status
     */
    @Override
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (this.queue != null) {
            return;
        }
        OperationItemStreamSupport<K, V, K, T> operationProcessor = operationProcessor();
        this.queue = queue();
        boolean started = false;
        try {
            startJob(operationProcessor);
            started = true;
        } finally {
            if (!started) {
                // Do not leave a queue without a usable job behind: that state made every
                // subsequent open() a no-op.
                stopJobAndReset();
            }
        }
    }

    /** Builds the single-step job, launches it and waits until it is running or finished. */
    private void startJob(OperationItemStreamSupport<K, V, K, T> operationProcessor) {
        if (this.jobRepository == null) {
            try {
                this.jobRepository = Utils.inMemoryJobRepository();
            } catch (Exception e) {
                throw new ItemStreamException("Could not initialize job repository", e);
            }
        }
        StepBuilder stepBuilder = new StepBuilder(this.name + "-step");
        stepBuilder.repository(this.jobRepository);
        stepBuilder.transactionManager(transactionManager());
        SimpleStepBuilder<K, K> step = step(stepBuilder);
        ItemReader<K> keyReader = keyReader();
        // With several threads the key reader is wrapped by Utils.synchronizedReader.
        step.reader(this.options.getThreads() > 1 ? Utils.synchronizedReader(keyReader) : keyReader);
        step.processor(this.processor);
        // Raw construction kept as in the original: the generic signatures of these project
        // classes are not visible from this file.
        step.writer(new ProcessingItemWriter(operationProcessor, new QueueItemWriter(this.queue)));
        Utils.multiThread(step, this.options.getThreads());
        Job job = new JobBuilderFactory(this.jobRepository).get(this.name).start(step.build()).build();
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(this.jobRepository);
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        try {
            this.jobExecution = launcher.run(job, new JobParameters());
        } catch (JobExecutionException e) {
            throw new ItemStreamException("Job execution failed", e);
        }
        // Wait until the execution is running, has failed, or has already completed.
        while (!this.jobExecution.isRunning() && !this.jobExecution.getStatus().isUnsuccessful() && this.jobExecution.getStatus() != BatchStatus.COMPLETED) {
            sleep();
        }
        sleep();
        if (this.jobExecution.getStatus().isUnsuccessful()) {
            List<Throwable> failures = this.jobExecution.getAllFailureExceptions();
            // An unsuccessful execution does not necessarily carry a recorded exception.
            if (failures.isEmpty()) {
                throw new ItemStreamException("Could not run job");
            }
            throw new ItemStreamException("Could not run job", failures.get(0));
        }
    }

    /**
     * Creates the operation processor used by the internal step writer, configured with the pool
     * options and read-from setting of the current reader options.
     *
     * @return a new operation processor
     */
    public OperationItemStreamSupport<K, V, K, T> operationProcessor() {
        OperationItemStreamSupport<K, V, K, T> operationItemStreamSupport = new OperationItemStreamSupport<>(this.client, this.codec, this.operation);
        operationItemStreamSupport.setPoolOptions(this.options.getPoolOptions());
        operationItemStreamSupport.setReadFrom(this.options.getReadFrom());
        return operationItemStreamSupport;
    }

    /**
     * Supplies the reader that produces the keys consumed by the internal step.
     *
     * @return key reader
     */
    protected abstract ItemReader<K> keyReader();

    /** Sleeps for one queue poll timeout; restores the interrupt flag if interrupted. */
    private void sleep() {
        try {
            Thread.sleep(this.options.getQueueOptions().getPollTimeout().toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted during initialization", e);
        }
    }

    private PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }

    /**
     * Returns the execution of the internal job.
     *
     * @return the job execution, or {@code null} when the reader is not open
     */
    public JobExecution getJobExecution() {
        return this.jobExecution;
    }

    /**
     * Turns the given step builder into a chunk-oriented step builder using the configured chunk
     * size.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed
     * from {@code protected}; it is kept {@code public} here to match the visible signature.
     *
     * @param stepBuilder builder already configured with repository and transaction manager
     * @return chunk-oriented step builder
     */
    public SimpleStepBuilder<K, K> step(StepBuilder stepBuilder) {
        return stepBuilder.chunk(this.options.getChunkSize());
    }

    /** Creates the bounded queue and registers the {@code reader.queue.size} gauge on it. */
    private BlockingQueue<T> queue() {
        LinkedBlockingQueue<T> created = new LinkedBlockingQueue<>(this.options.getQueueOptions().getCapacity());
        Utils.createGaugeCollectionSize("reader.queue.size", created, new Tag[0]);
        return created;
    }

    /**
     * Closes the reader: if the internal job is still running its steps are flagged
     * terminate-only and its status is set to {@code STOPPING}; the job and queue references are
     * then dropped before delegating to the superclass.
     */
    @Override
    public synchronized void close() {
        stopJobAndReset();
        super.close();
    }

    /** Requests termination of a running job and clears the job and queue references. */
    private void stopJobAndReset() {
        if (this.jobExecution != null) {
            if (this.jobExecution.isRunning()) {
                for (StepExecution stepExecution : this.jobExecution.getStepExecutions()) {
                    stepExecution.setTerminateOnly();
                }
                this.jobExecution.setStatus(BatchStatus.STOPPING);
            }
            this.jobExecution = null;
        }
        this.queue = null;
    }

    /** Returns the current queue or fails with a clear message when the reader is not open. */
    private BlockingQueue<T> requireQueue() {
        BlockingQueue<T> current = this.queue;
        if (current == null) {
            throw new IllegalStateException("Reader is not open");
        }
        return current;
    }

    /**
     * @return {@code true} while a job execution is held, i.e. between a successful
     *         {@code open} and the next {@code close}
     */
    @Override // com.redis.spring.batch.common.Openable
    public boolean isOpen() {
        return this.jobExecution != null;
    }

    /**
     * Polls the internal queue once.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the next item, or {@code null} if none became available in time
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalStateException if the reader is not open
     */
    @Override // com.redis.spring.batch.reader.PollableItemReader
    public synchronized T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return requireQueue().poll(j, timeUnit);
    }

    /**
     * Returns the next item, polling the queue repeatedly (one poll timeout at a time) for as
     * long as the internal job is running.
     *
     * @return the next item, or {@code null} once the job is no longer running and the queue is
     *         empty
     * @throws ItemStreamException if the internal job is in an unsuccessful status
     * @throws IllegalStateException if the reader is not open
     */
    @Override
    public synchronized T read() throws Exception {
        BlockingQueue<T> source = requireQueue();
        long timeoutMillis = this.options.getQueueOptions().getPollTimeout().toMillis();
        T item;
        do {
            item = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } while (item == null && this.jobExecution != null && this.jobExecution.isRunning());
        if (item == null) {
            // The job may have enqueued its last items and finished between the timed poll
            // returning and the isRunning() check; pick those up instead of reporting end of input.
            item = source.poll();
        }
        if (this.jobExecution != null && this.jobExecution.getStatus().isUnsuccessful()) {
            throw new ItemStreamException("Reader job failed");
        }
        return item;
    }

    /**
     * Drains up to {@code i} items that are immediately available, without waiting.
     *
     * @param i maximum number of items to return
     * @return the drained items, possibly empty
     * @throws IllegalStateException if the reader is not open
     */
    public synchronized List<T> read(int i) {
        List<T> items = new ArrayList<>(i);
        requireQueue().drainTo(items, i);
        return items;
    }
}
