package pl.edu.icm.synat.services.process.registry.listener;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.core.task.TaskExecutor;

/* loaded from: input_file:WEB-INF/lib/synat-core-services-impl-1.2-alpha-1.jar:pl/edu/icm/synat/services/process/registry/listener/AsynchronousEventDispatcher.class */
/**
 * Event dispatcher that hands events over to a single background worker instead of
 * notifying listeners on the caller's thread.
 *
 * <p>{@link #dispatch(Event)} places the event on a bounded queue (capacity configurable
 * through {@link #setQueueCapacity(int)}, default 10). A worker task, submitted to the
 * configured {@link TaskExecutor} in {@link #afterPropertiesSet()}, takes events off the
 * queue one at a time and notifies every registered listener that declares it handles the
 * event's type. {@link #destroy()} enqueues a {@link FinalizeWorkerEvent} which makes the
 * worker leave its loop once all previously queued events have been processed.
 *
 * <p>The listener collection, its lock and the error handler are inherited from
 * {@link DirectEventDispatcher}.
 */
public class AsynchronousEventDispatcher extends DirectEventDispatcher implements EventDispatcher, InitializingBean, DisposableBean {
    /** Executor on which the single worker task is run. */
    protected TaskExecutor asyncExecutor;

    /** Bounded queue of pending events; created in {@link #afterPropertiesSet()}. */
    protected BlockingQueue<Event> eventQueue;

    protected final Logger log = LoggerFactory.getLogger(getClass());

    /** Capacity of {@link #eventQueue}; only read when the queue is created. */
    protected int queueCapacity = 10;

    /**
     * Marker event used internally to tell the worker to stop. It carries no type and no
     * process id and is never passed to listeners.
     */
    static class FinalizeWorkerEvent implements Event {
        FinalizeWorkerEvent() {
        }

        @Override
        public EventType getType() {
            return null;
        }

        @Override
        public String getProcessId() {
            return null;
        }
    }

    /**
     * Creates the event queue with the configured capacity and submits the worker task
     * to the executor.
     */
    @Override
    public void afterPropertiesSet() {
        this.eventQueue = new ArrayBlockingQueue<Event>(this.queueCapacity);
        this.asyncExecutor.execute(new Runnable() {
            @Override
            public void run() {
                runWorkerLoop();
            }
        });
    }

    /**
     * Worker loop: blocks on the queue and dispatches each event until either a
     * {@link FinalizeWorkerEvent} is received or the thread is interrupted.
     */
    private void runWorkerLoop() {
        this.log.info("starting worker thread");
        while (true) {
            final Event event;
            try {
                event = this.eventQueue.take();
            } catch (InterruptedException e) {
                this.log.error("exception ocurred while waiting for the data in queue, exitting worker thread...", e);
                // Restore the flag so the executor that owns this thread can observe the
                // interruption, and actually leave the loop as the message announces.
                Thread.currentThread().interrupt();
                return;
            }
            if (event instanceof FinalizeWorkerEvent) {
                this.log.info("exitting worker thread");
                return;
            }
            notifyListeners(event);
        }
    }

    /**
     * Notifies every listener that handles the type of the given event. A failure thrown
     * by a listener's {@code notify} is logged and forwarded to the error handler; the
     * remaining listeners are still notified.
     *
     * @param event the event to deliver
     */
    private void notifyListeners(Event event) {
        // Copy the listeners under the read lock and release it exactly once before
        // calling into listener code, so the lock is never held during notification and
        // is never unlocked a second time when a listener throws.
        final ArrayList<MessageRegistryListener> snapshot;
        this.listenersLock.readLock().lock();
        try {
            snapshot = new ArrayList<MessageRegistryListener>(this.listeners);
        } finally {
            this.listenersLock.readLock().unlock();
        }

        for (MessageRegistryListener listener : snapshot) {
            if (!listener.handlesEvent(event.getType())) {
                continue;
            }
            try {
                listener.notify(event);
            } catch (Exception e) {
                this.log.error("exception occurred when notifying listener with event: " + event.toString() + ", propagating to error handler", e);
                this.errorHandler.handleException(event.getProcessId(), e);
            }
        }
    }

    /**
     * Asks the worker to stop by enqueueing a {@link FinalizeWorkerEvent}. Events queued
     * before this call are still processed first.
     *
     * @throws InterruptedException if interrupted while waiting for space in the queue
     */
    @Override
    public void destroy() throws InterruptedException {
        this.eventQueue.put(new FinalizeWorkerEvent());
    }

    /**
     * Queues the event for asynchronous delivery, blocking while the queue is full.
     * If the calling thread is interrupted while waiting, the event is not queued; the
     * failure is logged and reported to the error handler, and the thread's interrupt
     * status is restored.
     *
     * @param event the event to dispatch
     */
    @Override
    public void dispatch(Event event) {
        try {
            this.eventQueue.put(event);
        } catch (InterruptedException e) {
            this.log.error("exception ocurred while waiting for the data in queue", e);
            this.errorHandler.handleException(event.getProcessId(), e);
            // Do not swallow the interruption: let the caller's thread see it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Sets the executor used to run the worker task. Required.
     *
     * @param taskExecutor executor that will run the worker
     */
    @Required
    public void setAsyncExecutor(TaskExecutor taskExecutor) {
        this.asyncExecutor = taskExecutor;
    }

    /**
     * Sets the capacity of the event queue. Takes effect only if called before
     * {@link #afterPropertiesSet()}, which is where the queue is created.
     *
     * @param i maximum number of events that may wait in the queue
     */
    public void setQueueCapacity(int i) {
        this.queueCapacity = i;
    }
}
