package com.redis.spring.batch.memcached.reader;

import com.redis.spring.batch.item.AbstractQueuePollableItemReader;
import com.redis.spring.batch.memcached.MemcachedException;
import com.redis.spring.batch.memcached.reader.LruCrawlerMetadumpOperation;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.function.Supplier;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.ops.OperationStatus;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:com/redis/spring/batch/memcached/reader/LruMetadumpItemReader.class */
/**
 * Queue-backed item reader that yields the {@link LruMetadumpEntry} items reported by a
 * {@link LruCrawlerMetadumpOperationImpl} broadcast through a {@link MemcachedClient}.
 *
 * <p>When opened, the reader obtains a client from the configured supplier and broadcasts the
 * metadump operation (created with the argument {@code "all"}). Every entry handed to the
 * operation callback is put on the queue inherited from {@link AbstractQueuePollableItemReader}.
 * The reader reports completion once the latch returned by the broadcast has reached zero and
 * the queue has been drained.
 */
public class LruMetadumpItemReader extends AbstractQueuePollableItemReader<LruMetadumpEntry> {
    /** Source of the client used for the broadcast; invoked from {@link #doOpen()}. */
    private final Supplier<MemcachedClient> clientSupplier;

    /** Client obtained in {@link #doOpen()}; {@code null} while the reader is not open. */
    private MemcachedClient client;

    /** Latch returned by the broadcast; {@code null} until a broadcast has been started. */
    private CountDownLatch latch;

    /**
     * Operation callback that forwards each received entry to a consumer and counts the latch
     * down when the operation completes.
     */
    private static class MetadumpCallback implements LruCrawlerMetadumpOperation.Callback {
        // Shared by all callback instances; one callback is created per broadcast target.
        private static final Log log = LogFactory.getLog(MetadumpCallback.class);

        private final CountDownLatch latch;
        private final Consumer<LruMetadumpEntry> consumer;

        /**
         * @param countDownLatch latch to count down once when {@link #complete()} is invoked
         * @param consumer receiver of every entry passed to {@link #gotMetadump(LruMetadumpEntry)}
         */
        public MetadumpCallback(CountDownLatch countDownLatch, Consumer<LruMetadumpEntry> consumer) {
            this.latch = countDownLatch;
            this.consumer = consumer;
        }

        /** Hands the received entry to the consumer. */
        @Override // com.redis.spring.batch.memcached.reader.LruCrawlerMetadumpOperation.Callback
        public void gotMetadump(LruMetadumpEntry entry) {
            this.consumer.accept(entry);
        }

        /** Logs an error when the reported status is not successful; successes are ignored. */
        public void receivedStatus(OperationStatus status) {
            if (!status.isSuccess()) {
                log.error("Unsuccessful lru_crawler metadump: " + status);
            }
        }

        /** Counts the latch down to signal that this operation is finished. */
        public void complete() {
            this.latch.countDown();
        }
    }

    /**
     * @param supplier provides the {@link MemcachedClient} used when the reader is opened
     */
    public LruMetadumpItemReader(Supplier<MemcachedClient> supplier) {
        this.clientSupplier = supplier;
    }

    /**
     * Opens the parent reader and, if no client is currently held, obtains one from the supplier
     * and starts the metadump broadcast.
     *
     * <p>The client is only stored once the broadcast has been started. If starting it fails the
     * freshly obtained client is shut down, so a later call can retry from a clean state.
     *
     * @throws IllegalStateException if the supplier returns {@code null}
     * @throws Exception if the parent reader fails to open
     */
    protected synchronized void doOpen() throws Exception {
        super.doOpen();
        if (this.client != null) {
            return;
        }
        MemcachedClient newClient = this.clientSupplier.get();
        if (newClient == null) {
            throw new IllegalStateException("Memcached client supplier returned null");
        }
        try {
            this.latch = newClient.broadcastOp((node, nodeLatch) ->
                    new LruCrawlerMetadumpOperationImpl("all", new MetadumpCallback(nodeLatch, this::safePut)));
        } catch (RuntimeException e) {
            // Do not keep a client whose broadcast never started.
            newClient.shutdown();
            throw e;
        }
        this.client = newClient;
    }

    /**
     * Puts an entry on the queue, blocking as {@code queue.put} does.
     *
     * @throws MemcachedException if the calling thread is interrupted while waiting; the
     *     interrupt flag is restored before throwing
     */
    private void safePut(LruMetadumpEntry entry) {
        try {
            this.queue.put(entry);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MemcachedException("Interrupted while trying to add entry to queue", e);
        }
    }

    /**
     * Shuts down the client, if one is held, and then closes the parent reader.
     *
     * <p>The parent is closed even when the client shutdown throws, and the client reference is
     * cleared before shutting down so the reader never retains a client it already tried to close.
     *
     * @throws Exception if the client shutdown or the parent close fails
     */
    protected synchronized void doClose() throws Exception {
        try {
            MemcachedClient closing = this.client;
            if (closing != null) {
                this.client = null;
                closing.shutdown();
            }
        } finally {
            super.doClose();
        }
    }

    /**
     * Tells whether the broadcast has finished and every queued entry has been consumed.
     *
     * <p>When no broadcast has been started yet (no latch), there is nothing outstanding, so the
     * result depends on the queue alone instead of failing with a {@code NullPointerException}.
     *
     * @return {@code true} if the latch is absent or at zero and the queue is empty
     */
    public boolean isComplete() {
        CountDownLatch current = this.latch;
        boolean broadcastFinished = current == null || current.getCount() == 0;
        return broadcastFinished && this.queue.isEmpty();
    }
}
