package pl.edu.icm.yadda.process.bwmeta.source.cache;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.edu.icm.yadda.client.model.UpdateRequest;
import pl.edu.icm.yadda.common.YaddaException;
import pl.edu.icm.yadda.process.bwmeta.source.IdCacheBuilder;
import pl.edu.icm.yadda.service2.YaddaObjectID;
import pl.edu.icm.yadda.service2.editor.EditEvent;

/* loaded from: input_file:WEB-INF/lib/bwmeta-process-nodes-0.1.2.jar:pl/edu/icm/yadda/process/bwmeta/source/cache/BlockingQueueIdCache.class */
/**
 * {@link IdCache} that is fed from a {@link BlockingQueue} of {@link EditEvent}s.
 *
 * <p>Ids are pulled lazily: when the local buffer is empty, {@link #hasNext()} blocks on the
 * queue until an event yields at least one id. For every event, the ids whose matching
 * additional-data entry is an {@link UpdateRequest} are handed to the {@link IdCacheBuilder},
 * and everything the builder returns is buffered locally.
 *
 * <p>The cache stops permanently (and {@link #hasNext()} returns {@code false} from then on) when
 * the waiting thread is interrupted or when collecting ids fails with a {@link YaddaException}.
 *
 * <p>{@link #poll()} and {@link #hasNext()} are guarded by this instance's monitor. Note that the
 * monitor is held while blocking on the queue.
 */
public class BlockingQueueIdCache implements IdCache {
    private final IdCacheBuilder idCacheBuilder;
    private final BlockingQueue<EditEvent> processInput;
    protected final Logger log = LoggerFactory.getLogger(BlockingQueueIdCache.class);

    /** Ids already extracted from edit events and not yet handed out. */
    private final Queue<String> currentIds = new LinkedList<String>();

    /** Becomes {@code false} once the cache has stopped; it is never set back to {@code true}. */
    private boolean running = true;

    /**
     * @param idCacheBuilder builder asked to collect ids for every usable edit event
     * @param blockingQueue  queue the edit events are taken from
     */
    public BlockingQueueIdCache(IdCacheBuilder idCacheBuilder, BlockingQueue<EditEvent> blockingQueue) {
        this.idCacheBuilder = idCacheBuilder;
        this.processInput = blockingQueue;
    }

    /**
     * Returns the next buffered id, blocking (through {@link #hasNext()}) until one is available.
     *
     * <p>Synchronized so that the availability check and the removal from the buffer happen
     * atomically; without it two concurrent callers could both pass the check and one of them
     * would receive {@code null}.
     *
     * @return the next id
     * @throws NoSuchElementException if the cache has stopped
     */
    @Override // pl.edu.icm.yadda.process.bwmeta.source.cache.IdCache
    public synchronized String poll() {
        if (!hasNext()) {
            throw new NoSuchElementException("Cache stopped. No new elements available.");
        }
        String id = this.currentIds.poll();
        this.log.debug("Returning id:{}", id);
        return id;
    }

    /**
     * Not supported: ids can only arrive through the blocking queue.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // pl.edu.icm.yadda.process.bwmeta.source.cache.IdCache
    public boolean offer(String str) {
        throw new UnsupportedOperationException("Cache is based on blocking queue. Offering data not supported.");
    }

    /**
     * Not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override // pl.edu.icm.yadda.process.bwmeta.source.cache.IdCache
    public int size() {
        throw new UnsupportedOperationException();
    }

    /**
     * Tells whether another id can be polled. If the cache is running and the local buffer is
     * empty, this call blocks until an edit event yields ids or the cache stops.
     *
     * @return {@code true} while the cache is running, {@code false} once it has stopped
     */
    @Override // pl.edu.icm.yadda.process.bwmeta.source.cache.IdCache
    public synchronized boolean hasNext() {
        if (this.running && this.currentIds.isEmpty()) {
            try {
                waitForNewIds();
            } catch (YaddaException e) {
                this.log.error("Error while retrieving new data from EditEvent. Cache stopped.", e);
                this.running = false;
            }
        }
        return this.running;
    }

    /**
     * Refills the local buffer from the queue, or stops the cache when waiting was interrupted.
     */
    private void waitForNewIds() throws YaddaException {
        List<String> newIds = waitForUsableEditEvent(this.processInput, true, false);
        if (newIds == null) {
            this.running = false;
            this.log.info("Caching Ids from blocking queue stopped.");
            return;
        }
        this.currentIds.addAll(newIds);
    }

    /**
     * Blocks on the queue until some edit event produces at least one id.
     *
     * @param blockingQueue queue to take edit events from
     * @param z             forwarded unchanged as the second argument of
     *                      {@code IdCacheBuilder.collect}; its meaning is defined by the builder
     * @param z2            forwarded unchanged as the third argument of
     *                      {@code IdCacheBuilder.collect}; its meaning is defined by the builder
     * @return a non-empty list of ids, or {@code null} if the thread was interrupted while
     *         collecting (the interrupt status is restored before returning)
     */
    private List<String> waitForUsableEditEvent(BlockingQueue<EditEvent> blockingQueue, boolean z, boolean z2) throws YaddaException {
        List<String> collected = new ArrayList<String>();
        while (collected.isEmpty()) {
            this.log.info("Waiting for new EditEvent.");
            try {
                EditEvent event = blockingQueue.take();
                this.log.info("New EditEvent found. Extracting data..");
                List<String> requestedIds = extractUpdateRequestIds(event);
                if (requestedIds != null) {
                    IdCache expanded = this.idCacheBuilder.collect(requestedIds, z, z2);
                    while (expanded.hasNext()) {
                        collected.add(expanded.poll());
                    }
                }
            } catch (InterruptedException e) {
                // Restore the interrupt status so code further up the stack can still observe
                // that an interruption was requested.
                Thread.currentThread().interrupt();
                this.log.error("Ids collecting interrupted.", e);
                return null;
            }
        }
        return collected;
    }

    /**
     * Picks the ids of an edit event whose additional-data entry at the same index is an
     * {@link UpdateRequest}.
     *
     * @return the selected ids (possibly empty), or {@code null} when the event is malformed:
     *         it carries no id array, or fewer additional-data entries than ids
     */
    private List<String> extractUpdateRequestIds(EditEvent event) {
        YaddaObjectID[] objectIds = event.getObjectIDs();
        Serializable[] additionalData = event.getAdditionalData();
        if (objectIds == null) {
            this.log.error("Edit Event contains no ids to process.");
            return null;
        }
        if (additionalData == null || additionalData.length < objectIds.length) {
            this.log.error("Number of Edit Event additional data does not match number of ids. Ids won't be processed.");
            return null;
        }
        List<String> ids = new ArrayList<String>();
        for (int i = 0; i < objectIds.length; i++) {
            String id = objectIds[i] != null ? objectIds[i].getId() : null;
            // instanceof is false for null, so no separate null check on the data entry is needed.
            if (id != null && additionalData[i] instanceof UpdateRequest) {
                ids.add(id);
            }
        }
        return ids;
    }
}
