package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.JobRunner;
import com.redis.spring.batch.common.Openable;
import com.redis.spring.batch.common.Utils;
import io.lettuce.core.AbstractRedisClient;
import io.lettuce.core.codec.RedisCodec;
import io.micrometer.core.instrument.Tag;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.awaitility.Awaitility;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.job.builder.SimpleJobBuilder;
import org.springframework.batch.core.step.builder.SimpleStepBuilder;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;
import org.springframework.util.ClassUtils;

/* loaded from: input_file:com/redis/spring/batch/reader/AbstractRedisItemReader.class */
public abstract class AbstractRedisItemReader<K, T> extends AbstractItemCountingItemStreamItemReader<T> implements Openable {
    private static final Log log = LogFactory.getLog(AbstractRedisItemReader.class);
    protected final JobRunner jobRunner;
    protected final ItemReader<K> keyReader;
    private final ItemProcessor<K, K> keyProcessor;
    private final Writer<K, T> writer;
    private final ReaderOptions options;
    protected final BlockingQueue<T> queue;
    private String name;
    private JobExecution jobExecution;

    /* loaded from: input_file:com/redis/spring/batch/reader/AbstractRedisItemReader$AbstractReaderBuilder.class */
    /**
     * Base class for fluent builders of Redis item readers.
     *
     * <p>Holds the Redis client and codec supplied at construction time together with an optional
     * {@link JobRunner} and the {@link ReaderOptions} to use. The fluent setters return the
     * builder itself typed as {@code B}.
     *
     * @param <K> key type of the codec
     * @param <V> value type of the codec
     * @param <B> concrete builder type returned by the fluent setters
     */
    public abstract static class AbstractReaderBuilder<K, V, B extends AbstractReaderBuilder<K, V, B>> {
        /** Shared fallback runner, created lazily by {@link #inMemoryJobRunner()}. */
        private static JobRunner inMemoryJobRunner;
        /** Explicitly configured runner; {@code null} means "use the shared in-memory runner". */
        private JobRunner jobRunner;
        protected final AbstractRedisClient client;
        protected final RedisCodec<K, V> codec;
        protected ReaderOptions readerOptions = ReaderOptions.builder().build();

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Creates a builder bound to the given client and codec.
         *
         * @param abstractRedisClient client stored in {@link #client}
         * @param redisCodec codec stored in {@link #codec}
         */
        public AbstractReaderBuilder(AbstractRedisClient abstractRedisClient, RedisCodec<K, V> redisCodec) {
            this.client = abstractRedisClient;
            this.codec = redisCodec;
        }

        /**
         * Supplies the reader that produces keys.
         *
         * @return the key reader
         */
        protected abstract ItemReader<K> keyReader();

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Returns the runner to use: the explicitly configured one if set, otherwise the shared
         * in-memory runner.
         *
         * @return the effective job runner
         */
        public final JobRunner jobRunner() {
            return this.jobRunner == null ? inMemoryJobRunner() : this.jobRunner;
        }

        /**
         * Returns the shared in-memory runner, creating it on first use.
         *
         * <p>Synchronized so that concurrent first calls cannot each create (and then overwrite)
         * a separate runner instance.
         */
        private static synchronized JobRunner inMemoryJobRunner() {
            if (inMemoryJobRunner == null) {
                inMemoryJobRunner = JobRunner.inMemory();
            }
            return inMemoryJobRunner;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Copies this builder's runner and reader options onto another builder.
         *
         * <p>The raw {@code jobRunner} field is copied, so an unset runner stays unset on the
         * target.
         *
         * @param b1 builder to configure
         * @param <K1> key type of the target builder
         * @param <V1> value type of the target builder
         * @param <B1> concrete type of the target builder
         * @return {@code b1}, after configuration
         */
        public <K1, V1, B1 extends AbstractReaderBuilder<K1, V1, B1>> B1 toBuilder(B1 b1) {
            b1.jobRunner(this.jobRunner);
            b1.readerOptions(this.readerOptions);
            return b1;
        }

        /**
         * Sets the job runner.
         *
         * @param jobRunner runner to use; {@code null} makes {@link #jobRunner()} fall back to the
         *     shared in-memory runner
         * @return this builder
         */
        @SuppressWarnings("unchecked") // safe as long as subclasses pass themselves as B
        public B jobRunner(JobRunner jobRunner) {
            this.jobRunner = jobRunner;
            return (B) this;
        }

        /**
         * Sets the reader options.
         *
         * @param readerOptions options to use
         * @return this builder
         */
        @SuppressWarnings("unchecked") // safe as long as subclasses pass themselves as B
        public B readerOptions(ReaderOptions readerOptions) {
            this.readerOptions = readerOptions;
            return (B) this;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Creates a {@link DataStructureValueReader} from the client, codec and the pool options
         * of the current reader options.
         *
         * @return a new value reader
         */
        public DataStructureValueReader<K, V> dataStructureValueReader() {
            return new DataStructureValueReader<>(this.client, this.codec, this.readerOptions.getPoolOptions());
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Creates a {@link KeyDumpValueReader} from the client, codec and the pool options of the
         * current reader options.
         *
         * @return a new value reader
         */
        public KeyDumpValueReader<K, V> keyDumpValueReader() {
            return new KeyDumpValueReader<>(this.client, this.codec, this.readerOptions.getPoolOptions());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/redis/spring/batch/reader/AbstractRedisItemReader$Writer.class */
    /**
     * Item writer that receives chunks of keys, reads the corresponding values through a
     * {@link ValueReader} and puts them on a blocking queue.
     *
     * <p>Stream lifecycle calls are forwarded to the value reader when it is an
     * {@link ItemStream}.
     *
     * @param <K> key type
     * @param <T> value type placed on the queue
     */
    public static class Writer<K, T> extends AbstractItemStreamItemWriter<K> implements Openable {
        private final ValueReader<K, T> valueReader;
        private final BlockingQueue<T> queue;
        /** Set by {@link #open(ExecutionContext)}, cleared by {@link #close()}. */
        private boolean open;

        /**
         * @param valueReader reader used to fetch values for each chunk of keys
         * @param blockingQueue queue that receives the values
         */
        public Writer(ValueReader<K, T> valueReader, BlockingQueue<T> blockingQueue) {
            this.valueReader = valueReader;
            this.queue = blockingQueue;
        }

        /**
         * Opens this writer and, if it is an {@link ItemStream}, the value reader, then marks the
         * writer as open.
         *
         * @param executionContext context passed through to the superclass and the value reader
         */
        public void open(ExecutionContext executionContext) {
            super.open(executionContext);
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).open(executionContext);
            }
            this.open = true;
        }

        /**
         * Forwards the update to the superclass and, if it is an {@link ItemStream}, to the value
         * reader.
         *
         * @param executionContext context passed through to the superclass and the value reader
         */
        public void update(ExecutionContext executionContext) {
            super.update(executionContext);
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).update(executionContext);
            }
        }

        /**
         * Closes the value reader (if it is an {@link ItemStream}) and then this writer, and marks
         * the writer as closed.
         */
        public void close() {
            if (this.valueReader instanceof ItemStream) {
                ((ItemStream) this.valueReader).close();
            }
            super.close();
            this.open = false;
        }

        /**
         * @return {@code true} between a call to {@link #open(ExecutionContext)} and the next
         *     {@link #close()}
         */
        @Override // com.redis.spring.batch.common.Openable
        public boolean isOpen() {
            return this.open;
        }

        /**
         * Reads the values for the given keys and puts each one on the queue, blocking while the
         * queue is full.
         *
         * @param list keys to read values for
         * @throws ItemStreamException if the value reader fails
         * @throws InterruptedException if the thread is interrupted while waiting for queue
         *     space; the interrupt flag is restored before rethrowing
         */
        public void write(List<? extends K> list) throws InterruptedException {
            Iterator<T> values;
            try {
                values = this.valueReader.read(list).iterator();
            } catch (Exception e) {
                throw new ItemStreamException("Could not read values", e);
            }
            // Enqueue outside the try above so that an interrupt is reported as an interrupt
            // instead of being wrapped as a value-read failure.
            while (values.hasNext()) {
                try {
                    this.queue.put(values.next());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw e;
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates a reader whose values are produced by a background job: keys come from
     * {@code itemReader}, pass through {@code itemProcessor}, and are turned into values by
     * {@code valueReader}, which are then placed on a bounded queue.
     *
     * @param jobRunner runner used to build and launch the background job
     * @param itemReader key reader; wrapped with {@code JobRunner.synchronize} when more than one
     *     thread is configured
     * @param itemProcessor processor applied to keys in the step
     * @param valueReader reader that fetches values for chunks of keys
     * @param readerOptions options providing thread count, queue capacity and the settings read
     *     by the other methods of this class
     */
    public AbstractRedisItemReader(JobRunner jobRunner, ItemReader<K> itemReader, ItemProcessor<K, K> itemProcessor, ValueReader<K, T> valueReader, ReaderOptions readerOptions) {
        setName(ClassUtils.getShortName(getClass()));
        this.jobRunner = jobRunner;
        this.keyReader = readerOptions.getThreads() > 1 ? JobRunner.synchronize(itemReader) : itemReader;
        this.keyProcessor = itemProcessor;
        // Diamond instead of the raw type so the queue is checked as BlockingQueue<T>.
        this.queue = new LinkedBlockingQueue<>(readerOptions.getQueueOptions().getCapacity());
        this.writer = new Writer<>(valueReader, this.queue);
        this.options = readerOptions;
    }

    /**
     * Records the reader name locally and passes it on to the superclass. The local copy is the
     * name used when building the job and step.
     *
     * @param str name to assign
     */
    public void setName(String str) {
        this.name = str;
        super.setName(str);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Launches the background job if no execution is currently tracked.
     *
     * <p>Registers a gauge on the queue size, runs the job through the runner's asynchronous
     * launcher, waits for it to be running, and fails if its status is unsuccessful. Does nothing
     * when an execution is already tracked.
     *
     * @throws ItemStreamException if the job status is unsuccessful after the wait
     * @throws Exception if launching or awaiting the job fails
     */
    public synchronized void doOpen() throws Exception {
        if (this.jobExecution != null) {
            return;
        }
        Utils.createGaugeCollectionSize("reader.queue.size", this.queue, new Tag[0]);
        JobExecution execution = this.jobRunner.getAsyncJobLauncher().run(job().build(), new JobParameters());
        this.jobExecution = execution;
        this.jobRunner.awaitRunning(execution);
        if (execution.getStatus().isUnsuccessful()) {
            throw new ItemStreamException("Job execution unsuccessful");
        }
    }

    /**
     * Assembles the builder for the background job.
     *
     * <p>The job is created by the job runner under this reader's name and starts with the step
     * from {@link #step()}, passed through {@code JobRunner.faultTolerant} together with the
     * configured fault-tolerance options.
     *
     * @return the job builder; the caller is responsible for calling {@code build()}
     */
    protected SimpleJobBuilder job() {
        return this.jobRunner.job(this.name).start(JobRunner.faultTolerant(step(), this.options.getFaultToleranceOptions()).build());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds the chunk-oriented step: keys are read by the key reader, passed through the key
     * processor and handed to the queue-filling writer. The chunk size and thread count come
     * from the reader options.
     *
     * @return the configured step builder
     */
    public SimpleStepBuilder<K, K> step() {
        SimpleStepBuilder<K, K> builder = this.jobRunner.step(this.name).chunk(this.options.getChunkSize());
        builder.reader(this.keyReader).processor(this.keyProcessor).writer(this.writer);
        JobRunner.multiThreaded(builder, this.options.getThreads());
        return builder;
    }

    /**
     * Closes the key reader (when it is an {@link ItemStream}) and, if a job execution is
     * tracked, waits until it is no longer running before forgetting it.
     *
     * <p>The wait uses Awaitility's default settings; if the condition is not met in time the
     * exception propagates and the execution stays tracked.
     */
    protected synchronized void doClose() {
        log.info("Closing reader " + this.name);
        if (this.keyReader instanceof ItemStream) {
            // ItemReader itself has no close(); the cast is required to reach ItemStream.close().
            ((ItemStream) this.keyReader).close();
        }
        JobExecution execution = this.jobExecution;
        if (execution != null) {
            Awaitility.await().until(() -> !execution.isRunning());
            this.jobExecution = null;
        }
    }

    /**
     * Returns the next value from the queue, polling with the configured timeout for as long as
     * the reader is open.
     *
     * @return the next value, or {@code null} once the reader is no longer open and the queue is
     *     empty
     * @throws Exception if the thread is interrupted while polling
     */
    protected T doRead() throws Exception {
        T item;
        do {
            item = this.queue.poll(this.options.getQueueOptions().getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
        } while (item == null && isOpen());
        if (item == null) {
            // The job may have enqueued its last values and stopped between the timed-out poll
            // and the isOpen() check; take one more non-blocking look so they are not dropped.
            item = this.queue.poll();
        }
        return item;
    }

    /**
     * Reports whether the background job is active.
     *
     * @return {@code true} only when a job execution is tracked, it is running, and its status is
     *     not unsuccessful
     */
    @Override // com.redis.spring.batch.common.Openable
    public boolean isOpen() {
        // Read the field once: doClose() may null it between the individual checks.
        JobExecution execution = this.jobExecution;
        return execution != null && execution.isRunning() && !execution.getStatus().isUnsuccessful();
    }
}
