package org.apache.qpid.server.queue;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.text.MessageFormat;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.qpid.server.consumer.ConsumerOption;
import org.apache.qpid.server.consumer.ConsumerTarget;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.Filterable;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.LogSubjectFormat;
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.MessageContainer;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.session.AMQPSession;
import org.apache.qpid.server.util.StateChangeListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/qpid/server/queue/QueueConsumerImpl.class */
public class QueueConsumerImpl<T extends ConsumerTarget> extends AbstractConfiguredObject<QueueConsumerImpl<T>> implements QueueConsumer<QueueConsumerImpl<T>, T>, LogSubject {
    private static final Logger LOGGER = LoggerFactory.getLogger(QueueConsumerImpl.class);
    private final AtomicBoolean _closed;
    private final long _consumerNumber;
    private final long _createTime;
    private final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl<T>> _owningState;
    private final QueueConsumerImpl<T>.WaitingOnCreditMessageListener _waitingOnCreditMessageListener;
    private final boolean _acquires;
    private final boolean _seesRequeues;
    private final boolean _isTransient;
    private final AtomicLong _deliveredCount;
    private final AtomicLong _deliveredBytes;
    private final FilterManager _filters;
    private final Class<? extends ServerMessage> _messageClass;
    private final Object _sessionReference;
    private final AbstractQueue _queue;
    private final T _target;
    private volatile QueueContext _queueContext;

    @ManagedAttributeField
    private boolean _exclusive;

    @ManagedAttributeField
    private boolean _noLocal;

    @ManagedAttributeField
    private String _distributionMode;

    @ManagedAttributeField
    private String _settlementMode;

    @ManagedAttributeField
    private String _selector;

    @ManagedAttributeField
    private int _priority;
    private final String _linkName;
    private volatile QueueConsumerNode _queueConsumerNode;
    private volatile boolean _nonLive;

    /* loaded from: input_file:org/apache/qpid/server/queue/QueueConsumerImpl$WaitingOnCreditMessageListener.class */
    public class WaitingOnCreditMessageListener implements StateChangeListener<MessageInstance, MessageInstance.EntryState> {
        private final AtomicReference<MessageInstance> _entry = new AtomicReference<>();

        public WaitingOnCreditMessageListener() {
        }

        public void update(MessageInstance messageInstance) {
            remove();
            this._entry.set(messageInstance);
            messageInstance.addStateChangeListener(this);
            if (messageInstance.isAvailable()) {
                return;
            }
            QueueConsumerImpl.this._target.notifyWork();
            remove();
        }

        public void remove() {
            MessageInstance andSet = this._entry.getAndSet(null);
            if (andSet != null) {
                andSet.removeStateChangeListener(this);
            }
        }

        @Override // org.apache.qpid.server.util.StateChangeListener
        public void stateChanged(MessageInstance messageInstance, MessageInstance.EntryState entryState, MessageInstance.EntryState entryState2) {
            messageInstance.removeStateChangeListener(this);
            this._entry.compareAndSet(messageInstance, null);
            QueueConsumerImpl.this._target.notifyWork();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public QueueConsumerImpl(AbstractQueue<?> abstractQueue, T t, String str, FilterManager filterManager, Class<? extends ServerMessage> cls, EnumSet<ConsumerOption> enumSet, Integer num) {
        super(abstractQueue, createAttributeMap(t.getSession(), str, filterManager, enumSet, num));
        this._closed = new AtomicBoolean(false);
        this._createTime = System.currentTimeMillis();
        this._owningState = new MessageInstance.StealableConsumerAcquiredState<>(this);
        this._waitingOnCreditMessageListener = new WaitingOnCreditMessageListener();
        this._deliveredCount = new AtomicLong(0L);
        this._deliveredBytes = new AtomicLong(0L);
        this._messageClass = cls;
        this._sessionReference = t.getSession().getConnectionReference();
        this._consumerNumber = CONSUMER_NUMBER_GENERATOR.getAndIncrement();
        this._filters = filterManager;
        this._acquires = enumSet.contains(ConsumerOption.ACQUIRES);
        this._seesRequeues = enumSet.contains(ConsumerOption.SEES_REQUEUES);
        this._isTransient = enumSet.contains(ConsumerOption.TRANSIENT);
        this._target = t;
        this._queue = abstractQueue;
        this._linkName = str;
        authorise(Operation.CREATE);
        open();
        setupLogging();
    }

    private static Map<String, Object> createAttributeMap(AMQPSession<?, ?> aMQPSession, String str, FilterManager filterManager, EnumSet<ConsumerOption> enumSet, Integer num) {
        HashMap hashMap = new HashMap();
        hashMap.put("id", UUID.randomUUID());
        hashMap.put("name", aMQPSession.getAMQPConnection().getConnectionId() + "|" + aMQPSession.getChannelId() + "|" + str);
        hashMap.put("exclusive", Boolean.valueOf(enumSet.contains(ConsumerOption.EXCLUSIVE)));
        hashMap.put("noLocal", Boolean.valueOf(enumSet.contains(ConsumerOption.NO_LOCAL)));
        hashMap.put(Consumer.DISTRIBUTION_MODE, enumSet.contains(ConsumerOption.ACQUIRES) ? "MOVE" : "COPY");
        hashMap.put(ConfiguredObject.DURABLE, Boolean.valueOf(enumSet.contains(ConsumerOption.DURABLE)));
        hashMap.put(ConfiguredObject.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_SESSION_END);
        if (num != null) {
            hashMap.put("priority", num);
        }
        if (filterManager != null) {
            Iterator<MessageFilter> filters = filterManager.filters();
            while (true) {
                if (!filters.hasNext()) {
                    break;
                }
                MessageFilter next = filters.next();
                if (next instanceof JMSSelectorFilter) {
                    hashMap.put(Consumer.SELECTOR, ((JMSSelectorFilter) next).getSelector());
                    break;
                }
            }
        }
        return hashMap;
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public T getTarget() {
        return this._target;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public String getLinkName() {
        return this._linkName;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void awaitCredit(QueueEntry queueEntry) {
        this._waitingOnCreditMessageListener.update(queueEntry);
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public boolean isNotifyWorkDesired() {
        return !isNonLive() && this._target.isNotifyWorkDesired();
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public void externalStateChange() {
        this._target.notifyWork();
    }

    @Override // org.apache.qpid.server.model.Consumer
    public long getUnacknowledgedBytes() {
        return this._target.getUnacknowledgedBytes();
    }

    @Override // org.apache.qpid.server.model.Consumer
    public long getUnacknowledgedMessages() {
        return this._target.getUnacknowledgedMessages();
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer, org.apache.qpid.server.model.Consumer
    public AMQPSession<?, ?> getSession() {
        return this._target.getSession();
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public Object getIdentifier() {
        return Long.valueOf(getConsumerNumber());
    }

    @Override // org.apache.qpid.server.model.Consumer
    public boolean isSuspended() {
        return this._target.isSuspended();
    }

    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected ListenableFuture<Void> onClose() {
        if (!this._closed.compareAndSet(false, true)) {
            return Futures.immediateFuture((Object) null);
        }
        getEventLogger().message(getLogSubject(), SubscriptionMessages.CLOSE());
        this._waitingOnCreditMessageListener.remove();
        return doAfter(this._target.consumerRemoved(this), () -> {
            this._queue.unregisterConsumer(this);
        }).then(this::deleteNoChecks);
    }

    @Override // org.apache.qpid.server.model.Consumer
    public int getPriority() {
        return this._priority;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void flushBatched() {
        this._target.flushBatched();
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void notifyWork() {
        this._target.notifyWork();
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void setQueueConsumerNode(QueueConsumerNode queueConsumerNode) {
        this._queueConsumerNode = queueConsumerNode;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public QueueConsumerNode getQueueConsumerNode() {
        return this._queueConsumerNode;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void queueDeleted() {
        this._target.queueDeleted(getQueue(), this);
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public boolean allocateCredit(QueueEntry queueEntry) {
        return this._target.allocateCredit(queueEntry.getMessage());
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void restoreCredit(QueueEntry queueEntry) {
        this._target.restoreCredit(queueEntry.getMessage());
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void noMessagesAvailable() {
        this._target.noMessagesAvailable();
    }

    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected void logOperation(String str) {
        getEventLogger().message(SubscriptionMessages.OPERATION(str));
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public final Queue<?> getQueue() {
        return this._queue;
    }

    private void setupLogging() {
        String filterLogString = getFilterLogString();
        getEventLogger().message(this, SubscriptionMessages.CREATE(filterLogString, this._queue.isDurable() && this._exclusive, filterLogString.length() > 0));
    }

    protected final LogSubject getLogSubject() {
        return this;
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public MessageContainer pullMessage() {
        MessageContainer deliverSingleMessage = this._queue.deliverSingleMessage(this);
        if (deliverSingleMessage != null) {
            this._deliveredCount.incrementAndGet();
            this._deliveredBytes.addAndGet(deliverSingleMessage.getMessageInstance().getMessage().getSizeIncludingHeader());
        }
        return deliverSingleMessage;
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public void setNotifyWorkDesired(boolean z) {
        this._queue.setNotifyWorkDesired(this, z);
    }

    @Override // org.apache.qpid.server.model.Consumer
    public final long getConsumerNumber() {
        return this._consumerNumber;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public final QueueContext getQueueContext() {
        return this._queueContext;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public final void setQueueContext(QueueContext queueContext) {
        this._queueContext = queueContext;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public final boolean isActive() {
        return this._target.getState() == ConsumerTarget.State.OPEN;
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public final boolean isClosed() {
        return this._target.getState() == ConsumerTarget.State.CLOSED;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public final boolean hasInterest(QueueEntry queueEntry) {
        Object connectionReference;
        if (queueEntry.isRejectedBy(this) || queueEntry.checkHeld(System.currentTimeMillis())) {
            return false;
        }
        if (queueEntry.getMessage().getClass() == this._messageClass) {
            if (this._noLocal && (connectionReference = queueEntry.getMessage().getConnectionReference()) != null && connectionReference == this._sessionReference) {
                return false;
            }
        } else if (this._messageClass != null && MessageConverterRegistry.getConverter(queueEntry.getMessage().getClass(), this._messageClass) == null) {
            return false;
        }
        if (this._filters == null) {
            return true;
        }
        MessageReference newMessageReference = queueEntry.newMessageReference();
        if (newMessageReference == null) {
            return false;
        }
        try {
            Filterable asFilterable = queueEntry.asFilterable();
            try {
                boolean allAllow = this._filters.allAllow(asFilterable);
                newMessageReference.release();
                return allAllow;
            } catch (SelectorParsingException e) {
                LOGGER.info(this + " could not evaluate filter [" + this._filters + "]  against message " + asFilterable + ". Error was : " + e.getMessage());
                newMessageReference.release();
                return false;
            }
        } catch (Throwable th) {
            newMessageReference.release();
            throw th;
        }
    }

    protected String getFilterLogString() {
        StringBuilder sb = new StringBuilder();
        boolean z = false;
        if (this._filters != null && this._filters.hasFilters()) {
            sb.append(this._filters.toString());
            z = true;
        }
        if (!acquires()) {
            if (z) {
                sb.append(", ");
            }
            sb.append("Browser");
        }
        return sb.toString();
    }

    public final long getCreateTime() {
        return this._createTime;
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public final MessageInstance.StealableConsumerAcquiredState<QueueConsumerImpl<T>> getOwningState() {
        return this._owningState;
    }

    @Override // org.apache.qpid.server.message.MessageInstanceConsumer
    public final boolean acquires() {
        return this._acquires;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public final boolean seesRequeues() {
        return this._seesRequeues;
    }

    public final boolean isTransient() {
        return this._isTransient;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public final long getBytesOut() {
        return this._deliveredBytes.longValue();
    }

    @Override // org.apache.qpid.server.model.Consumer
    public final long getMessagesOut() {
        return this._deliveredCount.longValue();
    }

    @Override // org.apache.qpid.server.queue.QueueConsumer
    public void acquisitionRemoved(QueueEntry queueEntry) {
        this._target.acquisitionRemoved(queueEntry);
    }

    @Override // org.apache.qpid.server.model.Consumer
    public String getDistributionMode() {
        return this._distributionMode;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public String getSettlementMode() {
        return this._settlementMode;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public boolean isExclusive() {
        return this._exclusive;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public boolean isNoLocal() {
        return this._noLocal;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public String getSelector() {
        return this._selector;
    }

    @Override // org.apache.qpid.server.model.Consumer
    public boolean isNonLive() {
        return this._nonLive;
    }

    public void setNonLive(boolean z) {
        this._nonLive = z;
    }

    @Override // org.apache.qpid.server.logging.LogSubject
    public String toLogString() {
        String str;
        if (this._queue == null) {
            str = "[" + MessageFormat.format(LogSubjectFormat.SUBSCRIPTION_FORMAT, Long.valueOf(getConsumerNumber())) + "(UNKNOWN)] ";
        } else {
            String logString = new QueueLogSubject(this._queue).toLogString();
            str = "[" + MessageFormat.format(LogSubjectFormat.SUBSCRIPTION_FORMAT, Long.valueOf(getConsumerNumber())) + "(" + logString.substring(1, logString.length() - 3) + ")] ";
        }
        return str;
    }

    private EventLogger getEventLogger() {
        return this._queue.getEventLogger();
    }
}
