package com.redis.spring.batch.reader;

import com.redis.spring.batch.common.KeyValue;
import com.redis.spring.batch.common.Utils;
import io.micrometer.core.instrument.Tag;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.support.AbstractItemStreamItemWriter;

/* loaded from: input_file:com/redis/spring/batch/reader/Enqueuer.class */
public class Enqueuer<K, T extends KeyValue<K>> extends AbstractItemStreamItemWriter<K> {
    private final Log log = LogFactory.getLog(getClass());
    private final ItemProcessor<List<? extends K>, List<T>> valueReader;
    private final QueueOptions options;
    private final BlockingQueue<T> queue;

    public Enqueuer(ItemProcessor<List<? extends K>, List<T>> itemProcessor, QueueOptions queueOptions) {
        this.valueReader = itemProcessor;
        this.options = queueOptions;
        this.queue = new LinkedBlockingQueue(queueOptions.getCapacity());
    }

    public void open(ExecutionContext executionContext) {
        Utils.createGaugeCollectionSize("reader.queue.size", this.queue, new Tag[0]);
        super.open(executionContext);
        if (this.valueReader instanceof ItemStream) {
            this.valueReader.open(executionContext);
        }
    }

    public void update(ExecutionContext executionContext) {
        super.update(executionContext);
        if (this.valueReader instanceof ItemStream) {
            this.valueReader.update(executionContext);
        }
    }

    public void close() {
        if (!this.queue.isEmpty()) {
            this.log.warn("Closing with items still in queue");
        }
        if (this.valueReader instanceof ItemStream) {
            this.valueReader.close();
        }
        super.close();
    }

    public T poll() throws InterruptedException {
        return this.queue.poll(this.options.getPollTimeout().toMillis(), TimeUnit.MILLISECONDS);
    }

    public T poll(long j, TimeUnit timeUnit) throws InterruptedException {
        return this.queue.poll(j, timeUnit);
    }

    public void write(List<? extends K> list) throws Exception {
        for (KeyValue keyValue : (List) this.valueReader.process(list)) {
            this.queue.removeIf(keyValue2 -> {
                return keyValue2.getKey().equals(keyValue.getKey());
            });
            this.queue.put(keyValue);
        }
    }

    public void drainTo(List<T> list, int i) {
        this.queue.drainTo(list, i);
    }
}
