package com.redis.spring.batch.step;

import com.redis.spring.batch.reader.PollableItemReader;
import com.redis.spring.batch.support.Utils;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.metrics.BatchMetrics;
import org.springframework.batch.core.step.item.Chunk;
import org.springframework.batch.core.step.item.FaultTolerantChunkProvider;
import org.springframework.batch.core.step.item.SkipOverflowException;
import org.springframework.batch.core.step.skip.LimitCheckingItemSkipPolicy;
import org.springframework.batch.core.step.skip.NonSkippableReadException;
import org.springframework.batch.core.step.skip.SkipException;
import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.skip.SkipPolicyFailedException;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.classify.BinaryExceptionClassifier;
import org.springframework.classify.Classifier;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/step/FlushingChunkProvider.class */
public class FlushingChunkProvider<I> extends FaultTolerantChunkProvider<I> {
    private static final Logger log = LoggerFactory.getLogger(FlushingChunkProvider.class);
    public static final int DEFAULT_MAX_SKIPS_ON_READ = 100;
    private final RepeatOperations repeatOperations;
    private SkipPolicy skipPolicy;
    private Classifier<Throwable, Boolean> rollbackClassifier;
    private int maxSkipsOnRead;
    private long flushingInterval;
    private long idleTimeout;
    private long lastActivity;

    public FlushingChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
        super(itemReader, repeatOperations);
        this.skipPolicy = new LimitCheckingItemSkipPolicy();
        this.rollbackClassifier = new BinaryExceptionClassifier(true);
        this.maxSkipsOnRead = 100;
        this.idleTimeout = Long.MAX_VALUE;
        this.lastActivity = 0L;
        Assert.isTrue(itemReader instanceof PollableItemReader, "Reader must extend PollableItemReader");
        this.repeatOperations = repeatOperations;
    }

    public void setMaxSkipsOnRead(int i) {
        this.maxSkipsOnRead = i;
    }

    public void setSkipPolicy(SkipPolicy skipPolicy) {
        this.skipPolicy = skipPolicy;
    }

    public void setRollbackClassifier(Classifier<Throwable, Boolean> classifier) {
        this.rollbackClassifier = classifier;
    }

    public void setFlushingInterval(Duration duration) {
        Utils.assertPositive(duration, "Flushing interval");
        this.flushingInterval = duration.toMillis();
    }

    public void setIdleTimeout(Duration duration) {
        Utils.assertPositive(duration, "Idle timeout");
        this.idleTimeout = duration.toMillis();
    }

    private void stopTimer(Timer.Sample sample, StepExecution stepExecution, String str) {
        sample.stop(BatchMetrics.createTimer("item.read", "Item reading duration", new Tag[]{Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of("step.name", stepExecution.getStepName()), Tag.of("status", str)}));
    }

    public Chunk<I> provide(StepContribution stepContribution) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastActivity == 0) {
            this.lastActivity = currentTimeMillis;
        }
        Chunk<I> chunk = new Chunk<>();
        this.repeatOperations.iterate(repeatContext -> {
            long currentTimeMillis2 = this.flushingInterval - (System.currentTimeMillis() - currentTimeMillis);
            if (currentTimeMillis2 < 0) {
                return RepeatStatus.FINISHED;
            }
            Timer.Sample start = Timer.start(Metrics.globalRegistry);
            try {
                I read = read(stepContribution, chunk, currentTimeMillis2);
                if (read == null) {
                    long currentTimeMillis3 = System.currentTimeMillis() - this.lastActivity;
                    if (currentTimeMillis3 > this.idleTimeout) {
                        log.debug("End of stream: idle for {} ms", Long.valueOf(currentTimeMillis3));
                        chunk.setEnd();
                    }
                    return RepeatStatus.CONTINUABLE;
                }
                stopTimer(start, stepContribution.getStepExecution(), "SUCCESS");
                chunk.add(read);
                stepContribution.incrementReadCount();
                this.lastActivity = System.currentTimeMillis();
                return RepeatStatus.CONTINUABLE;
            } catch (SkipOverflowException e) {
                stopTimer(start, stepContribution.getStepExecution(), "FAILURE");
                return RepeatStatus.FINISHED;
            }
        });
        return chunk;
    }

    protected I read(StepContribution stepContribution, Chunk<I> chunk, long j) {
        while (true) {
            try {
                return doRead(j);
            } catch (Exception e) {
                if (shouldPolicySkip(this.skipPolicy, e, stepContribution.getStepSkipCount())) {
                    stepContribution.incrementReadSkipCount();
                    chunk.skip(e);
                    if (chunk.getErrors().size() >= this.maxSkipsOnRead) {
                        throw new SkipOverflowException("Too many skips on read");
                    }
                    this.logger.debug("Skipping failed input", e);
                } else {
                    if (Boolean.TRUE.equals(this.rollbackClassifier.classify(e))) {
                        throw new NonSkippableReadException("Non-skippable exception during read", e);
                    }
                    this.logger.debug("No-rollback for non-skippable exception (ignored)", e);
                }
            }
        }
    }

    protected final I doRead(long j) throws Exception {
        try {
            getListener().beforeRead();
            I i = (I) this.itemReader.poll(j, TimeUnit.MILLISECONDS);
            if (i != null) {
                getListener().afterRead(i);
            }
            return i;
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("{} : {}", e.getMessage(), e.getClass().getName());
            }
            getListener().onReadError(e);
            throw e;
        }
    }

    private boolean shouldPolicySkip(SkipPolicy skipPolicy, Throwable th, int i) {
        try {
            return skipPolicy.shouldSkip(th, i);
        } catch (RuntimeException e) {
            throw new SkipPolicyFailedException("Fatal exception in SkipPolicy.", e, th);
        } catch (SkipException e2) {
            throw e2;
        }
    }

    public void postProcess(StepContribution stepContribution, Chunk<I> chunk) {
        for (Exception exc : chunk.getErrors()) {
            try {
                getListener().onSkipInRead(exc);
            } catch (RuntimeException e) {
                throw new SkipListenerFailedException("Fatal exception in SkipListener.", e, exc);
            }
        }
    }
}
