package com.redis.spring.batch.memcached;

import com.redis.spring.batch.memcached.impl.ByteArrayTranscoder;
import com.redis.spring.batch.memcached.impl.LruCrawlerMetadumpOperation;
import com.redis.spring.batch.memcached.impl.LruCrawlerMetadumpOperationImpl;
import com.redis.spring.batch.memcached.impl.LruMetadumpEntry;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import net.spy.memcached.MemcachedClient;
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.Transcoder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamReader;
import org.springframework.batch.item.NonTransientResourceException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;

/* loaded from: input_file:com/redis/spring/batch/memcached/MemcachedItemReader.class */
public class MemcachedItemReader implements ItemStreamReader<MemcachedEntry> {
    public static final int DEFAULT_THREADS = 1;
    public static final int DEFAULT_QUEUE_CAPACITY = 10000;
    public static final int DEFAULT_CHUNK_SIZE = 50;
    private final List<InetSocketAddress> addresses;
    private int chunkSize;
    private int threads;
    private Duration pollTimeout;
    private int queueCapacity;
    private BlockingQueue<MemcachedEntry> queue;
    private BlockingQueue<LruMetadumpEntry> metadumpEntryQueue;
    private MemcachedClient crawlerClient;
    private MemcachedClient processorClient;
    private CountDownLatch latch;
    private ExecutorService executor;
    private List<Future<Long>> futures;
    public static final Duration DEFAULT_POLL_TIMEOUT = Duration.ofMillis(100);
    private static final Log log = LogFactory.getLog(MemcachedItemReader.class);

    /* loaded from: input_file:com/redis/spring/batch/memcached/MemcachedItemReader$MetadumpCallback.class */
    private class MetadumpCallback implements LruCrawlerMetadumpOperation.Callback {
        private final CountDownLatch latch;

        public MetadumpCallback(CountDownLatch countDownLatch) {
            this.latch = countDownLatch;
        }

        @Override // com.redis.spring.batch.memcached.impl.LruCrawlerMetadumpOperation.Callback
        public void gotMetadump(LruMetadumpEntry lruMetadumpEntry) {
            try {
                MemcachedItemReader.this.metadumpEntryQueue.put(lruMetadumpEntry);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Could not add entry to queue", e);
            }
        }

        public void receivedStatus(OperationStatus operationStatus) {
            if (operationStatus.isSuccess()) {
                return;
            }
            MemcachedItemReader.log.error("Unsuccessful lru_crawler metadump: " + String.valueOf(operationStatus));
        }

        public void complete() {
            this.latch.countDown();
        }
    }

    /* loaded from: input_file:com/redis/spring/batch/memcached/MemcachedItemReader$Processor.class */
    private class Processor implements Callable<Long> {
        private static final Transcoder<byte[]> transcoder = new ByteArrayTranscoder();
        private final int id;

        public Processor(int i) {
            this.id = i;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Long call() throws Exception {
            long j = 0;
            while (true) {
                if (MemcachedItemReader.this.latch.getCount() <= 0 && MemcachedItemReader.this.metadumpEntryQueue.isEmpty()) {
                    return Long.valueOf(j);
                }
                ArrayList<LruMetadumpEntry> arrayList = new ArrayList(MemcachedItemReader.this.chunkSize);
                MemcachedItemReader.this.metadumpEntryQueue.drainTo(arrayList, MemcachedItemReader.this.chunkSize);
                Map bulk = MemcachedItemReader.this.processorClient.getBulk((List) arrayList.stream().map((v0) -> {
                    return v0.getKey();
                }).collect(Collectors.toList()), transcoder);
                for (LruMetadumpEntry lruMetadumpEntry : arrayList) {
                    MemcachedEntry memcachedEntry = new MemcachedEntry();
                    memcachedEntry.setKey(lruMetadumpEntry.getKey());
                    memcachedEntry.setValue((byte[]) bulk.get(lruMetadumpEntry.getKey()));
                    memcachedEntry.setExpiration(lruMetadumpEntry.getExp());
                    MemcachedItemReader.this.queue.put(memcachedEntry);
                    j++;
                }
            }
        }
    }

    public MemcachedItemReader(InetSocketAddress... inetSocketAddressArr) {
        this((List<InetSocketAddress>) Arrays.asList(inetSocketAddressArr));
    }

    public MemcachedItemReader(List<InetSocketAddress> list) {
        this.chunkSize = 50;
        this.threads = 1;
        this.pollTimeout = DEFAULT_POLL_TIMEOUT;
        this.queueCapacity = DEFAULT_QUEUE_CAPACITY;
        this.addresses = list;
    }

    public void setChunkSize(int i) {
        this.chunkSize = i;
    }

    public void setPollTimeout(Duration duration) {
        this.pollTimeout = duration;
    }

    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }

    public void setThreads(int i) {
        this.threads = i;
    }

    public synchronized void open(ExecutionContext executionContext) {
        if (this.crawlerClient == null) {
            try {
                this.crawlerClient = client();
            } catch (IOException e) {
                throw new ItemStreamException("Could not initialize crawler client", e);
            }
        }
        if (this.processorClient == null) {
            try {
                this.processorClient = client();
            } catch (IOException e2) {
                throw new ItemStreamException("Could not initialize processor client", e2);
            }
        }
        if (this.queue == null) {
            this.queue = new LinkedBlockingQueue(this.queueCapacity);
        }
        if (this.metadumpEntryQueue == null) {
            this.metadumpEntryQueue = new LinkedBlockingQueue(this.queueCapacity);
        }
        if (this.latch == null) {
            this.latch = this.crawlerClient.broadcastOp((memcachedNode, countDownLatch) -> {
                return new LruCrawlerMetadumpOperationImpl("all", new MetadumpCallback(countDownLatch));
            });
        }
        if (this.executor == null) {
            this.executor = Executors.newFixedThreadPool(this.threads);
        }
        if (this.futures == null) {
            Stream mapToObj = IntStream.range(0, this.threads).mapToObj(i -> {
                return new Processor(i);
            });
            ExecutorService executorService = this.executor;
            Objects.requireNonNull(executorService);
            this.futures = (List) mapToObj.map((v1) -> {
                return r2.submit(v1);
            }).collect(Collectors.toList());
        }
    }

    private MemcachedClient client() throws IOException {
        return new MemcachedClient(this.addresses);
    }

    public synchronized void close() {
        if (this.executor != null) {
            this.executor.shutdownNow();
        }
        this.futures = null;
        this.latch = null;
        this.metadumpEntryQueue = null;
        this.queue = null;
        if (this.processorClient != null) {
            this.processorClient.shutdown();
            this.processorClient = null;
        }
        if (this.crawlerClient != null) {
            this.crawlerClient.shutdown();
            this.crawlerClient = null;
        }
    }

    /* renamed from: read, reason: merged with bridge method [inline-methods] */
    public MemcachedEntry m1read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        MemcachedEntry poll;
        do {
            poll = this.queue.poll(this.pollTimeout.toMillis(), TimeUnit.MILLISECONDS);
            if (poll != null) {
                break;
            }
        } while (!isDone());
        return poll;
    }

    private boolean isDone() {
        return this.futures == null || this.futures.stream().allMatch((v0) -> {
            return v0.isDone();
        });
    }
}
