package pl.edu.icm.yadda.process.cp;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import pl.edu.icm.yadda.process.cp.persist.CheckpointPersisterException;
import pl.edu.icm.yadda.process.cp.persist.ICheckpointPersister;
import pl.edu.icm.yadda.process.handler.IErrorHandler;
import pl.edu.icm.yadda.process.registry.listener.EventListenerException;
import pl.edu.icm.yadda.process.registry.listener.EventType;
import pl.edu.icm.yadda.process.registry.listener.GenericProcessEvent;
import pl.edu.icm.yadda.process.registry.listener.IEvent;
import pl.edu.icm.yadda.process.registry.listener.IMessageRegistryListener;
import pl.edu.icm.yadda.process.registry.listener.MessageProcessedEntry;
import pl.edu.icm.yadda.process.registry.listener.MessagesProcessedEvent;
import pl.edu.icm.yadda.process.registry.listener.ProcessStartedEvent;
import pl.edu.icm.yadda.process.stats.ProcessingStatsException;

/* loaded from: input_file:WEB-INF/lib/yadda-process-1.11.6-SNAPSHOT.jar:pl/edu/icm/yadda/process/cp/CheckpointManager.class */
/**
 * Message-registry listener that keeps per-process checkpoint bookkeeping and hands
 * checkpoints over to an {@link ICheckpointPersister}.
 *
 * <p>For every started process a {@link ProcessCheckpointsHolder} is registered under the
 * process id. Consumed messages are grouped into fixed-size packages ("chunks") by their
 * sequence index; once a chunk and all chunks before it are fully consumed, a checkpoint
 * is selected and stored.
 *
 * <p>The holder map is guarded by {@link #entriesLock}; the chunk list of a holder is
 * guarded by synchronizing on the list returned by {@code getChunks()}.
 */
public class CheckpointManager implements IMessageRegistryListener {
    /** Aux-param key overriding {@link #activationTreshold} for a single process. */
    public static final String AUX_PARAM_ACTIVATION_TRESHOLD = "cp:activationTreshold";
    /** Aux-param key giving an explicit package (chunk) size for a single process. */
    public static final String AUX_PARAM_PACKAGE_SIZE = "cp:packageSize";
    /** Aux-param key overriding {@link #packageSizePercentageOfTotal} for a single process. */
    public static final String AUX_PARAM_PACKAGE_SIZE_PERC_OF_TOTAL = "cp:packageSizePercentageOfTotal";

    protected final Logger log = LoggerFactory.getLogger(getClass());
    /** Checkpoint state per process id; all access must hold {@link #entriesLock}. */
    protected final Map<String, ProcessCheckpointsHolder> entriesMap = new HashMap<String, ProcessCheckpointsHolder>();
    protected final ReadWriteLock entriesLock = new ReentrantReadWriteLock();
    /** Default minimal process size below which checkpointing is switched off. */
    protected int activationTreshold;
    /** Default package size, expressed as a percentage of the total process size. */
    protected int packageSizePercentageOfTotal;
    protected IErrorHandler errorHandler;
    protected ICheckpointPersister persister;

    /**
     * Tells whether this listener wants to be notified about the given event type.
     *
     * @param eventType event type to test; {@code null} yields {@code false}
     * @return {@code true} for message-consumed, process-started, process-interrupted and
     *         process-finished events
     */
    @Override // pl.edu.icm.yadda.process.registry.listener.IMessageRegistryListener
    public boolean handlesEvent(EventType eventType) {
        return eventType == EventType.MESSAGE_CONSUMED
                || eventType == EventType.PROCESS_STARTED
                || eventType == EventType.PROCESS_INTERRUPTED
                || eventType == EventType.PROCESS_FINISHED;
    }

    /**
     * Dispatches the event to the matching handler.
     *
     * @param iEvent event to handle
     * @throws EventListenerException when the event type is not supported or a handler fails
     */
    @Override // pl.edu.icm.yadda.process.registry.listener.IMessageRegistryListener
    public void notify(IEvent iEvent) throws EventListenerException {
        switch (iEvent.getType()) {
            case PROCESS_STARTED:
                handleInitialization((ProcessStartedEvent) iEvent);
                return;
            case PROCESS_FINISHED:
            case PROCESS_INTERRUPTED:
                // Both terminal states simply drop the bookkeeping of the process.
                handleFinalization(((GenericProcessEvent) iEvent).getProcessId());
                return;
            case MESSAGE_CONSUMED:
                handleMessageConsumption((MessagesProcessedEvent) iEvent);
                return;
            default:
                throw new EventListenerException(iEvent, false, "unsupported event: " + iEvent.getType());
        }
    }

    /**
     * Registers a checkpoints holder for a newly started process.
     *
     * <p>The package size is 0 (checkpointing disabled) when the process size is below the
     * activation threshold or when the computed package size is not positive. Otherwise it
     * is taken from the {@link #AUX_PARAM_PACKAGE_SIZE} aux param or, when that is absent,
     * computed as a percentage of the process size.
     *
     * @param processStartedEvent event describing the started process
     */
    protected void handleInitialization(ProcessStartedEvent processStartedEvent) {
        final String processId = processStartedEvent.getProcessId();
        final int totalSize = processStartedEvent.getSize();
        this.log.info("handling initialization of process {}", processId);

        Integer threshold = (Integer) processStartedEvent.getContext().getAuxParam(AUX_PARAM_ACTIVATION_TRESHOLD);
        if (threshold == null) {
            threshold = Integer.valueOf(this.activationTreshold);
        }
        if (totalSize < threshold.intValue()) {
            this.log.warn("Checkpointing disabled! Total number of elements: " + totalSize + " for process " + processId + " does not exceed treshold value " + threshold);
            registerHolder(processStartedEvent, 0);
            return;
        }

        Integer packageSize = (Integer) processStartedEvent.getContext().getAuxParam(AUX_PARAM_PACKAGE_SIZE);
        if (packageSize == null) {
            Integer percentage = (Integer) processStartedEvent.getContext().getAuxParam(AUX_PARAM_PACKAGE_SIZE_PERC_OF_TOTAL);
            int effectivePercentage = percentage != null ? percentage.intValue() : this.packageSizePercentageOfTotal;
            // Multiply in long: percentage * size overflows int for large processes.
            packageSize = Integer.valueOf((int) (((long) effectivePercentage * totalSize) / 100));
        }
        if (packageSize.intValue() <= 0) {
            this.log.warn("Checkpointing disabled! Checkpoint interval was set to {}", packageSize);
        }
        // A non-positive size means "disabled"; store it as 0 so that consumers never
        // divide by a negative package size.
        registerHolder(processStartedEvent, Math.max(packageSize.intValue(), 0));
    }

    /** Stores a fresh holder for the event's process under the write lock. */
    private void registerHolder(ProcessStartedEvent processStartedEvent, int packageSize) {
        this.entriesLock.writeLock().lock();
        try {
            this.entriesMap.put(processStartedEvent.getProcessId(), new ProcessCheckpointsHolder(packageSize, processStartedEvent.getContext()));
        } finally {
            // Released in finally so a failing constructor/put cannot leak the lock.
            this.entriesLock.writeLock().unlock();
        }
    }

    /**
     * Drops the checkpoint bookkeeping of a finished or interrupted process.
     *
     * @param str id of the process
     */
    protected void handleFinalization(String str) {
        this.log.info("handling finalization of process {}", str);
        this.entriesLock.writeLock().lock();
        try {
            this.entriesMap.remove(str);
        } finally {
            this.entriesLock.writeLock().unlock();
        }
    }

    /**
     * Marks the consumed messages in their chunks and persists a checkpoint whenever a
     * chunk and all of its predecessors are fully consumed.
     *
     * @param messagesProcessedEvent event carrying the consumed messages
     * @throws EventListenerException when no holder is registered for the process, or when
     *         collecting error statistics or storing the checkpoint fails
     */
    protected void handleMessageConsumption(MessagesProcessedEvent messagesProcessedEvent) throws EventListenerException {
        final ProcessCheckpointsHolder holder;
        // Hold the read lock for the lookup only and release it exactly once; unlocking a
        // second time on a later failure would raise IllegalMonitorStateException and
        // hide the original exception.
        this.entriesLock.readLock().lock();
        try {
            holder = this.entriesMap.get(messagesProcessedEvent.getProcessId());
        } finally {
            this.entriesLock.readLock().unlock();
        }
        if (holder == null) {
            throw new EventListenerException((IEvent) messagesProcessedEvent, false, "Invalid state! Unable to find checkpoints holder for process: " + messagesProcessedEvent.getProcessId() + ", should be initialized at initializing phase!");
        }
        final int packageSize = holder.getPackageSize();
        if (packageSize <= 0) {
            this.log.info("packageSize=0, checkpointing disabled!");
            return;
        }

        for (MessageProcessedEntry entry : messagesProcessedEvent.getMessages()) {
            final int chunkIdx = entry.getSeqIdx() / packageSize;
            final int offsetInChunk = entry.getSeqIdx() % packageSize;

            final CheckpointChunk chunk;
            synchronized (holder.getChunks()) {
                // Grow the list lazily so that chunkIdx becomes addressable.
                while (chunkIdx >= holder.getChunks().size()) {
                    holder.getChunks().add(new CheckpointChunk(packageSize));
                }
                chunk = holder.getChunks().get(chunkIdx);
            }

            if (offsetInChunk == packageSize - 1) {
                // The last message of a package is the one a checkpoint is taken at.
                chunk.setCheckpointCandidate(entry.getMessageId(), entry.getSeqIdx(), holder.cloneProcessContext());
            } else {
                chunk.setNonSignificantConsumed(offsetInChunk);
            }

            if (chunk.isAllConsumed() && allPredecessorsConsumed(holder, chunkIdx)) {
                persistCheckpoint(messagesProcessedEvent, holder, chunk, chunkIdx);
            }
        }
    }

    /** Returns true when every still-present chunk before {@code chunkIdx} is fully consumed. */
    private boolean allPredecessorsConsumed(ProcessCheckpointsHolder holder, int chunkIdx) {
        synchronized (holder.getChunks()) {
            for (int i = 0; i < chunkIdx; i++) {
                CheckpointChunk predecessor = holder.getChunks().get(i);
                // null slots are chunks that were already cleared by an earlier checkpoint.
                if (predecessor != null && !predecessor.isAllConsumed()) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Picks the checkpoint of the furthest contiguous fully-consumed chunk starting at
     * {@code chunkIdx}, clears the chunks it covers, and returns it.
     */
    private Checkpoint selectCheckpoint(ProcessCheckpointsHolder holder, CheckpointChunk chunk, int chunkIdx) {
        synchronized (holder.getChunks()) {
            Checkpoint checkpoint = null;
            for (int i = chunkIdx + 1; i < holder.getChunks().size(); i++) {
                CheckpointChunk successor = holder.getChunks().get(i);
                if (successor == null || !successor.isAllConsumed()) {
                    break;
                }
                this.log.debug("found CP candidate among successors");
                checkpoint = successor.getCheckpointCandidate();
                holder.getChunks().set(i, null);
            }
            if (checkpoint == null) {
                checkpoint = chunk.getCheckpointCandidate();
            }
            holder.getChunks().set(chunkIdx, null);
            return checkpoint;
        }
    }

    /**
     * Selects the checkpoint for the completed chunk, attaches the errors reported by the
     * error handler and stores it.
     *
     * <p>Statistics and persister failures are propagated as {@link EventListenerException};
     * any other unexpected exception is logged and processing continues (best effort).
     */
    private void persistCheckpoint(MessagesProcessedEvent messagesProcessedEvent, ProcessCheckpointsHolder holder,
            CheckpointChunk chunk, int chunkIdx) throws EventListenerException {
        try {
            Checkpoint checkpoint = selectCheckpoint(holder, chunk, chunkIdx);
            try {
                checkpoint.setErrors(this.errorHandler.getErrorsByIdx(messagesProcessedEvent.getProcessId(), checkpoint.getSeqIdx()));
                this.persister.store(checkpoint);
            } catch (ProcessingStatsException e) {
                throw new EventListenerException((IEvent) messagesProcessedEvent, false, (Throwable) e);
            }
        } catch (CheckpointPersisterException e) {
            throw new EventListenerException((IEvent) messagesProcessedEvent, false, (Throwable) e);
        } catch (EventListenerException e) {
            // Must not fall into the generic handler below, which would swallow it.
            throw e;
        } catch (Exception e) {
            this.log.error("unexpected error while storing checkpoint for process " + messagesProcessedEvent.getProcessId(), e);
        }
    }

    /** Sets the default activation threshold used when the process context gives none. */
    public void setActivationTreshold(int i) {
        this.activationTreshold = i;
    }

    /** Sets the default package size as a percentage of the total process size. */
    public void setPackageSizePercentageOfTotal(int i) {
        this.packageSizePercentageOfTotal = i;
    }

    /** Sets the error handler queried for the errors attached to a checkpoint. */
    public void setErrorHandler(IErrorHandler iErrorHandler) {
        this.errorHandler = iErrorHandler;
    }

    /** Sets the persister used to store checkpoints. */
    public void setPersister(ICheckpointPersister iCheckpointPersister) {
        this.persister = iCheckpointPersister;
    }
}
