package com.jivesoftware.os.tasmo.reference.lib.traverser;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.jivesoftware.os.jive.utils.base.interfaces.CallbackStream;
import com.jivesoftware.os.jive.utils.id.ObjectId;
import com.jivesoftware.os.jive.utils.id.TenantIdAndCentricId;
import com.jivesoftware.os.jive.utils.logger.MetricLogger;
import com.jivesoftware.os.jive.utils.logger.MetricLoggerFactory;
import com.jivesoftware.os.tasmo.reference.lib.RefStreamRequestContext;
import com.jivesoftware.os.tasmo.reference.lib.ReferenceStore;
import com.jivesoftware.os.tasmo.reference.lib.ReferenceWithTimestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/* loaded from: input_file:com/jivesoftware/os/tasmo/reference/lib/traverser/BatchingReferenceTraverser.class */
public class BatchingReferenceTraverser implements ReferenceTraverser {
    private static final MetricLogger LOG = MetricLoggerFactory.getLogger();
    private final ReferenceStore referenceStore;
    private final ListeningExecutorService traverserExecutors;
    private final int processUpToNRequestAtATime;
    private final BlockingQueue<RefStreamRequestContext> requestsQueue;
    private final AtomicBoolean running = new AtomicBoolean(false);

    public BatchingReferenceTraverser(ReferenceStore referenceStore, ListeningExecutorService listeningExecutorService, int i, int i2) {
        this.referenceStore = referenceStore;
        this.traverserExecutors = listeningExecutorService;
        this.processUpToNRequestAtATime = i;
        this.requestsQueue = new ArrayBlockingQueue(i2);
    }

    public void startProcessingRequests() throws InterruptedException {
        if (this.running.compareAndSet(false, true)) {
            while (this.running.get()) {
                final ArrayList arrayList = new ArrayList();
                arrayList.add(this.requestsQueue.take());
                this.requestsQueue.drainTo(arrayList, this.processUpToNRequestAtATime);
                if (arrayList.isEmpty()) {
                    return;
                } else {
                    this.traverserExecutors.submit(new Runnable() { // from class: com.jivesoftware.os.tasmo.reference.lib.traverser.BatchingReferenceTraverser.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                if (arrayList.size() > 1) {
                                    BatchingReferenceTraverser.LOG.debug("Request aggregation size: {}", Integer.valueOf(arrayList.size()));
                                }
                                BatchingReferenceTraverser.this.referenceStore.multiStreamRefs(arrayList);
                            } catch (Exception e) {
                                BatchingReferenceTraverser.LOG.warn("Failed to process request:" + arrayList, e);
                                Iterator it = arrayList.iterator();
                                while (it.hasNext()) {
                                    ((RefStreamRequestContext) it.next()).failure(e);
                                }
                            }
                        }
                    });
                }
            }
        }
    }

    public void stopProcessingRequests() {
        this.running.compareAndSet(true, false);
    }

    @Override // com.jivesoftware.os.tasmo.reference.lib.traverser.ReferenceTraverser
    public void traverseForwardRef(TenantIdAndCentricId tenantIdAndCentricId, Set<String> set, String str, ObjectId objectId, long j, CallbackStream<ReferenceWithTimestamp> callbackStream) throws InterruptedException, Exception {
        RefStreamRequestContext refStreamRequestContext = new RefStreamRequestContext(tenantIdAndCentricId, set, str, objectId, j, false);
        this.requestsQueue.put(refStreamRequestContext);
        refStreamRequestContext.traverse(callbackStream);
    }

    @Override // com.jivesoftware.os.tasmo.reference.lib.traverser.ReferenceTraverser
    public void traversBackRefs(TenantIdAndCentricId tenantIdAndCentricId, Set<String> set, String str, ObjectId objectId, long j, CallbackStream<ReferenceWithTimestamp> callbackStream) throws InterruptedException, Exception {
        RefStreamRequestContext refStreamRequestContext = new RefStreamRequestContext(tenantIdAndCentricId, set, str, objectId, j, true);
        this.requestsQueue.put(refStreamRequestContext);
        refStreamRequestContext.traverse(callbackStream);
    }
}
