package com.rabbitmq.stream.impl;

import com.rabbitmq.stream.Codec;
import com.rabbitmq.stream.ConfirmationHandler;
import com.rabbitmq.stream.ConfirmationStatus;
import com.rabbitmq.stream.Message;
import com.rabbitmq.stream.MessageBuilder;
import com.rabbitmq.stream.Producer;
import com.rabbitmq.stream.StreamException;
import com.rabbitmq.stream.compression.Compression;
import com.rabbitmq.stream.impl.Client;
import com.rabbitmq.stream.impl.MessageAccumulator;
import io.netty.buffer.ByteBuf;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.ToLongFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer.class */
public class StreamProducer implements Producer {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamProducer.class);
    /** Handler that ignores every confirmation; send() substitutes it when the caller passes null. */
    private static final ConfirmationHandler NO_OP_CONFIRMATION_HANDLER = confirmationStatus -> {
    };
    /** Buffers outbound messages until publishBatch() drains them. */
    private final MessageAccumulator accumulator;
    /** Entities written to the client but not yet confirmed or failed, keyed by publishing ID. */
    private final ConcurrentMap<Long, MessageAccumulator.AccumulatedEntity> unconfirmedMessages;
    /** Upper bound on the number of entities handed to the client in one publish call. */
    private final int batchSize;
    /** Producer name; null or empty means the producer is unnamed. */
    private final String name;
    /** Name of the stream this producer publishes to. */
    private final String stream;
    /** Write callback passed to the client; it records each entity in unconfirmedMessages before delegating. */
    private final Client.OutboundEntityWriteCallback writeCallback;
    /** One permit is acquired per send(); permits are released by confirm(), error() and stream deletion. */
    private final Semaphore unconfirmedMessagesSemaphore;
    /** Callback returned by the environment at registration; run by closeFromEnvironment(). */
    private final Runnable closingCallback;
    private final StreamEnvironment environment;
    /** Initial (and, after running(), restored) number of semaphore permits. */
    private final int maxUnconfirmedMessages;
    /** Codec used to create message builders. */
    private final Codec codec;
    /** Maximum time send() waits for a permit when it does not block indefinitely. */
    private final long enqueueTimeoutMs;
    /** True when the enqueue timeout is zero: send() then waits for a permit without a time limit. */
    private final boolean blockOnMaxUnconfirmed;
    /** Current client; replaced through setClient(). */
    private volatile Client client;
    /** Current publisher ID; replaced through setPublisherId(). */
    private volatile byte publisherId;
    /** Lifecycle state; only RUNNING allows sending. */
    private volatile Status status;
    /** Handle on the latest scheduled confirm-timeout check; stays null when no confirm timeout is configured. */
    private volatile ScheduledFuture<?> confirmTimeoutFuture;
    /** Guards close() and closeAfterStreamDeletion() so that their cleanup runs at most once. */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Extracts the publishing ID from an accumulated entity (accessor spelled "publishindId" as used here). */
    private final ToLongFunction<Object> publishSequenceFunction = obj -> {
        return ((MessageAccumulator.AccumulatedEntity) obj).publishindId();
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer$ConfirmationCallback.class */
    /**
     * Callback attached to an accumulated entity and invoked when the entity is
     * confirmed or failed.
     */
    public interface ConfirmationCallback {
        /**
         * Notifies the outcome for the entity.
         *
         * @param z {@code true} when invoked from {@code confirm}, {@code false} when invoked
         *     from {@code error} or on stream deletion
         * @param s the code accompanying the outcome
         * @return a count that the producer passes to {@code Semaphore.release(int)}, i.e. the
         *     number of permits to give back
         */
        int handle(boolean z, short s);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/rabbitmq/stream/impl/StreamProducer$Status.class */
    /** Lifecycle state of the producer. */
    public enum Status {
        /** Set at the end of construction and by running(); the only state in which canSend() is true. */
        RUNNING,
        /** Set by unavailable(); sends are failed immediately while in this state. */
        NOT_AVAILABLE,
        /** Set by closeFromEnvironment() and closeAfterStreamDeletion(); periodic tasks stop rescheduling. */
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a producer for a stream and registers it with the environment.
     *
     * <p>Registration is performed before anything dereferences {@code this.client}; the
     * constructor then uses the client (publishing sequence query, max frame size), so
     * registration is presumably what assigns it through {@code setClient} — confirm against
     * {@code StreamEnvironment.registerProducer}.
     *
     * @param str producer name; may be null or empty for an unnamed producer
     * @param str2 name of the target stream
     * @param i when {@code <= 1} a {@code SimpleMessageAccumulator} is used, otherwise a
     *     {@code SubEntryMessageAccumulator} is created with this value (presumably the
     *     sub-entry size)
     * @param i2 batch size: maximum number of entities per publish call
     * @param compression compression for sub-entry batches; {@code Compression.NONE} disables it
     * @param duration delay between periodic batch publications; zero or negative disables the
     *     periodic task
     * @param i3 maximum number of unconfirmed messages (semaphore permits)
     * @param duration2 confirm timeout; zero disables the timeout check task
     * @param duration3 enqueue timeout; zero makes {@code send} wait without a time limit
     * @param streamEnvironment owning environment (codec, clock, scheduler, allocator)
     */
    public StreamProducer(String str, String str2, int i, int i2, Compression compression, Duration duration, int i3, Duration duration2, Duration duration3, StreamEnvironment streamEnvironment) {
        this.environment = streamEnvironment;
        this.name = str;
        this.stream = str2;
        this.enqueueTimeoutMs = duration3.toMillis();
        this.blockOnMaxUnconfirmed = duration3.isZero();
        // Must stay ahead of every use of this.client below.
        this.closingCallback = streamEnvironment.registerProducer(this, str, this.stream);

        // Messages carrying their own publishing ID keep it; the others draw from the sequence.
        AtomicLong publishingSequence = new AtomicLong(computeFirstValueOfPublishingSequence());
        ToLongFunction<Message> publishingIdFunction =
                message -> message.hasPublishingId() ? message.getPublishingId() : publishingSequence.getAndIncrement();

        final Client.OutboundEntityWriteCallback delegateWriteCallback;
        if (i <= 1) {
            this.accumulator = new SimpleMessageAccumulator(i2, streamEnvironment.codec(), this.client.maxFrameSize(), publishingIdFunction, this.environment.clock());
            delegateWriteCallback = Client.OUTBOUND_MESSAGE_WRITE_CALLBACK;
        } else {
            this.accumulator = new SubEntryMessageAccumulator(i, i2, compression == Compression.NONE ? null : streamEnvironment.compressionCodecFactory().get(compression), streamEnvironment.codec(), this.environment.byteBufAllocator(), this.client.maxFrameSize(), publishingIdFunction, this.environment.clock());
            delegateWriteCallback = Client.OUTBOUND_MESSAGE_BATCH_WRITE_CALLBACK;
        }

        this.maxUnconfirmedMessages = i3;
        this.unconfirmedMessagesSemaphore = new Semaphore(i3, true);
        this.unconfirmedMessages = new ConcurrentHashMap<>(this.maxUnconfirmedMessages, 0.75f, 2);

        // Track every entity as unconfirmed at the moment it is written, then delegate the encoding.
        this.writeCallback = new Client.OutboundEntityWriteCallback() {
            @Override
            public int write(ByteBuf byteBuf, Object obj, long j) {
                MessageAccumulator.AccumulatedEntity accumulatedEntity = (MessageAccumulator.AccumulatedEntity) obj;
                StreamProducer.this.unconfirmedMessages.put(Long.valueOf(j), accumulatedEntity);
                return delegateWriteCallback.write(byteBuf, accumulatedEntity.encodedEntity(), j);
            }

            @Override
            public int fragmentLength(Object obj) {
                return delegateWriteCallback.fragmentLength(((MessageAccumulator.AccumulatedEntity) obj).encodedEntity());
            }
        };

        if (!duration.isNegative() && !duration.isZero()) {
            // Self-rescheduling task: the reference lets the lambda schedule itself again.
            AtomicReference<Runnable> batchTaskReference = new AtomicReference<>();
            Runnable batchTask = () -> {
                if (canSend()) {
                    synchronized (this) {
                        publishBatch(true);
                    }
                }
                if (this.status != Status.CLOSED) {
                    streamEnvironment.scheduledExecutorService().schedule(batchTaskReference.get(), duration.toMillis(), TimeUnit.MILLISECONDS);
                }
            };
            batchTaskReference.set(batchTask);
            streamEnvironment.scheduledExecutorService().schedule(batchTask, duration.toMillis(), TimeUnit.MILLISECONDS);
        }

        this.batchSize = i2;
        this.codec = streamEnvironment.codec();

        if (!duration2.isZero()) {
            AtomicReference<Runnable> confirmTaskReference = new AtomicReference<>();
            Runnable timeoutCheck = confirmTimeoutTask(duration2);
            confirmTaskReference.set(() -> {
                try {
                    timeoutCheck.run();
                } catch (Exception e) {
                    // Log the exception itself: its cause is frequently null and hides the real failure.
                    LOGGER.info("Error while executing confirm timeout check task: {}", e.getMessage(), e);
                }
                if (this.status != Status.CLOSED) {
                    this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(confirmTaskReference.get(), duration2.toMillis(), TimeUnit.MILLISECONDS);
                }
            });
            this.confirmTimeoutFuture = this.environment.scheduledExecutorService().schedule(confirmTaskReference.get(), duration2.toMillis(), TimeUnit.MILLISECONDS);
        }
        this.status = Status.RUNNING;
    }

    /**
     * Builds the task that fails unconfirmed entities older than the given timeout.
     *
     * <p>The task works on a snapshot of the unconfirmed map sorted by publishing ID and stops
     * at the first entity that is recent enough. It returns early, without the final log
     * statement, if the running thread has been interrupted.
     *
     * @param duration the confirm timeout
     * @return a task performing one timeout check
     */
    private Runnable confirmTimeoutTask(Duration duration) {
        return () -> {
            long limit = this.environment.clock().time() - duration.toNanos();
            TreeMap<Long, MessageAccumulator.AccumulatedEntity> snapshot = new TreeMap<>(this.unconfirmedMessages);
            LOGGER.debug("Starting confirm timeout check task");
            int failedCount = 0;
            Iterator<Map.Entry<Long, MessageAccumulator.AccumulatedEntity>> candidates = snapshot.entrySet().iterator();
            while (candidates.hasNext()) {
                Map.Entry<Long, MessageAccumulator.AccumulatedEntity> candidate = candidates.next();
                if (candidate.getValue().time() >= limit) {
                    // Remaining entries are not examined once a recent-enough one is found.
                    break;
                }
                if (Thread.currentThread().isInterrupted()) {
                    return;
                }
                // 10004: presumably the "publish confirm timeout" code — confirm against the project constants.
                error(candidate.getKey().longValue(), (short) 10004);
                failedCount++;
            }
            LOGGER.debug("Failed {} message(s) which had timed out (limit {})", Integer.valueOf(failedCount), Long.valueOf(limit));
        };
    }

    /**
     * Determines the first publishing ID this producer should generate.
     *
     * @return 0 for an unnamed producer or when the client reports a sequence of 0; otherwise
     *     the reported sequence plus one
     */
    private long computeFirstValueOfPublishingSequence() {
        boolean unnamed = this.name == null || this.name.isEmpty();
        if (unnamed) {
            return 0L;
        }
        long lastSequence = this.client.queryPublisherSequence(this.name, this.stream);
        return lastSequence == 0 ? 0L : lastSequence + 1;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a positive confirmation for a publishing ID.
     *
     * <p>The entity is removed from the unconfirmed map and its callback decides how many
     * permits to give back; when no entity is tracked for the ID a single permit is released.
     *
     * @param j the confirmed publishing ID
     */
    public void confirm(long j) {
        MessageAccumulator.AccumulatedEntity confirmed = this.unconfirmedMessages.remove(Long.valueOf(j));
        int permits = 1;
        if (confirmed != null) {
            permits = confirmed.confirmationCallback().handle(true, (short) 1);
        }
        this.unconfirmedMessagesSemaphore.release(permits);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Handles a publishing failure for a publishing ID.
     *
     * <p>Mirrors {@code confirm}: the entity is removed from the unconfirmed map, its callback
     * is told about the failure and returns the number of permits to release; an untracked ID
     * releases a single permit.
     *
     * @param j the failed publishing ID
     * @param s the failure code forwarded to the callback
     */
    public void error(long j, short s) {
        MessageAccumulator.AccumulatedEntity failed = this.unconfirmedMessages.remove(Long.valueOf(j));
        int permits = failed == null ? 1 : failed.confirmationCallback().handle(false, s);
        this.unconfirmedMessagesSemaphore.release(permits);
    }

    /**
     * Creates a message builder from the producer's codec.
     *
     * @return a new builder obtained from the codec
     */
    @Override
    public MessageBuilder messageBuilder() {
        MessageBuilder builder = this.codec.messageBuilder();
        return builder;
    }

    /**
     * Queries the last publishing ID recorded for this named producer on its stream.
     *
     * @return the publisher sequence reported by the client
     * @throws IllegalStateException if the producer has no name, is not in the running state,
     *     or the query fails (the original failure is attached as the cause)
     */
    @Override
    public long getLastPublishingId() {
        if (this.name == null || this.name.isEmpty()) {
            throw new IllegalStateException("The producer has no name");
        }
        if (!canSend()) {
            throw new IllegalStateException("The producer has no connection");
        }
        try {
            return this.client.queryPublisherSequence(this.name, this.stream);
        } catch (Exception e) {
            // Keep the underlying failure as the cause so callers can diagnose it.
            throw new IllegalStateException("Error while trying to query last publishing ID for producer " + this.name + " on stream " + this.stream, e);
        }
    }

    /**
     * Sends a message, reporting the outcome to the given handler.
     *
     * <p>When the producer cannot send, the message is failed immediately. Otherwise a permit
     * is acquired first: without a time limit when the enqueue timeout is zero, or within the
     * enqueue timeout, in which case a timeout fails the message with code 10001 (presumably
     * "enqueueing failed" — confirm against the project constants).
     *
     * @param message the message to publish
     * @param confirmationHandler outcome handler; {@code null} is replaced by a no-op handler
     * @throws StreamException if the thread is interrupted while waiting for a permit; the
     *     interrupt flag is restored first
     */
    @Override
    public void send(Message message, ConfirmationHandler confirmationHandler) {
        ConfirmationHandler handler = confirmationHandler == null ? NO_OP_CONFIRMATION_HANDLER : confirmationHandler;
        try {
            if (!canSend()) {
                failPublishing(message, handler);
                return;
            }
            if (this.blockOnMaxUnconfirmed) {
                this.unconfirmedMessagesSemaphore.acquire();
                doSend(message, handler);
                return;
            }
            boolean permitAcquired = this.unconfirmedMessagesSemaphore.tryAcquire(this.enqueueTimeoutMs, TimeUnit.MILLISECONDS);
            if (permitAcquired) {
                doSend(message, handler);
            } else {
                handler.handle(new ConfirmationStatus(message, false, (short) 10001));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new StreamException("Interrupted while waiting to accumulate outbound message", e);
        }
    }

    /**
     * Accumulates a message once a permit has been obtained.
     *
     * <p>The state is checked again because it may have changed while waiting for the permit.
     * A batch is published right away when the accumulator's {@code add} returns {@code true}.
     *
     * @param message the message to accumulate
     * @param confirmationHandler handler notified of the outcome
     */
    private void doSend(Message message, ConfirmationHandler confirmationHandler) {
        if (!canSend()) {
            failPublishing(message, confirmationHandler);
            return;
        }
        boolean publishNow = this.accumulator.add(message, confirmationHandler);
        if (!publishNow) {
            return;
        }
        synchronized (this) {
            publishBatch(true);
        }
    }

    /**
     * Reports a message as not published because the producer cannot send.
     *
     * <p>A closed producer yields code 10003; every other state yields code 10002 (presumably
     * "producer closed" and "producer not available" — confirm against the project constants).
     *
     * @param message the message that could not be sent
     * @param confirmationHandler handler receiving the negative confirmation
     */
    private void failPublishing(Message message, ConfirmationHandler confirmationHandler) {
        short code = this.status == Status.CLOSED ? (short) 10003 : (short) 10002;
        confirmationHandler.handle(new ConfirmationStatus(message, false, code));
    }

    /**
     * Tells whether the producer is in the running state.
     *
     * @return {@code true} only when the status is {@code RUNNING}
     */
    private boolean canSend() {
        return Status.RUNNING == this.status;
    }

    /**
     * Closes the producer; only the first call has any effect.
     *
     * <p>When the producer is running and has a client, the publisher is deleted on the client
     * and a failure to do so is only logged. The producer is then removed from the
     * environment and the shared closing steps run.
     */
    @Override
    public void close() {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        if (this.status == Status.RUNNING && this.client != null) {
            LOGGER.debug("Deleting producer {}", this.publisherId);
            Client.Response response = this.client.deletePublisher(this.publisherId);
            if (!response.isOk()) {
                LOGGER.info("Could not delete publisher {} on producer closing: {}", this.publisherId, Utils.formatConstant(response.getResponseCode()));
            }
        } else {
            LOGGER.debug("No need to delete producer {}, it is currently unavailable", this.publisherId);
        }
        this.environment.removeProducer(this);
        closeFromEnvironment();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Performs the closing steps shared by {@code close()} and callers that close the
     * producer directly: runs the closing callback, cancels the confirm-timeout task, and
     * marks the producer closed.
     */
    public void closeFromEnvironment() {
        this.closingCallback.run();
        cancelConfirmTimeoutTask();
        this.closed.set(true);
        this.status = Status.CLOSED;
        LOGGER.debug("Closed publisher {} successfully", this.publisherId);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Closes the producer after its stream has been deleted; only the first close has any
     * effect.
     *
     * <p>Every entity still awaiting confirmation is failed with the given code, its permits
     * are released, and it is dropped from the unconfirmed map.
     *
     * @param s the code passed to each pending entity's confirmation callback
     */
    public void closeAfterStreamDeletion(short s) {
        if (!this.closed.compareAndSet(false, true)) {
            return;
        }
        for (Iterator<Map.Entry<Long, MessageAccumulator.AccumulatedEntity>> pending = this.unconfirmedMessages.entrySet().iterator(); pending.hasNext(); ) {
            MessageAccumulator.AccumulatedEntity entity = pending.next().getValue();
            int permits = entity.confirmationCallback().handle(false, s);
            this.unconfirmedMessagesSemaphore.release(permits);
            pending.remove();
        }
        cancelConfirmTimeoutTask();
        this.environment.removeProducer(this);
        this.status = Status.CLOSED;
    }

    /** Cancels the scheduled confirm-timeout check, interrupting it if running; no-op when none was scheduled. */
    private void cancelConfirmTimeoutTask() {
        ScheduledFuture<?> scheduled = this.confirmTimeoutFuture;
        if (scheduled == null) {
            return;
        }
        scheduled.cancel(true);
    }

    /**
     * Drains up to {@code batchSize} entities from the accumulator and publishes them.
     *
     * <p>Callers in this class hold the producer's monitor when invoking this method.
     *
     * @param z when {@code true}, nothing is published unless the producer can send; when
     *     {@code false} the state check is skipped
     */
    private void publishBatch(boolean z) {
        if (z && !canSend()) {
            return;
        }
        if (this.accumulator.isEmpty()) {
            return;
        }
        ArrayList<Object> batch = new ArrayList<>(this.batchSize);
        while (batch.size() != this.batchSize) {
            MessageAccumulator.AccumulatedEntity next = this.accumulator.get();
            if (next == null) {
                // Accumulator drained before the batch filled up.
                break;
            }
            batch.add(next);
        }
        this.client.publishInternal(this.publisherId, batch, this.writeCallback, this.publishSequenceFunction);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Tells whether the producer has not been closed yet.
     *
     * @return {@code false} once the closed flag has been set
     */
    public boolean isOpen() {
        boolean alreadyClosed = this.closed.get();
        return !alreadyClosed;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Marks the producer as not available; sends are failed immediately until running() is called. */
    public void unavailable() {
        this.status = Status.NOT_AVAILABLE;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Brings the producer back to the running state.
     *
     * <p>Under the producer's monitor: unconfirmed entities are re-published in publishing-ID
     * order, in batches of at most {@code batchSize}; accumulated messages are published
     * without the state check; and the semaphore is reset to its full permit count before
     * permits for the entities that are unconfirmed at that point are taken again. The status
     * switches to {@code RUNNING} after the monitor is released.
     */
    public void running() {
        synchronized (this) {
            LOGGER.debug("Re-publishing {} unconfirmed message(s) and {} accumulated message(s)", this.unconfirmedMessages.size(), this.accumulator.size());
            if (!this.unconfirmedMessages.isEmpty()) {
                // Sorted copy so entities go out in publishing-ID order; the write callback re-registers them.
                TreeMap<Long, MessageAccumulator.AccumulatedEntity> ordered = new TreeMap<>(this.unconfirmedMessages);
                this.unconfirmedMessages.clear();
                Iterator<MessageAccumulator.AccumulatedEntity> pending = ordered.values().iterator();
                while (pending.hasNext()) {
                    ArrayList<Object> batch = new ArrayList<>(this.batchSize);
                    while (batch.size() != this.batchSize && pending.hasNext()) {
                        batch.add(pending.next());
                    }
                    this.client.publishInternal(this.publisherId, batch, this.writeCallback, this.publishSequenceFunction);
                }
            }
            publishBatch(false);
            if (this.unconfirmedMessagesSemaphore.availablePermits() != this.maxUnconfirmedMessages) {
                // Top the semaphore back up, then take one permit per entity currently unconfirmed.
                this.unconfirmedMessagesSemaphore.release(this.maxUnconfirmedMessages - this.unconfirmedMessagesSemaphore.availablePermits());
                boolean reacquired = this.unconfirmedMessagesSemaphore.tryAcquire(this.unconfirmedMessages.size());
                if (!reacquired) {
                    LOGGER.debug("Could not acquire {} permit(s) for message republishing", this.unconfirmedMessages.size());
                }
            }
        }
        this.status = Status.RUNNING;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the client used for publishing and queries.
     *
     * <p>Synchronized on the producer, the same monitor held around batch publishing.
     *
     * @param client the client to use from now on
     */
    public synchronized void setClient(Client client) {
        this.client = client;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Replaces the publisher ID passed to the client on publish and delete calls.
     *
     * @param b the new publisher ID
     */
    public synchronized void setPublisherId(byte b) {
        this.publisherId = b;
    }

    /**
     * Returns the current lifecycle state.
     *
     * @return the value of the volatile status field at the time of the call
     */
    Status status() {
        return this.status;
    }
}
