package com.azure.core.amqp.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.AmqpRetryPolicy;
import com.azure.core.amqp.exception.AmqpErrorCondition;
import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.OperationCancelledException;
import com.azure.core.amqp.implementation.handler.SendLinkHandler;
import com.azure.core.exception.AzureException;
import com.azure.core.util.logging.ClientLogger;
import java.io.IOException;
import java.io.Serializable;
import java.nio.BufferOverflowException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.PriorityQueue;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.transaction.Declared;
import org.apache.qpid.proton.amqp.transport.DeliveryState;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Disposables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;
import reactor.core.publisher.ReplayProcessor;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSender.class */
public class ReactorSender implements AmqpSendLink {
    private final String entityPath;
    private final Sender sender;
    private final SendLinkHandler handler;
    private final ReactorProvider reactorProvider;
    private final Disposable.Composite subscriptions;
    private final ReplayProcessor<AmqpEndpointState> endpointStates;
    private final TokenManager tokenManager;
    private final MessageSerializer messageSerializer;
    private final AmqpRetryPolicy retry;
    private final Duration timeout;
    private volatile Exception lastKnownLinkError;
    private volatile Instant lastKnownErrorReportedAt;
    private volatile int linkSize;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final AtomicBoolean hasConnected = new AtomicBoolean();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final AtomicBoolean hasAuthorized = new AtomicBoolean(true);
    private final AtomicInteger retryAttempts = new AtomicInteger();
    private final Object pendingSendLock = new Object();
    private final ConcurrentHashMap<String, RetriableWorkItem> pendingSendsMap = new ConcurrentHashMap<>();
    private final PriorityQueue<WeightedDeliveryTag> pendingSendsQueue = new PriorityQueue<>(1000, new DeliveryTagComparator());
    private final ClientLogger logger = new ClientLogger(ReactorSender.class);
    private final Timer sendTimeoutTimer = new Timer("SendTimeout-timer");
    private final Object errorConditionLock = new Object();

    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSender$DeliveryTagComparator.class */
    private static class DeliveryTagComparator implements Comparator<WeightedDeliveryTag>, Serializable {
        private static final long serialVersionUID = -7057500582037295635L;

        private DeliveryTagComparator() {
        }

        @Override // java.util.Comparator
        public int compare(WeightedDeliveryTag weightedDeliveryTag, WeightedDeliveryTag weightedDeliveryTag2) {
            return weightedDeliveryTag2.getPriority() - weightedDeliveryTag.getPriority();
        }
    }

    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSender$SendTimeout.class */
    private class SendTimeout extends TimerTask {
        private final String deliveryTag;

        SendTimeout(String str) {
            this.deliveryTag = str;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            AzureException azureException;
            Instant instant;
            RetriableWorkItem retriableWorkItem = (RetriableWorkItem) ReactorSender.this.pendingSendsMap.remove(this.deliveryTag);
            if (retriableWorkItem == null) {
                return;
            }
            AzureException azureException2 = ReactorSender.this.lastKnownLinkError;
            synchronized (ReactorSender.this.errorConditionLock) {
                azureException = ReactorSender.this.lastKnownLinkError;
                instant = ReactorSender.this.lastKnownErrorReportedAt;
            }
            if (azureException != null && instant != null) {
                Instant now = Instant.now();
                azureException2 = (((azureException instanceof AmqpException) && instant.isAfter(now.minusSeconds(4L))) || instant.isAfter(now.minus((TemporalAmount) ReactorSender.this.timeout))) ? azureException : null;
            }
            retriableWorkItem.error(azureException2 instanceof AmqpException ? (AmqpException) azureException2 : new AmqpException(true, AmqpErrorCondition.TIMEOUT_ERROR, String.format(Locale.US, "Entity(%s): Send operation timed out", ReactorSender.this.entityPath), ReactorSender.this.handler.getErrorContext(ReactorSender.this.sender)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/azure/core/amqp/implementation/ReactorSender$WeightedDeliveryTag.class */
    public static class WeightedDeliveryTag {
        private final String deliveryTag;
        private final int priority;

        WeightedDeliveryTag(String str, int i) {
            this.deliveryTag = str;
            this.priority = i;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public String getDeliveryTag() {
            return this.deliveryTag;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public int getPriority() {
            return this.priority;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReactorSender(String str, Sender sender, SendLinkHandler sendLinkHandler, ReactorProvider reactorProvider, TokenManager tokenManager, MessageSerializer messageSerializer, Duration duration, AmqpRetryPolicy amqpRetryPolicy) {
        this.entityPath = str;
        this.sender = sender;
        this.handler = sendLinkHandler;
        this.reactorProvider = reactorProvider;
        this.tokenManager = tokenManager;
        this.messageSerializer = messageSerializer;
        this.retry = amqpRetryPolicy;
        this.timeout = duration;
        this.endpointStates = this.handler.getEndpointStates().map(endpointState -> {
            this.logger.verbose("connectionId[{}], path[{}], linkName[{}]: State {}", new Object[]{sendLinkHandler.getConnectionId(), str, getLinkName(), endpointState});
            this.hasConnected.set(endpointState == EndpointState.ACTIVE);
            return AmqpEndpointStateUtil.getConnectionState(endpointState);
        }).subscribeWith(ReplayProcessor.cacheLastOrDefault(AmqpEndpointState.UNINITIALIZED));
        this.subscriptions = Disposables.composite(new Disposable[]{this.handler.getDeliveredMessages().subscribe(this::processDeliveredMessage), this.handler.getLinkCredits().subscribe(num -> {
            this.logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Credits on link: {}", new Object[]{sendLinkHandler.getConnectionId(), str, getLinkName(), num});
            scheduleWorkOnDispatcher();
        })});
        if (tokenManager != null) {
            this.subscriptions.add(this.tokenManager.getAuthorizationResults().subscribe(amqpResponseCode -> {
                this.logger.verbose("connectionId[{}], entityPath[{}], linkName[{}]: Token refreshed: {}", new Object[]{sendLinkHandler.getConnectionId(), str, getLinkName(), amqpResponseCode});
                this.hasAuthorized.set(true);
            }, th -> {
                this.logger.info("connectionId[{}], entityPath[{}], linkName[{}]: tokenRenewalFailure[{}]", new Object[]{sendLinkHandler.getConnectionId(), str, getLinkName(), th.getMessage()});
                this.hasAuthorized.set(false);
            }, () -> {
                this.hasAuthorized.set(false);
            }));
        }
    }

    @Override // com.azure.core.amqp.AmqpLink
    public Flux<AmqpEndpointState> getEndpointStates() {
        return this.endpointStates;
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<Void> send(Message message) {
        return send(message, (DeliveryState) null);
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<Void> send(Message message, DeliveryState deliveryState) {
        return getLinkSize().flatMap(num -> {
            int min = Math.min(this.messageSerializer.getSize(message) + 512, num.intValue());
            byte[] bArr = new byte[min];
            try {
                return send(bArr, message.encode(bArr, 0, min), 0, deliveryState);
            } catch (BufferOverflowException e) {
                return Mono.error(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US, "Error sending. Size of the payload exceeded maximum message size: %s kb", Integer.valueOf(num.intValue() / 1024)), e, this.handler.getErrorContext(this.sender)));
            }
        }).then();
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<Void> send(List<Message> list) {
        return send(list, (DeliveryState) null);
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<Void> send(List<Message> list, DeliveryState deliveryState) {
        return list.size() == 1 ? send(list.get(0), deliveryState) : getLinkSize().flatMap(num -> {
            Message message = (Message) list.get(0);
            Message message2 = Proton.message();
            message2.setMessageAnnotations(message.getMessageAnnotations());
            int intValue = num.intValue();
            byte[] bArr = new byte[intValue];
            int encode = message2.encode(bArr, 0, intValue);
            Iterator it = list.iterator();
            while (it.hasNext()) {
                Message message3 = (Message) it.next();
                Message message4 = Proton.message();
                int min = Math.min(this.messageSerializer.getSize(message3) + 512, intValue);
                byte[] bArr2 = new byte[min];
                message4.setBody(new Data(new Binary(bArr2, 0, message3.encode(bArr2, 0, min))));
                try {
                    encode += message4.encode(bArr, encode, (intValue - encode) - 1);
                } catch (BufferOverflowException e) {
                    return Mono.error(new AmqpException(false, AmqpErrorCondition.LINK_PAYLOAD_SIZE_EXCEEDED, String.format(Locale.US, "Size of the payload exceeded maximum message size: %s kb", Integer.valueOf(intValue / 1024)), e, this.handler.getErrorContext(this.sender)));
                }
            }
            return send(bArr, encode, -2147404032, deliveryState);
        }).then();
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public AmqpErrorContext getErrorContext() {
        return this.handler.getErrorContext(this.sender);
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getLinkName() {
        return this.sender.getName();
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getEntityPath() {
        return this.entityPath;
    }

    @Override // com.azure.core.amqp.AmqpLink
    public String getHostname() {
        return this.handler.getHostname();
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<Integer> getLinkSize() {
        if (this.linkSize > 0) {
            return Mono.just(Integer.valueOf(this.linkSize));
        }
        synchronized (this) {
            if (this.linkSize > 0) {
                return Mono.just(Integer.valueOf(this.linkSize));
            }
            return RetryUtil.withRetry(getEndpointStates().takeUntil(amqpEndpointState -> {
                return amqpEndpointState == AmqpEndpointState.ACTIVE;
            }).then(Mono.fromCallable(() -> {
                UnsignedLong remoteMaxMessageSize = this.sender.getRemoteMaxMessageSize();
                if (remoteMaxMessageSize != null) {
                    this.linkSize = remoteMaxMessageSize.intValue();
                }
                return Integer.valueOf(this.linkSize);
            })), this.timeout, this.retry);
        }
    }

    public boolean isDisposed() {
        return this.isDisposed.get();
    }

    public void dispose() {
        dispose(null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void dispose(ErrorCondition errorCondition) {
        if (this.isDisposed.getAndSet(true)) {
            return;
        }
        this.subscriptions.dispose();
        this.tokenManager.close();
        if (this.sender.getLocalState() == EndpointState.CLOSED) {
            return;
        }
        ClientLogger clientLogger = this.logger;
        Object[] objArr = new Object[4];
        objArr[0] = this.handler.getConnectionId();
        objArr[1] = this.entityPath;
        objArr[2] = getLinkName();
        objArr[3] = errorCondition != null ? errorCondition : ClientConstants.NOT_APPLICABLE;
        clientLogger.verbose("connectionId[{}], path[{}], linkName[{}]: setting error condition {}", objArr);
        if (errorCondition != null && this.sender.getCondition() == null) {
            this.sender.setCondition(errorCondition);
        }
        this.sender.close();
    }

    @Override // com.azure.core.amqp.implementation.AmqpSendLink
    public Mono<DeliveryState> send(byte[] bArr, int i, int i2, DeliveryState deliveryState) {
        return validateEndpoint().then(Mono.create(monoSink -> {
            sendWork(new RetriableWorkItem(bArr, i, i2, (MonoSink<DeliveryState>) monoSink, this.timeout, deliveryState));
        }));
    }

    private Mono<Void> validateEndpoint() {
        return Mono.defer(() -> {
            return RetryUtil.withRetry(this.handler.getEndpointStates().takeUntil(endpointState -> {
                return endpointState == EndpointState.ACTIVE;
            }), this.timeout, this.retry).then();
        });
    }

    private void sendWork(RetriableWorkItem retriableWorkItem) {
        String replace = UUID.randomUUID().toString().replace("-", "");
        synchronized (this.pendingSendLock) {
            this.pendingSendsMap.put(replace, retriableWorkItem);
            this.pendingSendsQueue.offer(new WeightedDeliveryTag(replace, retriableWorkItem.hasBeenRetried() ? 1 : 0));
        }
        scheduleWorkOnDispatcher();
    }

    private void processSendWork() {
        RetriableWorkItem retriableWorkItem;
        String str;
        if (!this.hasConnected.get()) {
            this.logger.warning("Not connected. Not processing send work.");
            return;
        }
        while (this.hasConnected.get() && this.sender.getCredit() > 0) {
            synchronized (this.pendingSendLock) {
                WeightedDeliveryTag poll = this.pendingSendsQueue.poll();
                if (poll != null) {
                    str = poll.getDeliveryTag();
                    retriableWorkItem = this.pendingSendsMap.get(str);
                } else {
                    retriableWorkItem = null;
                    str = null;
                }
            }
            if (retriableWorkItem != null) {
                Delivery delivery = null;
                boolean z = false;
                int i = 0;
                Exception exc = null;
                try {
                    delivery = this.sender.delivery(str.getBytes(StandardCharsets.UTF_8));
                    delivery.setMessageFormat(retriableWorkItem.getMessageFormat());
                    if (retriableWorkItem.isDeliveryStateProvided()) {
                        delivery.disposition(retriableWorkItem.getDeliveryState());
                    }
                    i = this.sender.send(retriableWorkItem.getMessage(), 0, retriableWorkItem.getEncodedMessageSize());
                } catch (Exception e) {
                    exc = e;
                }
                if (!$assertionsDisabled && i != retriableWorkItem.getEncodedMessageSize()) {
                    throw new AssertionError("Contract of the ProtonJ library for Sender. Send API changed");
                    break;
                }
                z = this.sender.advance();
                if (z) {
                    this.logger.verbose("entityPath[{}], linkName[{}], deliveryTag[{}]: Sent message", new Object[]{this.entityPath, getLinkName(), str});
                    retriableWorkItem.setWaitingForAck();
                    this.sendTimeoutTimer.schedule(new SendTimeout(str), this.timeout.toMillis());
                } else {
                    this.logger.verbose("clientId[{}]. path[{}], linkName[{}], deliveryTag[{}], sentMessageSize[{}], payloadActualSize[{}]: sendlink advance failed", new Object[]{this.handler.getConnectionId(), this.entityPath, getLinkName(), str, Integer.valueOf(i), Integer.valueOf(retriableWorkItem.getEncodedMessageSize())});
                    if (delivery != null) {
                        delivery.free();
                    }
                    AmqpErrorContext errorContext = this.handler.getErrorContext(this.sender);
                    retriableWorkItem.error(exc != null ? new OperationCancelledException(String.format(Locale.US, "Entity(%s): send operation failed. Please see cause for more details", this.entityPath), exc, errorContext) : new OperationCancelledException(String.format(Locale.US, "Entity(%s): send operation failed while advancing delivery(tag: %s).", this.entityPath, str), errorContext));
                }
            } else {
                if (str != null) {
                    this.logger.verbose("clientId[{}]. path[{}], linkName[{}], deliveryTag[{}]: sendData not found for this delivery.", new Object[]{this.handler.getConnectionId(), this.entityPath, getLinkName(), str});
                    return;
                }
                return;
            }
        }
    }

    private void processDeliveredMessage(Delivery delivery) {
        int i;
        DeliveryState remoteState = delivery.getRemoteState();
        String str = new String(delivery.getTag(), StandardCharsets.UTF_8);
        this.logger.verbose("entityPath[{}], linkName[{}], deliveryTag[{}]: process delivered message", new Object[]{this.entityPath, getLinkName(), str});
        RetriableWorkItem remove = this.pendingSendsMap.remove(str);
        if (remove == null) {
            this.logger.verbose("clientId[{}]. path[{}], linkName[{}], delivery[{}] - mismatch (or send timed out)", new Object[]{this.handler.getConnectionId(), this.entityPath, getLinkName(), str});
            return;
        }
        if (remove.isDeliveryStateProvided()) {
            remove.success(remoteState);
            return;
        }
        if (remoteState instanceof Accepted) {
            synchronized (this.errorConditionLock) {
                this.lastKnownLinkError = null;
                this.lastKnownErrorReportedAt = null;
                this.retryAttempts.set(0);
            }
            remove.success(remoteState);
            return;
        }
        if (!(remoteState instanceof Rejected)) {
            if (remoteState instanceof Released) {
                cleanupFailedSend(remove, new OperationCancelledException(remoteState.toString(), this.handler.getErrorContext(this.sender)));
                return;
            } else if (remoteState instanceof Declared) {
                remove.success((Declared) remoteState);
                return;
            } else {
                cleanupFailedSend(remove, new AmqpException(false, remoteState.toString(), this.handler.getErrorContext(this.sender)));
                return;
            }
        }
        Rejected rejected = (Rejected) remoteState;
        ErrorCondition error = rejected.getError();
        Exception exception = ExceptionUtil.toException(error.getCondition().toString(), error.getDescription(), this.handler.getErrorContext(this.sender));
        this.logger.warning("entityPath[{}], linkName[{}], deliveryTag[{}]: Delivery rejected. [{}]", new Object[]{this.entityPath, getLinkName(), str, rejected});
        if (isGeneralSendError(error.getCondition())) {
            synchronized (this.errorConditionLock) {
                this.lastKnownLinkError = exception;
                this.lastKnownErrorReportedAt = Instant.now();
                i = this.retryAttempts.incrementAndGet();
            }
        } else {
            i = this.retryAttempts.get();
        }
        Duration calculateRetryDelay = this.retry.calculateRetryDelay(exception, i);
        if (calculateRetryDelay == null || calculateRetryDelay.compareTo(remove.getTimeoutTracker().remaining()) > 0) {
            cleanupFailedSend(remove, exception);
            return;
        }
        remove.setLastKnownException(exception);
        try {
            this.reactorProvider.getReactorDispatcher().invoke(() -> {
                sendWork(remove);
            }, calculateRetryDelay);
        } catch (IOException | RejectedExecutionException e) {
            exception.initCause(e);
            cleanupFailedSend(remove, new AmqpException(false, String.format(Locale.US, "Entity(%s): send operation failed while scheduling a retry on Reactor, see cause for more details.", this.entityPath), e, this.handler.getErrorContext(this.sender)));
        }
    }

    private void scheduleWorkOnDispatcher() {
        try {
            this.reactorProvider.getReactorDispatcher().invoke(this::processSendWork);
        } catch (IOException e) {
            this.logger.error("Error scheduling work on reactor.", new Object[]{e});
        }
    }

    private void cleanupFailedSend(RetriableWorkItem retriableWorkItem, Exception exc) {
        retriableWorkItem.error(exc);
    }

    private static boolean isGeneralSendError(Symbol symbol) {
        return symbol == AmqpErrorCode.SERVER_BUSY_ERROR || symbol == AmqpErrorCode.TIMEOUT_ERROR || symbol == AmqpErrorCode.RESOURCE_LIMIT_EXCEEDED;
    }

    static {
        $assertionsDisabled = !ReactorSender.class.desiredAssertionStatus();
    }
}
