package com.gemstone.gemfire.internal.cache.wan.parallel;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.cache.CacheException;
import com.gemstone.gemfire.cache.EntryNotFoundException;
import com.gemstone.gemfire.cache.wan.GatewayEventFilter;
import com.gemstone.gemfire.cache.wan.GatewayQueueEvent;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.cache.AbstractBucketRegionQueue;
import com.gemstone.gemfire.internal.cache.ForceReattemptException;
import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
import com.gemstone.gemfire.internal.cache.LocalRegion;
import com.gemstone.gemfire.internal.cache.PartitionedRegion;
import com.gemstone.gemfire.internal.cache.RegionQueue;
import com.gemstone.gemfire.internal.cache.wan.AbstractGatewaySender;
import com.gemstone.gemfire.internal.cache.wan.GatewaySenderEventImpl;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/gemstone/gemfire/internal/cache/wan/parallel/ParallelQueueRemovalMessage.class */
/**
 * Distribution message that tells the receiving member which events have already been
 * dispatched by a parallel gateway sender, so that the receiver can drop its own copies
 * of those events.
 *
 * <p>The payload is a nested map: region path -&gt; (bucket id -&gt; list of dispatched keys).
 * For every key the receiver removes the event from the sender-level temp queue events, and
 * then from either the bucket region queue (when the bucket exists locally) or the per-bucket
 * temporary queue. Registered {@link GatewayEventFilter}s are notified through
 * {@code afterAcknowledgement} before an event is destroyed.
 */
public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
    private static final Logger logger = LogService.getLogger();

    /**
     * Region path -&gt; (Integer bucket id -&gt; List of dispatched keys). Kept as a raw
     * {@code HashMap} so the constructor signature and the serialization calls stay
     * unchanged for existing callers.
     */
    private HashMap regionToDispatchedKeysMap;

    /** No-arg constructor; the payload is populated by {@link #fromData(DataInput)}. */
    public ParallelQueueRemovalMessage() {
    }

    /**
     * Creates a message carrying the given dispatched-key map.
     *
     * @param hashMap region path -&gt; (bucket id -&gt; list of dispatched keys)
     */
    public ParallelQueueRemovalMessage(HashMap hashMap) {
        this.regionToDispatchedKeysMap = hashMap;
    }

    /** Returns the fixed serialization id of this message type. */
    @Override
    public int getDSFID() {
        return DataSerializableFixedID.PARALLEL_QUEUE_REMOVAL_MESSAGE;
    }

    /**
     * Walks the region -&gt; bucket -&gt; keys payload and removes every listed key from the local
     * queues. Does nothing when no cache instance exists. Regions that are not present in
     * this member are skipped.
     *
     * @param distributionManager not used by this implementation
     */
    @Override
    protected void process(DistributionManager distributionManager) {
        final boolean isDebugEnabled = logger.isDebugEnabled();
        final GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
        if (cache == null) {
            return;
        }

        // NOTE(review): 1 is presumably LocalRegion.BEFORE_INITIAL_IMAGE, allowing lookups of
        // regions that are still initializing - confirm against LocalRegion.
        final int previousInitLevel = LocalRegion.setThreadInitLevelRequirement(1);
        try {
            for (Object regionKey : this.regionToDispatchedKeysMap.keySet()) {
                final String regionPath = (String) regionKey;
                final PartitionedRegion queueRegion = (PartitionedRegion) cache.getRegion(regionPath);
                if (queueRegion == null) {
                    continue;
                }

                final AbstractGatewaySender sender = queueRegion.getParallelGatewaySender();
                final Map bucketToKeys = (Map) this.regionToDispatchedKeysMap.get(regionPath);
                for (Object bucketKey : bucketToKeys.keySet()) {
                    final int bucketId = ((Integer) bucketKey).intValue();
                    final String bucketPath = "/__PR/" + queueRegion.getBucketName(bucketId);
                    final AbstractBucketRegionQueue bucketQueue =
                            (AbstractBucketRegionQueue) cache.getRegionByPath(bucketPath);
                    if (isDebugEnabled) {
                        logger.debug("ParallelQueueRemovalMessage : The bucket in the cache is bucketRegionName : {} bucket: {}", bucketPath, bucketQueue);
                    }

                    final List dispatchedKeys = (List) bucketToKeys.get(bucketKey);
                    if (dispatchedKeys == null) {
                        continue;
                    }
                    for (Object key : dispatchedKeys) {
                        removeDispatchedKey(sender, queueRegion, bucketQueue, bucketPath, bucketId, key, isDebugEnabled);
                    }
                }
            }
        } finally {
            // Always restore the thread's previous init-level requirement.
            LocalRegion.setThreadInitLevelRequirement(previousInitLevel);
        }
    }

    /**
     * Removes a single dispatched key from the sender's temp events and from whichever queue
     * currently holds it.
     *
     * <ul>
     *   <li>No local bucket: try the per-bucket temporary queue only.</li>
     *   <li>Initialized bucket: notify filters, then destroy the key in the bucket queue.</li>
     *   <li>Bucket still initializing: under the initialization read lock, destroy from the
     *       bucket queue if the key is already there, otherwise from the temporary queue; if
     *       neither held it, record the key as a failed batch removal on the bucket.</li>
     * </ul>
     */
    private void removeDispatchedKey(AbstractGatewaySender sender, PartitionedRegion queueRegion,
            AbstractBucketRegionQueue bucketQueue, String bucketPath, int bucketId, Object key,
            boolean isDebugEnabled) {
        sender.removeFromTempQueueEvents(key);

        if (bucketQueue == null) {
            destroyFromTempQueue(queueRegion, bucketId, key);
            return;
        }

        if (bucketQueue.isInitialized()) {
            if (isDebugEnabled) {
                logger.debug("ParallelQueueRemovalMessage : The bucket {} is initialized. Destroying the key {} from BucketRegionQueue.", bucketPath, key);
            }
            afterAckForSecondary_EventInBucket(sender, bucketQueue, key);
            destroyKeyFromBucketQueue(bucketQueue, key, queueRegion);
            return;
        }

        if (isDebugEnabled) {
            logger.debug("ParallelQueueRemovalMessage : The bucket {} is not yet initialized.", bucketPath);
        }
        bucketQueue.getInitializationLock().readLock().lock();
        try {
            final boolean destroyed;
            if (bucketQueue.containsKey(key)) {
                afterAckForSecondary_EventInBucket(sender, bucketQueue, key);
                destroyKeyFromBucketQueue(bucketQueue, key, queueRegion);
                destroyed = true;
            } else {
                destroyed = destroyFromTempQueue(bucketQueue.getPartitionedRegion(), bucketId, key);
            }
            if (!destroyed) {
                bucketQueue.addToFailedBatchRemovalMessageKeys(key);
                if (isDebugEnabled) {
                    logger.debug("Event is neither destroyed from BucketRegionQueue not from tempQueue. Added to failedBatchRemovalMessageKeys: {}", key);
                }
            }
        } finally {
            // Single release point; replaces the duplicated unlock in a catch-and-rethrow.
            bucketQueue.getInitializationLock().readLock().unlock();
        }
    }

    /**
     * Invokes {@code afterAcknowledgement} on every gateway event filter of the sender for the
     * event stored under {@code obj} in the bucket queue. The event is looked up once per
     * filter; filters are skipped when the lookup returns {@code null}. Filter exceptions are
     * logged at fatal level and do not stop the remaining filters.
     */
    private void afterAckForSecondary_EventInBucket(AbstractGatewaySender abstractGatewaySender, AbstractBucketRegionQueue abstractBucketRegionQueue, Object obj) {
        for (GatewayEventFilter filter : abstractGatewaySender.getGatewayEventFilters()) {
            GatewayQueueEvent event = (GatewayQueueEvent) abstractBucketRegionQueue.get(obj);
            if (event == null) {
                continue;
            }
            try {
                filter.afterAcknowledgement(event);
            } catch (Exception e) {
                logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayEventFilter_EXCEPTION_OCCURED_WHILE_HANDLING_CALL_TO_0_AFTER_ACKNOWLEDGEMENT_FOR_EVENT_1, new Object[]{filter.toString(), event}), e);
            }
        }
    }

    /**
     * Destroys {@code obj} in the given bucket queue and maps the possible failures:
     * a missing entry is recorded in the bucket's failed-batch-removal keys, other cache
     * exceptions are logged as errors, and a {@code ForceReattemptException} is only logged
     * at debug level.
     */
    private void destroyKeyFromBucketQueue(AbstractBucketRegionQueue abstractBucketRegionQueue, Object obj, PartitionedRegion partitionedRegion) {
        final boolean isDebugEnabled = logger.isDebugEnabled();
        try {
            abstractBucketRegionQueue.destroyKey(obj);
            if (isDebugEnabled) {
                logger.debug("Destroyed the key {} for shadowPR {} for bucket {}", obj, partitionedRegion.getName(), Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
        } catch (CancelException ignored) {
            // Deliberately ignored, as in the original: a cancellation (presumably cache or
            // system shutdown) leaves nothing further to clean up here.
        } catch (EntryNotFoundException e) {
            if (isDebugEnabled) {
                logger.debug("Got EntryNotFoundException while destroying the key {} for bucket {}", obj, Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
            // Remember the key so the removal is not lost if the entry shows up later.
            abstractBucketRegionQueue.addToFailedBatchRemovalMessageKeys(obj);
        } catch (CacheException e) {
            logger.error(LocalizedMessage.create(LocalizedStrings.ParallelQueueRemovalMessage_QUEUEREMOVALMESSAGEPROCESSEXCEPTION_IN_PROCESSING_THE_LAST_DISPTACHED_KEY_FOR_A_SHADOWPR_THE_PROBLEM_IS_WITH_KEY__0_FOR_SHADOWPR_WITH_NAME_1, new Object[]{obj, partitionedRegion.getName()}), e);
        } catch (ForceReattemptException e) {
            if (isDebugEnabled) {
                logger.debug("Got ForceReattemptException while getting bucket {} to destroyLocally the keys.", Integer.valueOf(abstractBucketRegionQueue.getId()));
            }
        }
    }

    /**
     * Removes every event whose shadow key equals {@code obj} from the temporary queue of
     * bucket {@code i} and releases it.
     *
     * @return {@code true} if at least one event was removed; {@code false} when the sender
     *         has no queues (null or empty set), the bucket has no temporary queue, or no
     *         event matched
     */
    private boolean destroyFromTempQueue(PartitionedRegion partitionedRegion, int i, Object obj) {
        Set<RegionQueue> queues = partitionedRegion.getParallelGatewaySender().getQueues();
        if (queues == null) {
            return false;
        }
        // Take the first queue via the iterator so an empty set yields "nothing removed"
        // instead of an ArrayIndexOutOfBoundsException from toArray()[0].
        Iterator<RegionQueue> queueIterator = queues.iterator();
        if (!queueIterator.hasNext()) {
            return false;
        }
        ConcurrentParallelGatewaySenderQueue senderQueue = (ConcurrentParallelGatewaySenderQueue) queueIterator.next();
        BlockingQueue<GatewaySenderEventImpl> bucketTmpQueue = senderQueue.getBucketTmpQueue(i);
        if (bucketTmpQueue == null) {
            return false;
        }

        boolean destroyed = false;
        Iterator<GatewaySenderEventImpl> events = bucketTmpQueue.iterator();
        while (events.hasNext()) {
            GatewaySenderEventImpl event = events.next();
            // NOTE(review): filters are notified for every scanned event, not only the one
            // matching obj. Preserved as-is; confirm whether this is intended.
            afterAckForSecondary_EventInTempQueue(partitionedRegion.getParallelGatewaySender(), event);
            if (event.getShadowKey().equals(obj)) {
                events.remove();
                event.release();
                destroyed = true;
            }
        }
        return destroyed;
    }

    /**
     * Invokes {@code afterAcknowledgement} on every gateway event filter of the sender for the
     * given temp-queue event. A {@code null} event is ignored. Filter exceptions are logged at
     * fatal level and do not stop the remaining filters.
     */
    private void afterAckForSecondary_EventInTempQueue(AbstractGatewaySender abstractGatewaySender, GatewaySenderEventImpl gatewaySenderEventImpl) {
        for (GatewayEventFilter filter : abstractGatewaySender.getGatewayEventFilters()) {
            if (gatewaySenderEventImpl == null) {
                continue;
            }
            try {
                filter.afterAcknowledgement(gatewaySenderEventImpl);
            } catch (Exception e) {
                logger.fatal(LocalizedMessage.create(LocalizedStrings.GatewayEventFilter_EXCEPTION_OCCURED_WHILE_HANDLING_CALL_TO_0_AFTER_ACKNOWLEDGEMENT_FOR_EVENT_1, new Object[]{filter.toString(), gatewaySenderEventImpl}), e);
            }
        }
    }

    /** Writes the superclass state followed by the dispatched-key map. */
    @Override
    public void toData(DataOutput dataOutput) throws IOException {
        super.toData(dataOutput);
        DataSerializer.writeHashMap(this.regionToDispatchedKeysMap, dataOutput);
    }

    /** Reads the superclass state followed by the dispatched-key map. */
    @Override
    public void fromData(DataInput dataInput) throws IOException, ClassNotFoundException {
        super.fromData(dataInput);
        this.regionToDispatchedKeysMap = DataSerializer.readHashMap(dataInput);
    }
}
