package com.solace.messaging.publisher;

import com.solace.messaging.MessagingService;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.publisher.DirectMessagePublisher;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PublisherBuffers;
import com.solace.messaging.publisher.PublisherHealthCheck;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.util.CompletionListener;
import com.solace.messaging.util.LifecycleControl;
import com.solace.messaging.util.Manageable;
import com.solace.messaging.util.ManageablePublisher;
import com.solace.messaging.util.PublisherCongestionNotificationDispatcher;
import com.solace.messaging.util.TypedProperties;
import com.solace.messaging.util.async.ExtendedCompletableFuture;
import com.solace.messaging.util.async.ThreadFactories;
import com.solace.messaging.util.internal.BiTask;
import com.solace.messaging.util.internal.ClientSession;
import com.solace.messaging.util.internal.Internal;
import com.solace.messaging.util.internal.MessagingServiceInternalView;
import com.solace.messaging.util.internal.Task;
import com.solace.messaging.util.internal.TerminationEventImpl;
import com.solace.messaging.util.internal.TerminationNotificationDispatcher;
import com.solace.messaging.util.internal.Validation;
import com.solacesystems.jcsmp.ClosedFacilityException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPStreamingPublishEventHandler;
import com.solacesystems.jcsmp.XMLMessageProducer;
import com.solacesystems.jcsmp.statistics.StatType;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.osgi.annotation.versioning.ProviderType;

@Internal
@ProviderType
/* loaded from: input_file:com/solace/messaging/publisher/DirectMessagePublisherImpl.class */
public class DirectMessagePublisherImpl implements DirectMessagePublisher {
    /** Underlying JCSMP producer; assigned in {@code onStart()} and closed by the interruption/closure listeners. */
    private volatile XMLMessageProducer producer;
    /** Internal view of the owning messaging service (connection state, client session, API metrics). */
    private final MessagingServiceInternalView serviceInternalView;
    /** Configuration handed to the constructor; used to create {@link #buffer}. */
    private final TypedProperties publisherConfiguration;
    /** Buffer that {@code publish(...)} inserts into and the dispatcher task consumes from. */
    private final PublisherBuffers.PublisherBuffer<Topic> buffer;
    /** Builder used to wrap raw {@code byte[]} / {@code String} payloads into outbound messages. */
    private final OutboundMessageBuilder messageBuilder;
    /** Lifecycle state: publisher has been created but not started. */
    static final int STATE_NOT_STARTED = 0;
    /** Lifecycle state: publisher is started ({@code isRunning()} returns true). */
    static final int STATE_STARTED = 1;
    /** Lifecycle state: publisher is terminated ({@code isTerminated()} returns true). */
    static final int STATE_TERMINATED = 2;
    /** Source of the per-instance numeric id used in {@code instanceName}. */
    private static final AtomicLong instanceIdGenerator = new AtomicLong(0);
    private static final Log logger = LogFactory.getLog(DirectMessagePublisherImpl.class);
    /**
     * Post-termination step used by {@code terminateNow()}: empties the buffer, reports how many
     * messages were dropped through the supplied counter, and shuts the dispatcher executor down.
     */
    private static final BiTask<DirectMessagePublisherImpl, AtomicInteger> postTerminateNowTask = (publisher, discardedCounter) -> {
        try {
            final int pending = publisher.buffer.size();
            // The buffer is always cleared here, even when it was already empty.
            publisher.buffer.clear();
            if (pending > 0) {
                discardedCounter.set(pending);
                publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
                if (logger.isWarnEnabled()) {
                    logger.warn(publisher.instanceName + " not gracefully terminated before all buffered messages were processed.");
                }
            }
        } catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn("Non-graceful termination of " + publisher.instanceName + " was interrupted");
            }
        }
        final ExecutorService dispatcherExecutor = publisher.publisherBotExecutorService;
        if (!dispatcherExecutor.isShutdown()) {
            dispatcherExecutor.shutdown();
        }
    };
    /** Current lifecycle state; holds one of the {@code STATE_*} constants. */
    final AtomicInteger stateHolder = new AtomicInteger(STATE_NOT_STARTED);
    /** True while a graceful {@code terminateAsync} is waiting for the buffer to drain; backs {@code isTerminating()}. */
    private volatile boolean gracefulShutdownInProgress = false;
    /** Unique id of this publisher instance. */
    private final long id = instanceIdGenerator.incrementAndGet();
    /** Human-readable name used in log messages and thread names; depends on {@link #id} being initialized first. */
    private final String instanceName = "DirectMessagePublisher@" + this.id;
    /** Single-threaded executor that runs the message-dispatching task submitted in {@code onStart()}. */
    private final ExecutorService publisherBotExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(this.instanceName + "-message-dispatcher"));
    /** Read-only descriptor returned by {@code publisherInfo()}. */
    private final ManageablePublisher.DirectPublisherInfo publisherInfo = new DirectPublisherInfoImpl();
    /** Forwards termination events to the listener registered via {@code setTerminationNotificationListener}. */
    private final TerminationNotificationDispatcher terminationNotificationDispatcher = new TerminationNotificationDispatcher();
    /** Forwards publish failures to the listener registered via {@code setPublishFailureListener}. */
    private final PublishFailureNotificationDispatcher errorNotificationDispatcher = new PublishFailureNotificationDispatcher();
    /**
     * JCSMP publish-event callback handed to {@code createMessageProducer} in {@code onStart()}.
     * Only {@code handleErrorEx} does any work: it logs the failure and forwards it to the
     * publish-failure dispatcher. The remaining callbacks are intentionally empty.
     */
    private final JCSMPStreamingPublishCorrelatingEventHandler publishEventHandler = new JCSMPStreamingPublishCorrelatingEventHandler() { // from class: com.solace.messaging.publisher.DirectMessagePublisherImpl.1
        public void handleErrorEx(Object obj, JCSMPException jCSMPException, long j) {
            final DirectMessagePublisherImpl owner = DirectMessagePublisherImpl.this;
            if (DirectMessagePublisherImpl.logger.isErrorEnabled()) {
                final String description = owner.instanceName + " encountered problem during message publishing.";
                DirectMessagePublisherImpl.logger.error(description, jCSMPException);
            }
            // j is passed through unchanged as the failure timestamp.
            owner.errorNotificationDispatcher.onException((Exception) jCSMPException, j);
        }

        public void responseReceivedEx(Object obj) {
            // Intentionally empty.
        }

        public void handleError(String str, JCSMPException jCSMPException, long j) {
            // Intentionally empty.
        }

        public void responseReceived(String str) {
            // Intentionally empty.
        }
    };
    /**
     * Post-termination step used when the publisher is torn down without a caller waiting for a
     * discard count: drops whatever is still buffered, records the discard metric, and shuts the
     * dispatcher executor down.
     */
    private final Task<DirectMessagePublisherImpl> postTerminationClearBufferSilentTask = publisher -> {
        try {
            final int pending = publisher.buffer.size();
            // Unlike the terminate-now task, the buffer is only cleared when it is non-empty.
            if (pending > 0) {
                publisher.buffer.clear();
                publisher.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, pending);
                if (logger.isWarnEnabled()) {
                    logger.warn(publisher.instanceName + " non-gracefully terminated before all buffered messages were processed.");
                }
            }
        } catch (PubSubPlusClientException.RequestInterruptedException interrupted) {
            if (logger.isWarnEnabled()) {
                logger.warn("Non-graceful termination of " + publisher.instanceName + " was interrupted");
            }
        }
        if (!this.publisherBotExecutorService.isShutdown()) {
            this.publisherBotExecutorService.shutdown();
        }
    };
    /**
     * Reacts to a service interruption: marks the publisher terminated, notifies the termination
     * listener, runs the silent buffer-clearing termination, and closes the producer if one exists.
     */
    private final MessagingService.ServiceInterruptionListener serviceInterruptionListener = new MessagingService.ServiceInterruptionListener() { // from class: com.solace.messaging.publisher.DirectMessagePublisherImpl.2
        @Override // com.solace.messaging.MessagingService.ServiceInterruptionListener
        public void onServiceInterrupted(MessagingService.ServiceEvent serviceEvent) {
            final DirectMessagePublisherImpl owner = DirectMessagePublisherImpl.this;
            if (DirectMessagePublisherImpl.logger.isWarnEnabled()) {
                DirectMessagePublisherImpl.logger.warn("Shutting down publisher due to Service interruption");
            }
            owner.stateHolder.set(DirectMessagePublisherImpl.STATE_TERMINATED);
            final TerminationEventImpl terminationEvent = new TerminationEventImpl(serviceEvent.getTimestamp(), serviceEvent.getMessage(), serviceEvent.getCause());
            owner.terminationNotificationDispatcher.onTermination(terminationEvent);
            owner.onTerminate(null, owner.postTerminationClearBufferSilentTask);
            if (owner.producer != null) {
                owner.producer.close();
            }
        }
    };
    /**
     * Reacts to a client-session state change (registered in {@code onStart()}): marks the
     * publisher terminated, notifies the termination listener, runs the silent buffer-clearing
     * termination, and closes the producer if one exists.
     */
    private final ClientSession.ClientSessionStateListener closedSessionListener = new ClientSession.ClientSessionStateListener() { // from class: com.solace.messaging.publisher.DirectMessagePublisherImpl.3
        @Override // com.solace.messaging.util.internal.ClientSession.ClientSessionStateListener
        public void onClientSessionState(ClientSession.ClientSessionStateChangeEvent clientSessionStateChangeEvent) {
            if (DirectMessagePublisherImpl.logger.isWarnEnabled()) {
                // This class is a publisher; the message previously said "receiver".
                DirectMessagePublisherImpl.logger.warn("Shutting down publisher due to service closure");
            }
            DirectMessagePublisherImpl.this.stateHolder.set(DirectMessagePublisherImpl.STATE_TERMINATED);
            DirectMessagePublisherImpl.this.terminationNotificationDispatcher.onTermination(new TerminationEventImpl(clientSessionStateChangeEvent.getTimestamp(), clientSessionStateChangeEvent.getMessage(), clientSessionStateChangeEvent.getCause()));
            DirectMessagePublisherImpl.this.onTerminate(null, DirectMessagePublisherImpl.this.postTerminationClearBufferSilentTask);
            // Read the volatile field once so the null check and the close() act on the same reference.
            final XMLMessageProducer currentProducer = DirectMessagePublisherImpl.this.producer;
            if (currentProducer != null) {
                currentProducer.close();
            }
        }
    };
    /** Installed as the buffer's congestion monitor in the constructor; backs {@code setPublisherReadinessListener} and {@code notifyWhenReady}. */
    private final PublisherCongestionNotificationDispatcher bufferCongestionNotificationDispatcher = new PublisherCongestionNotificationDispatcher(this);

    /**
     * Read-only descriptor of the enclosing publisher, exposing its numeric id and instance name.
     */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/DirectMessagePublisherImpl$DirectPublisherInfoImpl.class */
    private class DirectPublisherInfoImpl implements ManageablePublisher.DirectPublisherInfo {
        private DirectPublisherInfoImpl() {
        }

        /** @return the id of the enclosing publisher instance */
        @Override // com.solace.messaging.util.Identifiable
        public long getId() {
            final DirectMessagePublisherImpl owner = DirectMessagePublisherImpl.this;
            return owner.id;
        }

        /** @return the instance name of the enclosing publisher */
        @Override // com.solace.messaging.util.ManageablePublisher.PublisherInfo
        public String getInstanceName() {
            final DirectMessagePublisherImpl owner = DirectMessagePublisherImpl.this;
            return owner.instanceName;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    @Internal
    @ProviderType
    /* loaded from: input_file:com/solace/messaging/publisher/DirectMessagePublisherImpl$PublishFailureNotificationDispatcher.class */
    public class PublishFailureNotificationDispatcher {
        private final ExecutorService failureNotificationExecutorService;
        private volatile DirectMessagePublisher.PublishFailureListener publishFailureListener;

        /* JADX INFO: Access modifiers changed from: package-private */
        @Internal
        @ProviderType
        /* loaded from: input_file:com/solace/messaging/publisher/DirectMessagePublisherImpl$PublishFailureNotificationDispatcher$ScheduledFailureNotification.class */
        public class ScheduledFailureNotification implements Callable<Void> {
            final Exception e;
            final long timeStamp;
            final PublisherBuffers.Publishable<Topic> publishable;

            ScheduledFailureNotification(PublisherBuffers.Publishable<Topic> publishable, Exception exc, long j) {
                this.e = exc;
                this.timeStamp = j;
                this.publishable = publishable;
            }

            ScheduledFailureNotification(Exception exc, long j) {
                this.e = exc;
                this.timeStamp = j;
                this.publishable = PublisherBuffers.Publishable.none();
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Void call() throws Exception {
                DirectMessagePublisher.PublishFailureListener publishFailureListener = PublishFailureNotificationDispatcher.this.publishFailureListener;
                if (publishFailureListener == null) {
                    return null;
                }
                publishFailureListener.onFailedPublish(new DirectMessagePublisher.FailedPublishEvent(this.publishable.getMessage(), this.publishable.getDestination(), mapException(this.e), this.timeStamp));
                return null;
            }

            PubSubPlusClientException mapException(Exception exc) {
                return exc instanceof PubSubPlusClientException ? (PubSubPlusClientException) exc : new PubSubPlusClientException(exc);
            }
        }

        private PublishFailureNotificationDispatcher() {
            this.failureNotificationExecutorService = Executors.newSingleThreadExecutor(new ThreadFactories.NamedDaemonThreadFactory(DirectMessagePublisherImpl.this.instanceName + "-error-dispatcher"));
        }

        void onException(PublisherBuffers.Publishable<Topic> publishable, Exception exc) {
            if (exc == null || publishable == null || this.publishFailureListener == null) {
                return;
            }
            ScheduledFailureNotification scheduledFailureNotification = new ScheduledFailureNotification(publishable, exc, Instant.now().toEpochMilli());
            try {
                this.failureNotificationExecutorService.submit(scheduledFailureNotification);
            } catch (RejectedExecutionException e) {
                if (DirectMessagePublisherImpl.logger.isWarnEnabled()) {
                    DirectMessagePublisherImpl.logger.warn(DirectMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread");
                }
                try {
                    scheduledFailureNotification.call();
                } catch (Exception e2) {
                    DirectMessagePublisherImpl.logger.debug("Exception by customer callback during publish error notification processing", e2);
                }
            }
        }

        void onException(Exception exc) {
            onException(exc, Instant.now().toEpochMilli());
        }

        void onException(Exception exc, long j) {
            if (exc == null || this.publishFailureListener == null) {
                return;
            }
            ScheduledFailureNotification scheduledFailureNotification = new ScheduledFailureNotification(exc, j);
            try {
                this.failureNotificationExecutorService.submit(scheduledFailureNotification);
            } catch (RejectedExecutionException e) {
                DirectMessagePublisherImpl.logger.warn(DirectMessagePublisherImpl.this.instanceName + " could not schedule publisher failure notification, processing notification on a dispatcher thread");
                try {
                    scheduledFailureNotification.call();
                } catch (Exception e2) {
                    DirectMessagePublisherImpl.logger.debug("Exception by customer callback during publish error notification processing", e2);
                }
            }
        }

        void setPublishFailureListener(DirectMessagePublisher.PublishFailureListener publishFailureListener) {
            this.publishFailureListener = publishFailureListener;
        }
    }

    public DirectMessagePublisherImpl(MessagingServiceInternalView messagingServiceInternalView, TypedProperties typedProperties, OutboundMessageBuilder outboundMessageBuilder) {
        this.serviceInternalView = messagingServiceInternalView;
        this.messageBuilder = configureMessageBuilder(outboundMessageBuilder);
        this.publisherConfiguration = typedProperties;
        this.buffer = PublisherBuffers.createBuffer(this.publisherConfiguration);
        this.buffer.setBufferCongestionMonitor(this.bufferCongestionNotificationDispatcher, STATE_STARTED);
    }

    /** @return the descriptor (id and instance name) of this publisher */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher, com.solace.messaging.util.ManageablePublisher
    public ManageablePublisher.DirectPublisherInfo publisherInfo() {
        final ManageablePublisher.DirectPublisherInfo info = this.publisherInfo;
        return info;
    }

    /**
     * Starts the publisher and blocks until the start attempt has finished.
     * Returns immediately when the publisher is already started.
     *
     * @return this publisher
     * @throws PubSubPlusClientException.RequestInterruptedException if the calling thread is
     *         interrupted or the start future is cancelled
     * @throws PubSubPlusClientException if the start fails for any other reason
     * @throws IllegalStateException     if {@code startAsync()} rejects or fails the start with one
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher, com.solace.messaging.util.LifecycleControl
    public DirectMessagePublisher start() {
        if (STATE_STARTED == this.stateHolder.get()) {
            return this;
        }
        try {
            startAsync().get();
            return this;
        } catch (InterruptedException e) {
            // Restore the interrupt flag for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e);
        } catch (CancellationException e2) {
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher start was canceled", e2);
        } catch (ExecutionException e3) {
            final Throwable cause = e3.getCause();
            if (cause instanceof PubSubPlusClientException) {
                throw ((PubSubPlusClientException) cause);
            }
            if (cause instanceof IllegalStateException) {
                throw ((IllegalStateException) cause);
            }
            if (cause != null) {
                throw new PubSubPlusClientException(cause);
            }
            if (logger.isErrorEnabled()) {
                logger.error(this.instanceName + " failed to start");
            }
            // A failed start must never be reported as success; mirror terminate(), which wraps
            // the ExecutionException itself when it carries no cause.
            throw new PubSubPlusClientException(e3);
        }
    }

    /**
     * Terminates the publisher and blocks until termination has finished.
     *
     * @param j grace period; {@code 0} terminates immediately via {@code terminateNow()}, any other
     *          value is handed to {@code terminateAsync(long)}
     * @throws PubSubPlusClientException if termination fails or the calling thread is interrupted
     */
    @Override // com.solace.messaging.util.LifecycleControl
    public void terminate(long j) throws PubSubPlusClientException {
        if (j == 0) {
            terminateNow();
            return;
        }
        try {
            terminateAsync(j).get();
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new PubSubPlusClientException.RequestInterruptedException("Publisher termination was canceled", interrupted);
        } catch (ExecutionException failed) {
            final Throwable rootCause = failed.getCause();
            if (rootCause == null) {
                throw new PubSubPlusClientException(failed);
            }
            if (rootCause instanceof PubSubPlusClientException) {
                throw ((PubSubPlusClientException) rootCause);
            }
            if (rootCause instanceof IllegalStateException) {
                throw ((IllegalStateException) rootCause);
            }
            throw new PubSubPlusClientException(rootCause);
        }
    }

    /**
     * Terminates the publisher immediately, discarding anything still buffered.
     *
     * @throws PubSubPlusClientException.IncompleteMessageDeliveryException if one or more buffered
     *         messages were discarded; this replaces any other exception raised during termination
     */
    void terminateNow() throws PubSubPlusClientException.IncompleteMessageDeliveryException, PubSubPlusClientException, IllegalStateException {
        final AtomicInteger discarded = new AtomicInteger(STATE_NOT_STARTED);
        try {
            logger.debug(this.instanceName + " is being non gracefully terminated");
            onTerminate(null, postTerminateNowTask, discarded);
            logger.debug(this.instanceName + " is terminated");
        } finally {
            // Runs on both the normal and the exceptional path: a non-zero discard count always
            // wins and is reported, otherwise any in-flight exception propagates untouched.
            final int lost = discarded.get();
            if (lost > 0) {
                throw new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to non graceful termination", Integer.valueOf(lost)), lost);
            }
        }
    }

    /** @return {@code true} when the lifecycle state is {@code STATE_STARTED} */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isRunning() {
        final int currentState = this.stateHolder.get();
        return currentState == STATE_STARTED;
    }

    /** @return {@code true} when the lifecycle state is {@code STATE_TERMINATED} */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminated() {
        final int currentState = this.stateHolder.get();
        return currentState == STATE_TERMINATED;
    }

    /** @return {@code true} while a graceful termination is waiting for the buffer to drain */
    @Override // com.solace.messaging.util.LifecycleControl
    public boolean isTerminating() {
        final boolean draining = this.gracefulShutdownInProgress;
        return draining;
    }

    /**
     * Registers the listener that receives termination events.
     *
     * @param terminationNotificationListener listener handed to the termination dispatcher
     */
    @Override // com.solace.messaging.util.LifecycleControl
    public void setTerminationNotificationListener(LifecycleControl.TerminationNotificationListener terminationNotificationListener) {
        final TerminationNotificationDispatcher dispatcher = this.terminationNotificationDispatcher;
        dispatcher.setTerminationNotificationListener(terminationNotificationListener);
    }

    /**
     * Starts the publisher. The start work itself ({@code onStart()}) runs on the calling thread;
     * the returned future reports the outcome.
     *
     * @return a future completed with this publisher on success, or completed exceptionally with
     *         an {@link IllegalStateException} when the publisher is already terminated or
     *         {@code onStart()} fails (the original failure is attached as the cause)
     * @throws IllegalStateException if the service is not connected or the publisher is already
     *         terminated when the method is entered
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher, com.solace.messaging.util.AsyncLifecycleControl
    public <DirectMessagePublisher> CompletableFuture<DirectMessagePublisher> startAsync() throws PubSubPlusClientException {
        ExtendedCompletableFuture extendedCompletableFuture;
        if (!this.serviceInternalView.isConnected()) {
            throw new IllegalStateException("Publisher can't be started before it is connected to a messaging service");
        }
        if (this.stateHolder.get() == STATE_TERMINATED) {
            throw new IllegalStateException("Message publisher is already terminated");
        }
        do {
            extendedCompletableFuture = new ExtendedCompletableFuture();
            if (STATE_TERMINATED == this.stateHolder.get()) {
                return ExtendedCompletableFuture.failedFuture(new IllegalStateException("Publisher is already terminated"));
            }
            // Only the caller that wins this transition performs the actual start.
            if (this.stateHolder.compareAndSet(STATE_NOT_STARTED, STATE_STARTED)) {
                if (logger.isDebugEnabled()) {
                    logger.debug(this.instanceName + " is being started");
                }
                try {
                    onStart();
                    extendedCompletableFuture.complete(this);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " is started");
                    }
                } catch (Exception e) {
                    logger.error(this.instanceName + " failed to start and is terminating", e);
                    onTerminate(null, this.postTerminationClearBufferSilentTask);
                    // Keep the root failure attached so callers can see why the start failed.
                    extendedCompletableFuture.completeExceptionally(new IllegalStateException("Publisher is already closed due to internal error", e));
                }
                return ExtendedCompletableFuture.onCancellation(extendedCompletableFuture, (obj, th) -> {
                    this.stateHolder.set(STATE_TERMINATED);
                    onTerminate(null, this.postTerminationClearBufferSilentTask);
                    if (logger.isDebugEnabled()) {
                        logger.debug(this.instanceName + " async start was canceled");
                    }
                });
            }
            // The transition was lost: loop until the state is observed as started (or terminated above).
        } while (!(STATE_STARTED == this.stateHolder.get()));
        extendedCompletableFuture.complete(this);
        return extendedCompletableFuture;
    }

    /**
     * Starts the publisher and reports the outcome to the given listener.
     *
     * <p>On failure the listener receives the cause of the completion throwable, or the throwable
     * itself when it carries no cause, so a failed start is never reported with a {@code null}
     * error. Exceptions thrown by the listener are logged rather than silently dropped.
     *
     * @param completionListener listener to notify; must not be {@code null}
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher, com.solace.messaging.util.AsyncLifecycleControl
    public <DirectMessagePublisher> void startAsync(CompletionListener<DirectMessagePublisher> completionListener) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Start listener can't be null");
        startAsync().whenComplete((obj, th) -> {
            try {
                Throwable cause = null;
                if (th != null) {
                    // A future completed directly via completeExceptionally hands over the raw
                    // exception, which may have no cause; fall back to it in that case.
                    final Throwable nested = th.getCause();
                    cause = nested != null ? nested : th;
                }
                completionListener.onCompletion(obj, cause);
            } catch (Exception e) {
                if (logger.isErrorEnabled()) {
                    logger.error(this.instanceName + " failed to start", e);
                }
            }
        });
    }

    /**
     * Registers the listener that receives publish-failure events.
     *
     * @param publishFailureListener listener handed to the failure dispatcher
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher
    public void setPublishFailureListener(DirectMessagePublisher.PublishFailureListener publishFailureListener) {
        final PublishFailureNotificationDispatcher dispatcher = this.errorNotificationDispatcher;
        dispatcher.setPublishFailureListener(publishFailureListener);
    }

    /**
     * Gracefully terminates the publisher, waiting up to {@code j} milliseconds for the buffer to
     * drain. The termination work runs on the calling thread; the returned future is already
     * completed when this method returns.
     *
     * @param j grace period in milliseconds; validated against a minimum of 1
     * @return a future completed with {@code null} when everything was delivered, or completed
     *         exceptionally with an {@code IncompleteMessageDeliveryException} carrying the number
     *         of messages left in the buffer when the grace period expired
     */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public CompletableFuture<Void> terminateAsync(long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.smallerThanNumbersIllegal(1L, j, "Grace period < 1");
        ExtendedCompletableFuture extendedCompletableFuture = new ExtendedCompletableFuture();
        Task<DirectMessagePublisherImpl> markTerminating = publisher -> {
            // Raise the draining flag BEFORE leaving the started state: the dispatcher loop exits
            // as soon as it sees "not running and not draining", so the opposite order opens a
            // window in which it can quit while messages are still buffered.
            publisher.gracefulShutdownInProgress = true;
            publisher.stateHolder.set(STATE_TERMINATED);
        };
        AtomicInteger undelivered = new AtomicInteger(STATE_NOT_STARTED);
        Task<DirectMessagePublisherImpl> drainBuffer = publisher -> {
            try {
                boolean awaitEmpty = publisher.buffer.awaitEmpty(j, TimeUnit.MILLISECONDS);
                publisher.gracefulShutdownInProgress = false;
                if (!awaitEmpty) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(publisher.instanceName + " gracefully terminated before all buffered messages were processed, not sufficient grace period of " + j);
                    }
                    undelivered.set(publisher.buffer.size());
                    onGracefulTerminateEmptyRemainingBuffer(publisher.buffer);
                }
            } catch (PubSubPlusClientException.RequestInterruptedException e) {
                // Never leave the publisher stuck in the "terminating" state when the wait is cut short.
                publisher.gracefulShutdownInProgress = false;
                if (logger.isWarnEnabled()) {
                    logger.warn("Graceful termination of " + publisher.instanceName + " was interrupted");
                }
            }
            if (this.publisherBotExecutorService.isShutdown()) {
                return;
            }
            this.publisherBotExecutorService.shutdown();
        };
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is being terminated");
        }
        onTerminate(markTerminating, drainBuffer);
        int i = undelivered.get();
        if (i > 0) {
            this.serviceInternalView.getApiMetricsCollector().increaseMetric(Manageable.ApiMetrics.Metric.PUBLISH_MESSAGES_TERMINATION_DISCARDED, i);
            extendedCompletableFuture.completeExceptionally(new PubSubPlusClientException.IncompleteMessageDeliveryException(String.format("Delivery of %d messages could not be completed due to expiration of a grace period", Integer.valueOf(i)), i));
        } else {
            extendedCompletableFuture.complete(null);
        }
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " is terminated");
        }
        return extendedCompletableFuture;
    }

    /**
     * Gracefully terminates the publisher and reports the outcome to the given listener.
     *
     * <p>On failure the listener receives the cause of the completion throwable, or the throwable
     * itself when it carries no cause. {@code terminateAsync(long)} completes its future directly
     * with an {@code IncompleteMessageDeliveryException}, so without this fallback an incomplete
     * delivery would reach the listener as a {@code null} error. Exceptions thrown by the listener
     * are logged rather than silently dropped.
     *
     * @param completionListener listener to notify; must not be {@code null}
     * @param j                  grace period in milliseconds
     */
    @Override // com.solace.messaging.util.AsyncLifecycleControl
    public void terminateAsync(CompletionListener<Void> completionListener, long j) throws PubSubPlusClientException, IllegalStateException {
        Validation.nullIllegal(completionListener, "Termination listener can't be null");
        terminateAsync(j).whenComplete((r6, th) -> {
            try {
                Throwable cause = null;
                if (th != null) {
                    final Throwable nested = th.getCause();
                    cause = nested != null ? nested : th;
                }
                completionListener.onCompletion(null, cause);
            } catch (Exception e) {
                logger.warn(this.instanceName + "  encountered problem during termination.", e);
            }
        });
    }

    /**
     * @return {@code true} when the publisher is running, the buffer has remaining capacity, and
     *         no graceful termination is in progress
     */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public boolean isReady() {
        if (!isRunning()) {
            return false;
        }
        if (!(this.buffer.remainingCapacity() > 0)) {
            return false;
        }
        return !this.gracefulShutdownInProgress;
    }

    /**
     * Registers the listener that is told when the publisher becomes ready.
     *
     * @param publisherReadinessListener listener handed to the congestion dispatcher
     */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public void setPublisherReadinessListener(PublisherHealthCheck.PublisherReadinessListener publisherReadinessListener) {
        final PublisherCongestionNotificationDispatcher dispatcher = this.bufferCongestionNotificationDispatcher;
        dispatcher.setPublisherReadinessListener(publisherReadinessListener);
    }

    /** Delegates the readiness-notification request to the congestion dispatcher. */
    @Override // com.solace.messaging.publisher.PublisherHealthCheck
    public void notifyWhenReady() {
        final PublisherCongestionNotificationDispatcher dispatcher = this.bufferCongestionNotificationDispatcher;
        dispatcher.notifyWhenReady();
    }

    /**
     * Publishes a raw byte payload to the given topic.
     *
     * @param bArr  payload; must not be {@code null}
     * @param topic destination; must not be {@code null}
     * @throws IllegalStateException if the publisher is not started or is terminating/terminated
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher
    public void publish(byte[] bArr, Topic topic) throws PubSubPlusClientException {
        // State is checked first, then the arguments, then the message is built.
        validatePublisher();
        Validation.nullIllegal(bArr, "Message can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");
        final OutboundMessage built = this.messageBuilder.build(bArr);
        publishInternalMessage(built, topic, null);
    }

    /**
     * Publishes a string payload to the given topic.
     *
     * @param str   payload; must not be {@code null}
     * @param topic destination; must not be {@code null}
     * @throws IllegalStateException if the publisher is not started or is terminating/terminated
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher
    public void publish(String str, Topic topic) throws PubSubPlusClientException {
        // State is checked first, then the arguments, then the message is built.
        validatePublisher();
        Validation.nullIllegal(str, "Message can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");
        final OutboundMessage built = this.messageBuilder.build(str);
        publishInternalMessage(built, topic, null);
    }

    /**
     * Publishes a prepared message to the given topic without additional properties.
     *
     * @param outboundMessage message to publish
     * @param topic           destination
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher
    public void publish(OutboundMessage outboundMessage, Topic topic) throws PubSubPlusClientException {
        final Properties noExtraProperties = null;
        publish(outboundMessage, topic, noExtraProperties);
    }

    /**
     * Publishes a prepared message to the given topic, optionally with additional properties.
     *
     * @param outboundMessage message to publish; must not be {@code null}
     * @param topic           destination; must not be {@code null}
     * @param properties      additional properties; may be {@code null} or empty
     * @throws IllegalStateException if the publisher is not started or is terminating/terminated
     */
    @Override // com.solace.messaging.publisher.DirectMessagePublisher
    public void publish(OutboundMessage outboundMessage, Topic topic, Properties properties) throws PubSubPlusClientException, IllegalStateException, IllegalArgumentException, PubSubPlusClientException.PublisherOverflowException {
        // State is checked before the arguments.
        validatePublisher();
        Validation.nullIllegal(outboundMessage, "Message can't be null");
        Validation.nullIllegal(topic, "Message destination can't be null");
        this.publishExternalMessage(outboundMessage, topic, properties);
    }

    /**
     * Buffers a caller-supplied message. When extra properties are given, a deep copy carrying
     * those properties is buffered instead of the caller's instance.
     */
    @Internal
    void publishExternalMessage(OutboundMessage outboundMessage, Topic topic, Properties properties) {
        if (properties == null || properties.isEmpty()) {
            this.buffer.insert(PublisherBuffers.Publishable.of(outboundMessage, topic));
        } else {
            this.buffer.insert(PublisherBuffers.Publishable.of(OutboundMessageBuilder.deepCopy(outboundMessage, properties), topic));
        }
    }

    /**
     * Buffers a message built by this publisher. When extra properties are given they are injected
     * via {@code injectExtendedMessageProperties} before buffering.
     */
    @Internal
    void publishInternalMessage(OutboundMessage outboundMessage, Topic topic, Properties properties) {
        if (properties == null || properties.isEmpty()) {
            this.buffer.insert(PublisherBuffers.Publishable.of(outboundMessage, topic));
        } else {
            this.buffer.insert(PublisherBuffers.Publishable.of(OutboundMessageBuilder.OutboundMessageBuilderImpl.injectExtendedMessageProperties(outboundMessage, properties), topic));
        }
    }

    /**
     * Ensures the publisher may accept messages.
     *
     * @throws IllegalStateException if the publisher is terminating, terminated, or not started
     */
    void validatePublisher() {
        final boolean terminatingOrTerminated = isTerminating() || isTerminated();
        if (terminatingOrTerminated) {
            throw new IllegalStateException("Message publisher was terminated");
        }
        if (isRunning()) {
            return;
        }
        throw new IllegalStateException("Message publisher not started");
    }

    /**
     * Switches the builder into direct-publisher mode when it is the built-in implementation;
     * any other builder (including {@code null}) is returned untouched.
     *
     * @param outboundMessageBuilder builder to configure
     * @return the same builder instance
     */
    OutboundMessageBuilder configureMessageBuilder(OutboundMessageBuilder outboundMessageBuilder) {
        if (!(outboundMessageBuilder instanceof OutboundMessageBuilder.OutboundMessageBuilderImpl)) {
            return outboundMessageBuilder;
        }
        final OutboundMessageBuilder.OutboundMessageBuilderImpl builtIn = (OutboundMessageBuilder.OutboundMessageBuilderImpl) outboundMessageBuilder;
        builtIn.forDirectMessagePublisher();
        return outboundMessageBuilder;
    }

    /**
     * Start-up hook: registers this publisher's listeners on the client session, creates the
     * message producer and submits the publishing loop to {@code publisherBotExecutorService}.
     *
     * <p>The submitted loop repeatedly takes entries from {@code buffer} and sends them through
     * the producer. It ends when the publisher is no longer running and no graceful shutdown is
     * in progress, when the producer is {@code null}, or when the producer is found closed while
     * a graceful shutdown is being processed.
     *
     * @throws PubSubPlusClientException if {@link #createMessageProducer} fails
     */
    void onStart() throws PubSubPlusClientException {
        this.serviceInternalView.getClientSession().addServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().addClientSessionStateListener(this.closedSessionListener);
        this.producer = createMessageProducer(this.publishEventHandler);
        this.publisherBotExecutorService.submit(() -> {
            // Local snapshot of the producer; re-read from this.producer after failures below.
            XMLMessageProducer xMLMessageProducer = this.producer;
            // NOTE(review): a boolean initialised from STATE_NOT_STARTED looks like a decompiler
            // artifact — presumably this is `false`; confirm against the original source.
            // The flag is only reassigned in the !isRunning() branch, where it takes the value of
            // gracefulShutdownInProgress.
            boolean z = STATE_NOT_STARTED;
            while (true) {
                if (!isRunning()) {
                    boolean z2 = this.gracefulShutdownInProgress;
                    z = z2;
                    // Not running and no graceful shutdown in progress: stop the loop.
                    if (!z2) {
                        return;
                    }
                }
                if (xMLMessageProducer == null) {
                    // No producer available: terminate the publisher and report the failure.
                    onTerminate(null, this.postTerminationClearBufferSilentTask);
                    if (logger.isErrorEnabled()) {
                        logger.error(this.instanceName + " could not create an internal service to publish messages to a broker.");
                        return;
                    }
                    return;
                }
                if (!xMLMessageProducer.isClosed()) {
                    // NOTE(review): whether consume() blocks or returns null immediately on an
                    // empty buffer is not visible here — verify against PublisherBuffers.
                    PublisherBuffers.Publishable<Topic> consume = this.buffer.consume();
                    if (consume != null) {
                        try {
                            // The producer may have been closed since the check above; in that
                            // case the consumed message is counted as discarded instead of sent.
                            if (xMLMessageProducer.isClosed()) {
                                this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                                if (this.gracefulShutdownInProgress) {
                                    return;
                                } else {
                                    // Pick up whatever producer the field currently references.
                                    xMLMessageProducer = this.producer;
                                }
                            } else {
                                xMLMessageProducer.send(OutboundMessageBuilder.OutboundMessageBuilderImpl.OutboundMessageImpl.toByteMessage(consume.getMessage()), consume.getDestination());
                            }
                        } catch (Exception e) {
                            // Any failure while sending counts the message as discarded.
                            this.serviceInternalView.getClientSession().getSessionStats().incStat(StatType.MESSAGES_DISCARDED_INTERNAL);
                            // A closed-facility failure during graceful shutdown ends the loop
                            // without logging or notifying the error dispatcher.
                            if ((e instanceof ClosedFacilityException) && this.gracefulShutdownInProgress) {
                                return;
                            }
                            if (logger.isErrorEnabled()) {
                                logger.error(this.instanceName + " could not publish message to a broker. Message:" + consume.getMessage() + ", destination: " + consume.getDestination(), e);
                            }
                            xMLMessageProducer = this.producer;
                            this.errorNotificationDispatcher.onException(consume, e);
                        }
                    } else {
                        // Nothing was consumed; go around again.
                        continue;
                    }
                } else {
                    // Producer is closed: stop if the flag was set from a graceful shutdown,
                    // otherwise wait 500 ms and re-read the producer field.
                    if (z) {
                        return;
                    }
                    try {
                        Thread.sleep(500L);
                    } catch (InterruptedException e2) {
                        // NOTE(review): the interrupt is swallowed and the interrupt flag is not
                        // restored; the loop simply continues. Confirm this is intentional.
                    }
                    xMLMessageProducer = this.producer;
                }
            }
        });
    }

    /**
     * Runs the optional termination tasks and then moves this publisher to the terminated state.
     *
     * <p>The pre-termination task (if any) runs first. The post-termination task (if any) always
     * runs afterwards, including when no pre-termination task was supplied and when the
     * pre-termination task throws. Once both have completed normally, the state is set to
     * {@code STATE_TERMINATED}, the session listeners registered by this publisher are removed,
     * the buffer congestion monitor is cleared and the congestion notification dispatcher is
     * closed.
     *
     * @param task  task to run before termination bookkeeping; may be {@code null}
     * @param task2 task to run after {@code task}; may be {@code null}
     */
    void onTerminate(Task<DirectMessagePublisherImpl> task, Task<DirectMessagePublisherImpl> task2) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            // The post task must not depend on a pre task being present: callers may pass
            // null as the first argument together with a non-null post-termination task.
            if (task2 != null) {
                task2.run(this);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        this.buffer.setBufferCongestionMonitor(null, -1);
        this.bufferCongestionNotificationDispatcher.close();
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Runs the optional termination tasks and then moves this publisher to the terminated state.
     *
     * <p>The pre-termination task (if any) runs first. The post-termination task (if any) always
     * runs afterwards with the supplied context, including when no pre-termination task was
     * supplied and when the pre-termination task throws. Once both have completed normally, the
     * state is set to {@code STATE_TERMINATED}, the session listeners registered by this publisher
     * are removed, the buffer congestion monitor is cleared and the congestion notification
     * dispatcher is closed.
     *
     * @param task   task to run before termination bookkeeping; may be {@code null}
     * @param biTask task to run after {@code task}; may be {@code null}
     * @param c      context value handed to {@code biTask}
     * @param <C>    type of the context value
     */
    <C> void onTerminate(Task<DirectMessagePublisherImpl> task, BiTask<DirectMessagePublisherImpl, C> biTask, C c) {
        try {
            if (task != null) {
                task.run(this);
            }
        } finally {
            // The post task must not depend on a pre task being present: a null first
            // argument previously caused the post task to be skipped silently.
            if (biTask != null) {
                biTask.run(this, c);
            }
        }
        this.stateHolder.set(STATE_TERMINATED);
        this.serviceInternalView.getClientSession().removeServiceInterruptionListener(this.serviceInterruptionListener);
        this.serviceInternalView.getClientSession().removeClientSessionStateListener(this.closedSessionListener);
        this.buffer.setBufferCongestionMonitor(null, -1);
        this.bufferCongestionNotificationDispatcher.close();
        if (logger.isDebugEnabled()) {
            logger.debug(this.instanceName + " publisher is shutdown");
        }
    }

    /**
     * Obtains a message producer from the client session, bound to the given event handler.
     *
     * @param jCSMPStreamingPublishEventHandler handler passed to the session when requesting the
     *                                          producer
     * @return the producer returned by the client session
     * @throws PubSubPlusClientException if obtaining the producer fails for any reason; the
     *                                   original exception is attached as the cause
     */
    XMLMessageProducer createMessageProducer(JCSMPStreamingPublishEventHandler jCSMPStreamingPublishEventHandler) throws PubSubPlusClientException {
        final XMLMessageProducer createdProducer;
        try {
            createdProducer = this.serviceInternalView.getClientSession().getMessageProducer(jCSMPStreamingPublishEventHandler);
        } catch (Exception cause) {
            throw new PubSubPlusClientException("Failed to create message publisher", cause);
        }
        return createdProducer;
    }

    /**
     * Empties the given publisher buffer by calling {@code clear()} on it.
     *
     * <p>NOTE(review): judging by the name, this is the hook used to drop whatever is still
     * buffered at the end of a graceful termination — confirm against the callers.
     *
     * @param publisherBuffer buffer whose remaining content is discarded
     */
    void onGracefulTerminateEmptyRemainingBuffer(PublisherBuffers.PublisherBuffer<Topic> publisherBuffer) {
        publisherBuffer.clear();
    }
}
