package uk.org.retep.kernel.messaging.impl;

import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import uk.org.retep.kernel.messaging.Consumer;
import uk.org.retep.kernel.messaging.Destination;
import uk.org.retep.kernel.messaging.MailBox;
import uk.org.retep.kernel.messaging.Message;
import uk.org.retep.kernel.messaging.MessagingException;
import uk.org.retep.kernel.messaging.ParentConsumerListener;
import uk.org.retep.util.collections.ConcurrencySupport;
import uk.org.retep.util.thread.GlobalThreadPool;

/* loaded from: input_file:uk/org/retep/kernel/messaging/impl/MailBoxImpl.class */
public abstract class MailBoxImpl<T extends Collection<Consumer>> extends ConcurrencySupport implements MailBox {
    private final Destination destination;
    private final boolean clustered;
    private final int maxSize;
    private final T members;
    private final BlockingQueue<Message> queue;
    private Consumer[] memberCache = new Consumer[0];
    private Thread thread;
    private volatile boolean running;

    /* loaded from: input_file:uk/org/retep/kernel/messaging/impl/MailBoxImpl$Processor.class */
    private class Processor implements Runnable {
        private Processor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (MailBoxImpl.this.running) {
                try {
                    Message<T> message = (Message) MailBoxImpl.this.queue.take();
                    if (message != null) {
                        MailBoxImpl.this.consumeMessageImpl(message);
                    }
                } catch (InterruptedException e) {
                } catch (Throwable th) {
                    MailBoxImpl.this.getLog().error("Problem while processing message queue", th);
                }
            }
        }
    }

    public MailBoxImpl(T t, Destination destination, int i, boolean z) {
        this.members = t;
        this.destination = destination;
        this.maxSize = i;
        this.clustered = z;
        this.queue = new LinkedBlockingQueue(i);
    }

    @PostConstruct
    public void start() {
        if (this.thread == null) {
            this.running = true;
            this.thread = GlobalThreadPool.createDaemonThread("KernelMessaging", new Processor());
            this.thread.start();
            getLog().info("Started %s", new Object[]{this.destination});
        }
    }

    @PreDestroy
    public void stop() {
        if (this.thread != null) {
            getLog().info("Stopping %s", new Object[]{this.destination});
            this.running = false;
            this.thread.interrupt();
            this.thread = null;
        }
    }

    @Override // uk.org.retep.kernel.messaging.MailBox
    public final boolean isClustered() {
        return this.clustered;
    }

    @Override // uk.org.retep.kernel.messaging.MailBox
    public final Destination getDestination() {
        return this.destination;
    }

    @Override // uk.org.retep.kernel.messaging.MailBox
    public final boolean addMember(Consumer consumer) {
        writeLock().lock();
        try {
            if (!this.members.add(consumer)) {
                writeLock().unlock();
                return false;
            }
            this.memberCache = (Consumer[]) this.members.toArray(new Consumer[this.members.size()]);
            if (consumer instanceof ParentConsumerListener) {
                ((ParentConsumerListener) consumer).setParentConsumer(this);
            }
            return true;
        } finally {
            writeLock().unlock();
        }
    }

    @Override // uk.org.retep.kernel.messaging.MailBox
    public final boolean removeMember(Consumer consumer) {
        writeLock().lock();
        try {
            if (!this.members.remove(consumer)) {
                writeLock().unlock();
                return false;
            }
            this.memberCache = (Consumer[]) this.members.toArray(new Consumer[this.members.size()]);
            if (consumer instanceof ParentConsumerListener) {
                ((ParentConsumerListener) consumer).setParentConsumer(null);
            }
            return true;
        } finally {
            writeLock().unlock();
        }
    }

    @Override // uk.org.retep.kernel.messaging.MailBox
    public final boolean containsMember(Consumer consumer) {
        writeLock().lock();
        try {
            boolean contains = this.members.contains(consumer);
            writeLock().unlock();
            return contains;
        } catch (Throwable th) {
            writeLock().unlock();
            throw th;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Consumer[] getConsumers() {
        readLock().lock();
        try {
            Consumer[] consumerArr = this.memberCache;
            readLock().unlock();
            return consumerArr;
        } catch (Throwable th) {
            readLock().unlock();
            throw th;
        }
    }

    @Override // uk.org.retep.kernel.messaging.Consumer
    public final <T> void consumeMessage(Message<T> message) {
        if (!this.queue.offer(message)) {
            throw new MessagingException(getDestination().toString() + " is full");
        }
    }

    protected abstract <T> void consumeMessageImpl(Message<T> message);
}
