package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.CompositeItemStreamProcessor;
import com.redis.spring.batch.common.ItemStreamProcessor;
import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.ListItemProcessor;
import com.redis.spring.batch.common.ProcessingItemWriter;
import com.redis.spring.batch.common.QueueItemWriter;
import com.redis.spring.batch.common.Utils;
import com.redis.spring.batch.common.ValueType;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Tag;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionException;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.AbstractItemStreamItemReader;
import org.springframework.batch.support.transaction.ResourcelessTransactionManager;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/AbstractRedisItemReader.class */
/**
 * Base class for item readers that emit {@link KeyValue} items.
 *
 * <p>When opened, the reader launches a Spring Batch job on a {@link SimpleAsyncTaskExecutor}.
 * The job's single chunk-oriented step pulls keys from the {@link KeyItemReader}, optionally passes
 * them through a key processor, and hands them to a {@code ProcessingItemWriter} built from
 * {@link #operationProcessor()} and {@link #queueWriter()}. Items are then served to callers of
 * {@link #read()} / {@link #read(int)} from the bounded queue created by {@link #queueWriter()}.
 *
 * <p>{@code open}, {@code close} and the read methods are {@code synchronized} on this instance.
 *
 * @param <K> key type
 * @param <V> value type of the Redis codec
 */
public abstract class AbstractRedisItemReader<K, V> extends AbstractItemStreamItemReader<KeyValue<K>> {
    /** Redis client passed to the read operation and its processor. */
    protected final AbstractRedisClient client;
    /** Codec passed to the read operation and to the key/value processors. */
    protected final RedisCodec<K, V> codec;
    /** Source of the keys consumed by the background step. */
    protected final KeyItemReader<K> keyReader;
    /** Kind of value to read; {@link ValueType#STRUCT} adds a struct post-processing stage. */
    private final ValueType valueType;
    /** Optional key processor installed on the step; may be {@code null}. */
    private ItemProcessor<K, K> processor;
    /** Job repository; lazily replaced by an in-memory one when not set. */
    private JobRepository jobRepository;
    /** Lazily created factory bound to {@link #jobRepository()}. */
    private JobBuilderFactory jobBuilderFactory;
    /** Reader tuning options (chunk size, threads, queue, pool, ...). */
    protected ReaderOptions options = ReaderOptions.builder().build();
    /** Name used for both the job and the step. */
    private String name;
    /** Execution of the background job; {@code null} while the reader is not open. */
    private JobExecution jobExecution;
    /** Queue the background step writes to; created by {@link #queueWriter()}, cleared on close. */
    protected BlockingQueue<KeyValue<K>> queue;

    /**
     * Creates a reader named after the short name of the concrete class.
     *
     * <p>Note: the decompiler reports that this constructor was originally {@code protected}.
     *
     * @param abstractRedisClient Redis client
     * @param redisCodec codec used for keys and values
     * @param keyItemReader reader supplying the keys to fetch
     * @param valueType kind of value to read for each key
     */
    public AbstractRedisItemReader(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec, KeyItemReader<K> keyItemReader, ValueType valueType) {
        setName(ClassUtils.getShortName(getClass()));
        this.client = abstractRedisClient;
        this.codec = redisCodec;
        this.keyReader = keyItemReader;
        this.valueType = valueType;
    }

    /** @return the reader supplying keys to the background step */
    public KeyItemReader<K> getKeyReader() {
        return this.keyReader;
    }

    /** @return the Redis client this reader was created with */
    public AbstractRedisClient getClient() {
        return this.client;
    }

    /** @return the kind of value read for each key */
    public ValueType getValueType() {
        return this.valueType;
    }

    /** @return the current reader options */
    public ReaderOptions getOptions() {
        return this.options;
    }

    /** @param readerOptions options to use; read when the step is built and on every poll */
    public void setOptions(ReaderOptions readerOptions) {
        this.options = readerOptions;
    }

    /** @return the configured job repository, or {@code null} if none was set or created yet */
    public JobRepository getJobRepository() {
        return this.jobRepository;
    }

    /** @param jobRepository repository to use instead of the lazily created in-memory one */
    public void setJobRepository(JobRepository jobRepository) {
        this.jobRepository = jobRepository;
    }

    /** @param itemProcessor processor applied to keys by the step; may be {@code null} */
    public void setKeyProcessor(ItemProcessor<K, K> itemProcessor) {
        this.processor = itemProcessor;
    }

    /**
     * Sets the name on the superclass and remembers it for naming the job and step.
     *
     * @param str reader name
     */
    public void setName(String str) {
        super.setName(str);
        this.name = str;
    }

    /**
     * Opens the reader, starting the background job unless it is already open.
     *
     * @param executionContext execution context forwarded to the superclass
     */
    public synchronized void open(ExecutionContext executionContext) {
        super.open(executionContext);
        if (!isOpen()) {
            doOpen();
        }
    }

    /**
     * Launches the background job and waits until the key reader reports open, or the job has
     * failed or completed.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @throws ItemStreamException if the job cannot be launched or ends up unsuccessful
     */
    public void doOpen() {
        try {
            this.jobExecution = jobLauncher().run(job(), new JobParameters());
            // The job runs asynchronously: poll until the key reader is open or the job is over.
            while (!Utils.isOpen(this.keyReader)
                    && !this.jobExecution.getStatus().isUnsuccessful()
                    && !this.jobExecution.getStatus().isLessThanOrEqualTo(BatchStatus.COMPLETED)) {
                sleep();
            }
            if (this.jobExecution.getStatus().isUnsuccessful()) {
                throw jobFailure("Could not run job", this.jobExecution);
            }
        } catch (JobExecutionException e) {
            throw new ItemStreamException("Job execution failed", e);
        }
    }

    /**
     * Builds the exception reported for an unsuccessful job, attaching the first recorded failure
     * as the cause when there is one. An unsuccessful execution may carry no failure exception,
     * so the cause is optional rather than assumed.
     */
    private static ItemStreamException jobFailure(String message, JobExecution execution) {
        Iterator<Throwable> failures = execution.getAllFailureExceptions().iterator();
        if (failures.hasNext()) {
            return new ItemStreamException(message, failures.next());
        }
        return new ItemStreamException(message);
    }

    /** Builds a single-step job named after this reader. */
    private Job job() {
        return jobBuilderFactory().get(this.name).start(step().build()).build();
    }

    /** Returns the job builder factory, creating it on first use. */
    private JobBuilderFactory jobBuilderFactory() {
        if (this.jobBuilderFactory == null) {
            this.jobBuilderFactory = new JobBuilderFactory(jobRepository());
        }
        return this.jobBuilderFactory;
    }

    /**
     * Returns the job repository, falling back to {@code Utils.inMemoryJobRepository()} when none
     * was set.
     *
     * @throws ItemStreamException if the fallback repository cannot be created
     */
    private JobRepository jobRepository() {
        if (this.jobRepository == null) {
            try {
                this.jobRepository = Utils.inMemoryJobRepository();
            } catch (Exception e) {
                throw new ItemStreamException("Could not initialize job repository", e);
            }
        }
        return this.jobRepository;
    }

    /** Creates a launcher that runs the job on a {@link SimpleAsyncTaskExecutor}. */
    private SimpleJobLauncher jobLauncher() {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setJobRepository(jobRepository());
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        return launcher;
    }

    /**
     * Sleeps for one queue poll timeout.
     *
     * @throws ItemStreamException if interrupted; the interrupt flag is restored first
     */
    private void sleep() {
        try {
            Thread.sleep(this.options.getQueueOptions().getPollTimeout().toMillis());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ItemStreamException("Interrupted during initialization", e);
        }
    }

    /** Creates the transaction manager used by the step. */
    private PlatformTransactionManager transactionManager() {
        return new ResourcelessTransactionManager();
    }

    /** @return the execution of the background job, or {@code null} when the reader is not open */
    public JobExecution getJobExecution() {
        return this.jobExecution;
    }

    /**
     * Builds the chunk-oriented step: key reader, optional key processor, and a writer that runs
     * {@link #operationProcessor()} and forwards the results to {@link #queueWriter()}.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @return the configured step builder
     */
    // ProcessingItemWriter is used raw: its generic signature is not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public SimpleStepBuilder<K, K> step() {
        StepBuilder stepBuilder = new StepBuilder(this.name);
        stepBuilder.repository(jobRepository());
        stepBuilder.transactionManager(transactionManager());
        SimpleStepBuilder<K, K> chunk = stepBuilder.chunk(this.options.getChunkSize());
        chunk.reader(this.keyReader);
        chunk.processor(this.processor);
        chunk.writer(new ProcessingItemWriter(operationProcessor(), queueWriter()));
        Utils.multiThread(chunk, this.options.getThreads());
        return chunk;
    }

    /**
     * Creates the bounded queue for this reader (replacing any previous one), registers a
     * {@code reader.queue.size} gauge on it and returns a writer that feeds it.
     *
     * <p>Note: the decompiler reports that this method was originally {@code protected}.
     *
     * @return writer targeting the newly created queue
     */
    // QueueItemWriter is used raw: its generic signature is not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ItemWriter<KeyValue<K>> queueWriter() {
        this.queue = new LinkedBlockingQueue<>(this.options.getQueueOptions().getCapacity());
        Utils.createGaugeCollectionSize("reader.queue.size", this.queue, new Tag[0]);
        return new QueueItemWriter(this.queue);
    }

    /**
     * Builds the processor that turns a list of keys into a list of {@link KeyValue}s: a
     * {@code KeyValueReadOperation} executed by an {@code OperationItemProcessor}, followed by
     * {@link #keyValueListProcessor()}.
     *
     * @return the composite processor
     */
    // Project processor types are used raw: their generic signatures are not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    public ItemStreamProcessor<List<K>, List<KeyValue<K>>> operationProcessor() {
        KeyValueReadOperation readOperation = new KeyValueReadOperation(this.client, this.codec);
        readOperation.setMemoryUsageOptions(this.options.getMemoryUsageOptions());
        readOperation.setValueType(this.valueType);
        OperationItemProcessor operationItemProcessor = new OperationItemProcessor(this.client, this.codec, readOperation);
        operationItemProcessor.setPoolOptions(this.options.getPoolOptions());
        operationItemProcessor.setReadFrom(this.options.getReadFrom());
        return new CompositeItemStreamProcessor(operationItemProcessor, keyValueListProcessor());
    }

    /**
     * Builds the post-processing stage applied to the raw operation results. For
     * {@link ValueType#STRUCT} an additional {@code StructProcessor} pass is chained after the
     * {@code KeyValueProcessor} pass.
     */
    // Project processor types are used raw: their generic signatures are not visible in this file.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private ItemProcessor<List<List<Object>>, List<KeyValue<K>>> keyValueListProcessor() {
        ListItemProcessor keyValueStage = new ListItemProcessor(new KeyValueProcessor(this.codec));
        if (this.valueType == ValueType.STRUCT) {
            return new CompositeItemStreamProcessor(keyValueStage, new ListItemProcessor(new StructProcessor(this.codec)));
        }
        return keyValueStage;
    }

    /** Closes the reader, stopping the background job if the reader is open. */
    public synchronized void close() {
        if (isOpen()) {
            doClose();
        }
        super.close();
    }

    /**
     * Drops the queue and, if the background job is still running, flags every step execution as
     * terminate-only and marks the job as {@link BatchStatus#STOPPING}. Safe to call when no job
     * was ever started.
     */
    protected void doClose() {
        this.queue = null;
        JobExecution execution = this.jobExecution;
        if (execution != null && execution.isRunning()) {
            for (StepExecution stepExecution : execution.getStepExecutions()) {
                stepExecution.setTerminateOnly();
            }
            execution.setStatus(BatchStatus.STOPPING);
        }
        this.jobExecution = null;
    }

    /** @return {@code true} while a job execution is held, i.e. between open and close */
    public boolean isOpen() {
        return this.jobExecution != null;
    }

    /**
     * Reads the next item; see {@link #m5read()} for the exact semantics.
     *
     * <p>The decompiler renamed the original no-argument {@code read} to {@code m5read}; this
     * method restores the original signature so the class again exposes {@code read()}, and
     * delegates to the renamed method so both names behave identically.
     *
     * @return the next item, or {@code null} when none is available and the job is no longer running
     * @throws Exception see {@link #m5read()}
     */
    public KeyValue<K> read() throws Exception {
        return m5read();
    }

    /**
     * Polls the queue, one poll timeout at a time, until an item arrives or the background job is
     * no longer running. Kept under its decompiler-assigned name so existing references resolve.
     *
     * @return the next item, or {@code null} if the reader is not open or the job ended without
     *         producing another item
     * @throws ItemStreamException if the background job is unsuccessful; the first recorded job
     *         failure, if any, is attached as the cause
     * @throws Exception if interrupted while waiting on the queue
     */
    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public synchronized KeyValue<K> m5read() throws Exception {
        BlockingQueue<KeyValue<K>> items = this.queue;
        if (items == null) {
            // Not opened yet or already closed: nothing to read.
            return null;
        }
        KeyValue<K> item;
        do {
            item = items.poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
        } while (item == null && this.jobExecution != null && this.jobExecution.isRunning());
        JobExecution execution = this.jobExecution;
        if (execution == null || !execution.getStatus().isUnsuccessful()) {
            return item;
        }
        throw jobFailure("Reader job failed", execution);
    }

    /**
     * Drains up to {@code i} items that are immediately available, without waiting.
     *
     * @param i maximum number of items to return
     * @return the drained items; empty if none are queued or the reader is not open
     */
    public synchronized List<KeyValue<K>> read(int i) {
        List<KeyValue<K>> items = new ArrayList<>(i);
        BlockingQueue<KeyValue<K>> source = this.queue;
        if (source != null) {
            source.drainTo(items, i);
        }
        return items;
    }
}
