package com.redis.spring.batch.step;

import com.redis.spring.batch.reader.PollableItemReader;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.observability.BatchMetrics;
import org.springframework.batch.core.step.item.SimpleChunkProvider;
import org.springframework.batch.core.step.item.SkipOverflowException;
import org.springframework.batch.core.step.skip.SkipListenerFailedException;
import org.springframework.batch.item.Chunk;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.util.Assert;

/* loaded from: input_file:com/redis/spring/batch/step/FlushingChunkProvider.class */
public class FlushingChunkProvider<I> extends SimpleChunkProvider<I> {
    public static final Duration DEFAULT_FLUSH_INTERVAL = Duration.ofMillis(50);
    public static final Duration DEFAULT_IDLE_TIMEOUT = Duration.ofMillis(Long.MAX_VALUE);
    private final RepeatOperations repeatOperations;
    private Duration interval;
    private long idleTimeout;
    private long lastActivity;

    public FlushingChunkProvider(ItemReader<? extends I> itemReader, RepeatOperations repeatOperations) {
        super(itemReader, repeatOperations);
        this.interval = DEFAULT_FLUSH_INTERVAL;
        this.idleTimeout = DEFAULT_IDLE_TIMEOUT.toMillis();
        this.lastActivity = 0L;
        Assert.isTrue(itemReader instanceof PollableItemReader, "Reader must extend PollableItemReader");
        this.repeatOperations = repeatOperations;
    }

    public void setInterval(Duration duration) {
        this.interval = duration;
    }

    public void setIdleTimeout(Duration duration) {
        Assert.notNull(duration, "Idle timeout must not be null");
        this.idleTimeout = duration.toMillis();
    }

    private void stopFlushTimer(Timer.Sample sample, StepExecution stepExecution, String str) {
        sample.stop(BatchMetrics.createTimer(Metrics.globalRegistry, "item.read", "Item reading duration", new Tag[]{Tag.of("job.name", stepExecution.getJobExecution().getJobInstance().getJobName()), Tag.of("step.name", stepExecution.getStepName()), Tag.of("status", str)}));
    }

    public Chunk<I> provide(StepContribution stepContribution) {
        long currentTimeMillis = System.currentTimeMillis();
        if (this.lastActivity == 0) {
            this.lastActivity = currentTimeMillis;
        }
        Chunk<I> chunk = new Chunk<>(new Object[0]);
        this.repeatOperations.iterate(repeatContext -> {
            long millis = this.interval.toMillis() - millisSince(currentTimeMillis);
            if (millis < 0) {
                return RepeatStatus.FINISHED;
            }
            Timer.Sample start = Timer.start(Metrics.globalRegistry);
            try {
                I read = read(stepContribution, chunk, millis);
                if (read == null) {
                    if (millisSince(this.lastActivity) > this.idleTimeout) {
                        chunk.setEnd();
                    }
                    return RepeatStatus.CONTINUABLE;
                }
                stopFlushTimer(start, stepContribution.getStepExecution(), "SUCCESS");
                chunk.add(read);
                stepContribution.incrementReadCount();
                this.lastActivity = System.currentTimeMillis();
                return RepeatStatus.CONTINUABLE;
            } catch (SkipOverflowException e) {
                stopFlushTimer(start, stepContribution.getStepExecution(), "FAILURE");
                return RepeatStatus.FINISHED;
            }
        });
        return chunk;
    }

    private long millisSince(long j) {
        return System.currentTimeMillis() - j;
    }

    protected I read(StepContribution stepContribution, Chunk<I> chunk, long j) throws InterruptedException {
        try {
            return doRead(j);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final I doRead(long j) throws InterruptedException {
        try {
            getListener().beforeRead();
            I i = (I) this.itemReader.poll(j, TimeUnit.MILLISECONDS);
            if (i != null) {
                getListener().afterRead(i);
            }
            return i;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (Exception e2) {
            getListener().onReadError(e2);
            throw e2;
        }
    }

    public void postProcess(StepContribution stepContribution, Chunk<I> chunk) {
        for (Exception exc : chunk.getErrors()) {
            try {
                getListener().onSkipInRead(exc);
            } catch (RuntimeException e) {
                throw new SkipListenerFailedException("Fatal exception in SkipListener.", e, exc);
            }
        }
    }
}
