package pl.edu.icm.yadda.process.registry;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.log4j.Priority;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.integration.core.Message;
import org.springframework.integration.core.MessageHeaders;
import org.springframework.integration.message.GenericMessage;
import pl.edu.icm.yadda.process.ctx.ProcessContext;
import pl.edu.icm.yadda.process.message.payload.GenericProcessOrchestrationPayload;
import pl.edu.icm.yadda.process.message.payload.OrchestrationMessageType;
import pl.edu.icm.yadda.process.message.payload.ProcessFinishingOrchestrationPayload;
import pl.edu.icm.yadda.process.message.payload.ProcessInterruptingOrchestrationPayload;
import pl.edu.icm.yadda.process.message.payload.ProcessStartingOrchestrationPayload;
import pl.edu.icm.yadda.process.registry.listener.IEventDispatcher;
import pl.edu.icm.yadda.process.registry.listener.MessageProcessedEntry;
import pl.edu.icm.yadda.process.registry.listener.MessagesProcessedEvent;
import pl.edu.icm.yadda.process.registry.listener.ProcessFinishedEvent;
import pl.edu.icm.yadda.process.registry.listener.ProcessInterruptedEvent;
import pl.edu.icm.yadda.process.registry.listener.ProcessStartedEvent;

/* loaded from: input_file:WEB-INF/lib/yadda-process-1.10.1.jar:pl/edu/icm/yadda/process/registry/MessageRegistry.class */
public class MessageRegistry implements IMessageRegistry {
    /** Receives the process/message life-cycle events dispatched by this registry. */
    protected IEventDispatcher eventDispatcher;
    /** Value stored under {@link MessageHeaders#ERROR_CHANNEL} in every message this registry builds. */
    protected Object errorChannel;
    Logger log = LoggerFactory.getLogger(getClass());
    /**
     * Upper bound on the number of registered, not yet consumed messages of a
     * single process; {@code create} throws once the bound is reached.
     * The default is the plain number 50000 (previously spelled as the
     * numerically identical, but unrelated, log4j level constant).
     */
    protected int unconsumedTreshold = 50000;
    /** When {@code true}, consumption/finish events are suppressed after an INTERRUPTED event was sent. */
    protected boolean discardEventsAfterIterrupt = true;
    /** When {@code true}, the INTERRUPTED event is dropped if the FINISHED event was already sent. */
    protected boolean discardInterruptWhenFinished = true;
    /** Per-process bookkeeping keyed by process id; guarded by {@link #registryLock}. */
    protected Map<String, ProcessDataHolder> registry = new HashMap<String, ProcessDataHolder>();
    /**
     * Guards {@link #registry}: the write lock is taken where entries are added
     * or removed (initialize, cleanup), the read lock where an entry is only
     * looked up.
     */
    protected ReadWriteLock registryLock = new ReentrantReadWriteLock();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/yadda-process-1.10.1.jar:pl/edu/icm/yadda/process/registry/MessageRegistry$InitializationMessageTuple.class */
    /**
     * Bookkeeping record for the orchestration message that starts a process.
     * Adds the size value handed to the registry at initialization time to the
     * message id and instance counter kept by {@link OrchestrationMessageTuple}.
     */
    public class InitializationMessageTuple extends OrchestrationMessageTuple {
        /** Size value later reported with the process-started event. */
        protected final int size;

        /**
         * @param str id of the starting orchestration message
         * @param i   size value to remember
         */
        public InitializationMessageTuple(String str, int i) {
            super(str);
            size = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/yadda-process-1.10.1.jar:pl/edu/icm/yadda/process/registry/MessageRegistry$MessageEntry.class */
    /**
     * Bookkeeping record for one regular (non-orchestration) message: its
     * sequence index and the number of instances still awaiting consumption.
     */
    public class MessageEntry {
        /** Sequence index assigned to the message when it was created. */
        protected final int seqIdx;
        /** Outstanding instances of the message; starts at one. */
        protected final AtomicInteger storedCount;

        /**
         * @param i sequence index of the message
         */
        public MessageEntry(int i) {
            seqIdx = i;
            storedCount = new AtomicInteger(1);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/yadda-process-1.10.1.jar:pl/edu/icm/yadda/process/registry/MessageRegistry$OrchestrationMessageTuple.class */
    /**
     * Bookkeeping record for an orchestration message (starting, interrupting
     * or finishing a process): the message id plus the number of instances of
     * that message still awaiting consumption.
     */
    public class OrchestrationMessageTuple {
        /** Id of the orchestration message. */
        protected final String msgId;
        /** Outstanding instances of the message; starts at one. */
        protected final AtomicInteger instancesCount;

        /**
         * @param str id of the orchestration message
         */
        public OrchestrationMessageTuple(String str) {
            msgId = str;
            instancesCount = new AtomicInteger(1);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:WEB-INF/lib/yadda-process-1.10.1.jar:pl/edu/icm/yadda/process/registry/MessageRegistry$ProcessDataHolder.class */
    /**
     * All state the registry keeps for a single process: its context, the
     * orchestration message records, event/garbage flags, the sequence counter
     * and the map of regular messages that are still unconsumed.
     */
    public class ProcessDataHolder {
        /** Context of the process this holder belongs to. */
        protected ProcessContext context;
        /** Record of the starting message; {@code null} until assigned. */
        protected volatile InitializationMessageTuple initMsg;
        /** Record of the finishing message; {@code null} until assigned. */
        protected volatile OrchestrationMessageTuple finalMsg;
        /** Record of the interrupting message; {@code null} until assigned. */
        protected volatile OrchestrationMessageTuple interruptMsg;
        /** Set once the holder may be removed by the registry's cleanup. */
        protected AtomicBoolean isGarbage = new AtomicBoolean(false);
        /** Set once the finished event has been handled for this process. */
        protected AtomicBoolean wasFinishedEventSent = new AtomicBoolean(false);
        /** Set once the interrupted event has been dispatched for this process. */
        protected AtomicBoolean wasInterruptedEventSent = new AtomicBoolean(false);
        /** Source of message sequence indices; the first index handed out is 0. */
        protected AtomicInteger seqIdx = new AtomicInteger(-1);
        /** Unconsumed regular messages keyed by message id (synchronized wrapper). */
        protected Map<String, MessageEntry> msgMap = Collections.synchronizedMap(new HashMap<String, MessageEntry>());

        /**
         * @param processContext context of the process being tracked
         */
        protected ProcessDataHolder(ProcessContext processContext) {
            context = processContext;
        }
    }

    /**
     * Registers a new regular message for an already initialized process and
     * returns it with the registry headers (message id, process context, error
     * channel, sequence index) attached.
     *
     * @param str  id of the message to create
     * @param t    payload of the message
     * @param str2 id of the owning process
     * @return the newly built message
     * @throws MessageRegistryException if either id is {@code null}, the process
     *         is unknown, the message id is already registered, or the process
     *         has reached the unconsumed-message threshold
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public <T> Message<T> create(String str, T t, String str2) throws MessageRegistryException {
        final String messageId = str;
        final String processId = str2;
        if (messageId == null || processId == null) {
            throw new MessageRegistryException("either process id or message id was undefined. Got message id: " + messageId + ", process id " + processId);
        }
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(processId);
            if (holder == null) {
                throw new MessageRegistryException("no process entry for procId: " + processId + ", probably PROCESS_STARTED message was not handled before");
            }
            if (holder.msgMap.get(messageId) != null) {
                throw new MessageRegistryException("message of id " + messageId + " was already registered!");
            }
            if (holder.msgMap.size() >= this.unconsumedTreshold) {
                throw new MessageRegistryException("unconsumed treshold of " + this.unconsumedTreshold + " exceeded!");
            }
            // The sequence index is taken only after all checks passed, so
            // rejected requests do not consume an index.
            final int sequenceIndex = holder.seqIdx.incrementAndGet();
            final Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MessageRegistryConstants.MSG_HEADER_ID, messageId);
            headers.put(MessageRegistryConstants.MSG_HEADER_CTX, holder.context);
            headers.put(MessageHeaders.ERROR_CHANNEL, this.errorChannel);
            headers.put(MessageRegistryConstants.MSG_HEADER_SEQ_IDX, Integer.valueOf(sequenceIndex));
            final Message<T> message = new GenericMessage<T>(t, headers);
            holder.msgMap.put(messageId, new MessageEntry(sequenceIndex));
            return message;
        } finally {
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Registers a new process and builds its PROCESS_STARTING orchestration
     * message.
     *
     * <p>The write lock is released exactly once, in the {@code finally} block.
     * The previous version also released it right before returning the message,
     * so the release in {@code finally} failed with
     * {@link IllegalMonitorStateException} on every successful call.
     *
     * @param processContext context of the process to register
     * @param i              size value stored with the starting message and
     *                       passed to the starting payload
     * @return the starting orchestration message, or {@code null} when a
     *         process with the same id is already registered
     * @throws MessageRegistryException declared by the interface; not thrown by
     *         this implementation directly
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public Message<GenericProcessOrchestrationPayload> initialize(ProcessContext processContext, int i) throws MessageRegistryException {
        this.registryLock.writeLock().lock();
        try {
            final String processId = processContext.getProcessId();
            if (this.registry.containsKey(processId)) {
                return null;
            }
            final ProcessDataHolder holder = new ProcessDataHolder(processContext);
            final String messageId = processId + '_' + OrchestrationMessageType.PROCESS_STARTING;
            holder.initMsg = new InitializationMessageTuple(messageId, i);
            this.registry.put(processId, holder);

            final Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MessageRegistryConstants.MSG_HEADER_ID, messageId);
            headers.put(MessageRegistryConstants.MSG_HEADER_CTX, processContext);
            headers.put(MessageHeaders.ERROR_CHANNEL, this.errorChannel);
            return new GenericMessage<GenericProcessOrchestrationPayload>(
                    new ProcessStartingOrchestrationPayload(processContext, i), headers);
        } finally {
            // Single release point for every exit path (early return, normal
            // return, exception).
            this.registryLock.writeLock().unlock();
        }
    }

    /**
     * Records an interrupt request for a registered process and builds the
     * PROCESS_INTERRUPTING orchestration message.
     *
     * <p>The read lock is released exactly once, in the {@code finally} block.
     * The previous version also released it right before returning the message,
     * so the second release failed with {@link IllegalMonitorStateException}
     * on every successful call.
     *
     * @param str id of the process to interrupt
     * @return the interrupting orchestration message, or {@code null} when an
     *         interrupt message was already recorded for the process
     * @throws MessageRegistryException if the process is not registered
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public Message<GenericProcessOrchestrationPayload> interrupt(String str) throws MessageRegistryException {
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(str);
            if (holder == null) {
                throw new MessageRegistryException("process " + str + " was not registered in registry before!");
            }
            if (holder.interruptMsg != null) {
                return null;
            }
            final String messageId = str + '_' + OrchestrationMessageType.PROCESS_INTERRUPTING;
            holder.interruptMsg = new OrchestrationMessageTuple(messageId);

            final Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MessageRegistryConstants.MSG_HEADER_ID, messageId);
            headers.put(MessageRegistryConstants.MSG_HEADER_CTX, holder.context);
            headers.put(MessageHeaders.ERROR_CHANNEL, this.errorChannel);
            return new GenericMessage<GenericProcessOrchestrationPayload>(
                    new ProcessInterruptingOrchestrationPayload(holder.context), headers);
        } finally {
            // Single release point for every exit path.
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Records a finish request for a registered process and builds the
     * PROCESS_FINISHING orchestration message.
     *
     * <p>The read lock is released exactly once, in the {@code finally} block.
     * The previous version also released it right before returning the message,
     * so the second release failed with {@link IllegalMonitorStateException}
     * on every successful call.
     *
     * @param str id of the process to finish
     * @return the finishing orchestration message, or {@code null} when a
     *         finishing message was already recorded for the process
     * @throws MessageRegistryException if the process is not registered
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public Message<GenericProcessOrchestrationPayload> finish(String str) throws MessageRegistryException {
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(str);
            if (holder == null) {
                throw new MessageRegistryException("process " + str + " was not registered in registry before!");
            }
            if (holder.finalMsg != null) {
                return null;
            }
            final String messageId = str + '_' + OrchestrationMessageType.PROCESS_FINISHING;
            holder.finalMsg = new OrchestrationMessageTuple(messageId);

            final Map<String, Object> headers = new HashMap<String, Object>();
            headers.put(MessageRegistryConstants.MSG_HEADER_ID, messageId);
            headers.put(MessageRegistryConstants.MSG_HEADER_CTX, holder.context);
            headers.put(MessageHeaders.ERROR_CHANNEL, this.errorChannel);
            return new GenericMessage<GenericProcessOrchestrationPayload>(
                    new ProcessFinishingOrchestrationPayload(holder.context), headers);
        } finally {
            // Single release point for every exit path.
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Consumes one instance of a message belonging to the given process and
     * dispatches the events that follow from it.
     *
     * <p>The message id is matched, in this order, against the regular messages
     * in {@code msgMap}, the starting message, the interrupting message and the
     * finishing message. This method does no locking of its own; the caller
     * visible in this class ({@code consume(String, String)}) invokes it while
     * holding the registry read lock.
     *
     * @param str               id of the message being consumed
     * @param str2              id of the owning process (passed on to the events)
     * @param processDataHolder registry state of that process
     * @return {@code false} if further instances of the message are still
     *         outstanding after this call, {@code true} if this was the last one
     * @throws MessageRegistryException if the id matches none of the messages
     *         known for the process
     */
    protected boolean consumeInternal(String str, String str2, ProcessDataHolder processDataHolder) throws MessageRegistryException {
        MessageEntry messageEntry = processDataHolder.msgMap.get(str);
        if (messageEntry != null) {
            // Regular message: only the last outstanding instance removes the entry.
            if (messageEntry.storedCount.decrementAndGet() > 0) {
                return false;
            }
            processDataHolder.msgMap.remove(str);
            // Evaluated once and reused for both events dispatched below.
            boolean checkSendMessageConsumptionEvent = checkSendMessageConsumptionEvent(processDataHolder);
            if (checkSendMessageConsumptionEvent) {
                this.eventDispatcher.dispatch(new MessagesProcessedEvent(str2, str, messageEntry.seqIdx));
            }
            // The process counts as finished only when the finishing message was
            // fully consumed and no regular message remains.
            if (processDataHolder.finalMsg == null || processDataHolder.finalMsg.instancesCount.get() != 0 || !processDataHolder.msgMap.isEmpty()) {
                return true;
            }
            processDataHolder.isGarbage.set(true);
            if (checkSendMessageConsumptionEvent) {
                this.eventDispatcher.dispatch(new ProcessFinishedEvent(str2, processDataHolder.context));
            }
            // The flag is set even when the event itself was suppressed.
            processDataHolder.wasFinishedEventSent.set(true);
            return true;
        }
        if (processDataHolder.initMsg != null && processDataHolder.initMsg.msgId.equals(str)) {
            // Starting message: the started event is sent once all instances are consumed.
            if (processDataHolder.initMsg.instancesCount.decrementAndGet() > 0) {
                return false;
            }
            this.eventDispatcher.dispatch(new ProcessStartedEvent(str2, processDataHolder.context, processDataHolder.initMsg.size));
            return true;
        }
        if (processDataHolder.interruptMsg != null && processDataHolder.interruptMsg.msgId.equals(str)) {
            // Interrupting message.
            if (processDataHolder.interruptMsg.instancesCount.decrementAndGet() > 0) {
                return false;
            }
            if (this.discardInterruptWhenFinished && processDataHolder.wasFinishedEventSent.get()) {
                this.log.warn("FINISHED event was already sent, discarding INTERRUPTED event");
                return true;
            }
            this.eventDispatcher.dispatch(new ProcessInterruptedEvent(str2, processDataHolder.context));
            processDataHolder.wasInterruptedEventSent.set(true);
            processDataHolder.isGarbage.set(true);
            return true;
        }
        // Only the finishing message is left as a candidate.
        if (processDataHolder.finalMsg == null || !processDataHolder.finalMsg.msgId.equals(str)) {
            throw new MessageRegistryException("message " + str + " was not registered before!");
        }
        if (processDataHolder.finalMsg.instancesCount.decrementAndGet() > 0) {
            return false;
        }
        // Regular messages still pending: the finished event is left to the
        // consumption of the last regular message (see the first branch).
        if (!processDataHolder.msgMap.isEmpty()) {
            return true;
        }
        if (checkSendMessageConsumptionEvent(processDataHolder)) {
            this.eventDispatcher.dispatch(new ProcessFinishedEvent(str2, processDataHolder.context));
        }
        processDataHolder.wasFinishedEventSent.set(true);
        processDataHolder.isGarbage.set(true);
        return true;
    }

    /**
     * Consumes one instance of a single message of the given process.
     *
     * @param str  id of the owning process
     * @param str2 id of the message to consume
     * @return the result of {@code consumeInternal}: {@code true} when the last
     *         outstanding instance was consumed, {@code false} otherwise
     * @throws MessageRegistryException if either id is {@code null}, the process
     *         is unknown, or the message is not registered for the process
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public boolean consume(String str, String str2) throws MessageRegistryException {
        final String processId = str;
        final String messageId = str2;
        if (processId == null || messageId == null) {
            throw new MessageRegistryException("either process id or message id was undefined! Got processId: " + processId + ", messageId: " + messageId);
        }
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(processId);
            if (holder == null) {
                throw new MessageRegistryException("no process entry for procId: " + processId + ", probably PROCESS_STARTED message was not handled before");
            }
            return consumeInternal(messageId, processId, holder);
        } finally {
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Consumes one instance of each listed regular message of a process and
     * reports the fully consumed ones in a single processed event.
     *
     * <p>Only regular messages (those held in {@code msgMap}) are handled here;
     * an id that is not found there makes the call fail, and ids processed
     * before the failing one stay consumed.
     *
     * @param str    id of the owning process
     * @param strArr ids of the messages to consume
     * @return one flag per input id, {@code true} where the last outstanding
     *         instance was consumed; an empty array when {@code strArr} is
     *         {@code null} or empty
     * @throws MessageRegistryException if the process id is {@code null}, the
     *         process is unknown, or one of the ids is not registered
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public boolean[] consume(String str, String[] strArr) throws MessageRegistryException {
        if (strArr == null || strArr.length == 0) {
            return new boolean[0];
        }
        if (str == null) {
            throw new MessageRegistryException("Process id was undefined!");
        }
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(str);
            if (holder == null) {
                throw new MessageRegistryException("no process entry for procId: " + str + ", probably PROCESS_STARTED message was not handled before");
            }
            final ArrayList<MessageProcessedEntry> processed = new ArrayList<MessageProcessedEntry>();
            final boolean[] results = new boolean[strArr.length];
            int position = 0;
            for (String messageId : strArr) {
                final MessageEntry entry = holder.msgMap.get(messageId);
                if (entry == null) {
                    throw new MessageRegistryException("message " + messageId + " was not registered before!");
                }
                final boolean fullyConsumed = entry.storedCount.decrementAndGet() <= 0;
                if (fullyConsumed) {
                    holder.msgMap.remove(messageId);
                    processed.add(new MessageProcessedEntry(messageId, entry.seqIdx));
                }
                results[position++] = fullyConsumed;
            }
            // Evaluated once and reused for both events dispatched below.
            final boolean eventsAllowed = checkSendMessageConsumptionEvent(holder);
            if (eventsAllowed) {
                final MessageProcessedEntry[] entries = processed.toArray(new MessageProcessedEntry[processed.size()]);
                this.eventDispatcher.dispatch(new MessagesProcessedEvent(str, entries));
            }
            final boolean processFinished = holder.finalMsg != null
                    && holder.finalMsg.instancesCount.get() == 0
                    && holder.msgMap.isEmpty();
            if (processFinished) {
                holder.isGarbage.set(true);
                if (eventsAllowed) {
                    this.eventDispatcher.dispatch(new ProcessFinishedEvent(str, holder.context));
                }
                holder.wasFinishedEventSent.set(true);
            }
            return results;
        } finally {
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Tells whether consumption-related events may still be dispatched for the
     * given process.
     *
     * @param processDataHolder registry state of the process
     * @return {@code false} only when events are configured to be discarded
     *         after an interrupt and the interrupted event was already sent
     */
    protected boolean checkSendMessageConsumptionEvent(ProcessDataHolder processDataHolder) {
        final boolean suppressed = this.discardEventsAfterIterrupt && processDataHolder.wasInterruptedEventSent.get();
        return !suppressed;
    }

    /**
     * Adds {@code i} to the outstanding-instance counter of a message. The id
     * is matched, in this order, against the regular messages, the starting
     * message, the interrupting message and the finishing message.
     *
     * @param str  id of the owning process
     * @param str2 id of the message whose counter is changed
     * @param i    amount added to the counter
     * @throws MessageRegistryException if either id is {@code null}, the process
     *         is unknown, or the message is not registered for the process
     */
    @Override // pl.edu.icm.yadda.process.registry.IMessageRegistry
    public void increment(String str, String str2, int i) throws MessageRegistryException {
        if (str == null || str2 == null) {
            throw new MessageRegistryException("either process id or message id was undefined! Got processId: " + str + ", messageId: " + str2);
        }
        this.registryLock.readLock().lock();
        try {
            final ProcessDataHolder holder = this.registry.get(str);
            if (holder == null) {
                throw new MessageRegistryException("no process entry for procId: " + str + ", probably PROCESS_STARTED message was not handled before");
            }
            // Resolve which counter the id refers to, then update it once.
            final AtomicInteger counter;
            final MessageEntry entry = holder.msgMap.get(str2);
            if (entry != null) {
                counter = entry.storedCount;
            } else if (holder.initMsg != null && holder.initMsg.msgId.equals(str2)) {
                counter = holder.initMsg.instancesCount;
            } else if (holder.interruptMsg != null && holder.interruptMsg.msgId.equals(str2)) {
                counter = holder.interruptMsg.instancesCount;
            } else if (holder.finalMsg != null && holder.finalMsg.msgId.equals(str2)) {
                counter = holder.finalMsg.instancesCount;
            } else {
                throw new MessageRegistryException("message " + str2 + " was not registered before!");
            }
            counter.addAndGet(i);
        } finally {
            this.registryLock.readLock().unlock();
        }
    }

    /**
     * Removes every process entry that has been flagged as garbage. Runs under
     * the registry write lock because it modifies the registry map.
     */
    public void cleanup() {
        this.registryLock.writeLock().lock();
        try {
            // Removing through the values() iterator drops the whole mapping.
            for (Iterator<ProcessDataHolder> holders = this.registry.values().iterator(); holders.hasNext();) {
                final ProcessDataHolder holder = holders.next();
                if (holder.isGarbage.get()) {
                    holders.remove();
                }
            }
        } finally {
            this.registryLock.writeLock().unlock();
        }
    }

    /**
     * Sets the maximum number of unconsumed messages a single process may have
     * registered before {@code create} starts rejecting new ones.
     *
     * @param i new threshold value
     */
    public void setUnconsumedTreshold(int i) {
        this.unconsumedTreshold = i;
    }

    /**
     * Sets the dispatcher that receives the events produced by this registry.
     * Marked {@code @Required}.
     *
     * @param iEventDispatcher dispatcher to use
     */
    @Required
    public void setEventDispatcher(IEventDispatcher iEventDispatcher) {
        this.eventDispatcher = iEventDispatcher;
    }

    /**
     * Sets the value placed under the error-channel header of every message
     * built by this registry. Marked {@code @Required}.
     *
     * @param obj error channel value
     */
    @Required
    public void setErrorChannel(Object obj) {
        this.errorChannel = obj;
    }

    /**
     * Controls whether consumption-related events are suppressed once the
     * interrupted event has been sent for a process.
     *
     * @param z {@code true} to suppress those events after an interrupt
     */
    public void setDiscardEventsAfterIterrupt(boolean z) {
        this.discardEventsAfterIterrupt = z;
    }

    /**
     * Controls whether the interrupted event is dropped when the finished
     * event has already been sent for a process.
     *
     * @param z {@code true} to drop the interrupted event in that case
     */
    public void setDiscardInterruptWhenFinished(boolean z) {
        this.discardInterruptWhenFinished = z;
    }
}
