package fun.nibaba.lazyfish.rabbit.delay;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:fun/nibaba/lazyfish/rabbit/delay/AbstractLazyRabbitDelay.class */
/**
 * Base {@link ILazyRabbitDelay} implementation that delays messages with a per-message
 * expiration combined with queue dead-lettering.
 *
 * <p>All broker object names are derived from the concrete subclass's simple class name:
 * <ul>
 *   <li>a topic exchange named {@code <SimpleName>};</li>
 *   <li>a durable queue {@code <SimpleName>_DeadLetterQueue}, bound with
 *       {@code <SimpleName>_DeadLetterRoutingKey}. {@link #sendMessage} publishes here with an
 *       expiration; the queue carries the {@link ILazyRabbitDelay#X_DEAD_LETTER_EXCHANGE_KEY} and
 *       {@link ILazyRabbitDelay#X_DEAD_LETTER_ROUTING_KEY} arguments pointing at the same exchange
 *       and the delay routing key;</li>
 *   <li>a durable queue {@code <SimpleName>_DelayQueue}, bound with
 *       {@code <SimpleName>_DelayRoutingKey}, which is the queue {@link #declare} consumes
 *       from.</li>
 * </ul>
 *
 * <p>Despite the names, the "dead letter" queue is where messages wait, and the "delay" queue is
 * where they are consumed.
 *
 * <p>The lazily created {@link Exchange} and {@link Queue} instances are cached in plain fields
 * without synchronization.
 *
 * @param <DelayMessage> serializable payload type wrapped by {@link LazyDelayMessage}
 */
public abstract class AbstractLazyRabbitDelay<DelayMessage extends Serializable> implements ILazyRabbitDelay<LazyDelayMessage<DelayMessage>> {

    /** Milliseconds per second; a {@code long} so the expiration computation is done in 64 bits. */
    private static final long MILLIS_PER_SECOND = 1000L;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /** Lazily built by {@link #getExchange()}; subclasses may pre-assign it. */
    protected Exchange exchange;

    /** Lazily built by {@link #getDeadLetterQueue()}; subclasses may pre-assign it. */
    protected Queue deadLetterQueue;

    /** Lazily built by {@link #getDelayQueue()}; subclasses may pre-assign it. */
    protected Queue delayQueue;

    /**
     * Returns the topic exchange used for both publishing and dead-lettering, creating and
     * caching it on first use.
     *
     * @return the exchange, named after the concrete class's simple name
     */
    @Override
    public Exchange getExchange() {
        if (this.exchange == null) {
            this.exchange = ExchangeBuilder.topicExchange(getClass().getSimpleName()).build();
        }
        return this.exchange;
    }

    /**
     * @return the routing key that binds the dead-letter (waiting) queue to the exchange
     */
    @Override
    public String getDeadLetterRoutingKey() {
        return getClass().getSimpleName() + "_DeadLetterRoutingKey";
    }

    /**
     * Returns the durable queue in which published messages wait, creating and caching it on
     * first use. Its arguments name this class's exchange and delay routing key as the
     * dead-letter target.
     *
     * @return the dead-letter (waiting) queue
     */
    @Override
    public Queue getDeadLetterQueue() {
        if (this.deadLetterQueue == null) {
            this.deadLetterQueue = QueueBuilder.durable(getClass().getSimpleName() + "_DeadLetterQueue")
                    .withArgument(ILazyRabbitDelay.X_DEAD_LETTER_EXCHANGE_KEY, getExchange().getName())
                    .withArgument(ILazyRabbitDelay.X_DEAD_LETTER_ROUTING_KEY, getDelayRoutingKey())
                    .build();
        }
        return this.deadLetterQueue;
    }

    /**
     * @return the routing key that binds the consumed delay queue to the exchange
     */
    @Override
    public String getDelayRoutingKey() {
        return getClass().getSimpleName() + "_DelayRoutingKey";
    }

    /**
     * Returns the durable queue that is consumed by {@link #declare}, creating and caching it on
     * first use.
     *
     * @return the delay (consumed) queue
     */
    @Override
    public Queue getDelayQueue() {
        if (this.delayQueue == null) {
            this.delayQueue = QueueBuilder.durable(getClass().getSimpleName() + "_DelayQueue").build();
        }
        return this.delayQueue;
    }

    /**
     * Publishes {@code lazyDelayMessage} to the exchange with the dead-letter routing key and a
     * per-message expiration of {@code getDelaySeconds()} seconds, using the message id as the
     * publisher correlation id.
     *
     * @param lazyDelayMessage the message to delay; must not be {@code null}
     */
    @Override
    public void sendMessage(LazyDelayMessage<DelayMessage> lazyDelayMessage) {
        this.rabbitTemplate.convertAndSend(getExchange().getName(), getDeadLetterRoutingKey(), lazyDelayMessage, message -> {
            // The expiration property is a string of milliseconds. Multiplying by a long keeps
            // large second counts from wrapping around if getDelaySeconds() is an int.
            message.getMessageProperties().setExpiration(String.valueOf(getDelaySeconds() * MILLIS_PER_SECOND));
            return message;
        }, new CorrelationData(lazyDelayMessage.getMessageId()));
    }

    /**
     * Creates a consumer that deserializes each delivery body and hands the result to
     * {@code listenerMessage}.
     *
     * @param channel the channel the consumer is attached to
     * @return a new consumer bound to {@code channel}
     */
    @Override
    public Consumer getConsumer(Channel channel) {
        return new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                AbstractLazyRabbitDelay.this.listenerMessage(AbstractLazyRabbitDelay.this.deserialize(body));
            }
        };
    }

    /**
     * Declares the exchange and both queues on {@code channel}, binds each queue with its routing
     * key, and starts consuming the delay queue with the consumer from {@link #getConsumer}.
     *
     * @param channel the channel to declare on and consume from
     * @throws IOException if any broker operation fails
     */
    @Override
    public void declare(Channel channel) throws IOException {
        Exchange topic = getExchange();
        Queue waitingQueue = getDeadLetterQueue();
        Queue consumedQueue = getDelayQueue();

        // NOTE(review): only name and type are passed here, so the durability flag carried by the
        // Exchange object is not forwarded to the broker — confirm this is intended before
        // changing it, since redeclaring an existing exchange with different flags is rejected.
        channel.exchangeDeclare(topic.getName(), topic.getType());
        channel.queueDeclare(waitingQueue.getName(), waitingQueue.isDurable(), waitingQueue.isExclusive(), waitingQueue.isAutoDelete(), waitingQueue.getArguments());
        channel.queueDeclare(consumedQueue.getName(), consumedQueue.isDurable(), consumedQueue.isExclusive(), consumedQueue.isAutoDelete(), consumedQueue.getArguments());

        channel.queueBind(waitingQueue.getName(), topic.getName(), getDeadLetterRoutingKey());
        channel.queueBind(consumedQueue.getName(), topic.getName(), getDelayRoutingKey());

        // Second argument is autoAck = true: deliveries are acknowledged automatically rather
        // than by the consumer after processing.
        channel.basicConsume(consumedQueue.getName(), true, getConsumer(channel));
    }

    /**
     * Reads a {@link LazyDelayMessage} from a Java-serialized byte array.
     *
     * <p>NOTE(security): this uses Java-native deserialization on bytes received from the broker.
     * That is only safe if every publisher to the queue is trusted; otherwise consider an
     * {@code ObjectInputFilter} or a non-native format. Left as-is to stay compatible with
     * existing producers.
     *
     * @param bArr the raw delivery body; presumably produced by the template's message converter
     *             via Java serialization — verify against the configured converter
     * @return the deserialized message
     * @throws IllegalStateException if the bytes cannot be read or reference an unknown class
     * @throws ClassCastException if the payload is not a {@link LazyDelayMessage}
     */
    protected LazyDelayMessage<DelayMessage> deserialize(byte[] bArr) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bArr))) {
            // The generic parameter is erased at runtime; only the LazyDelayMessage part of the
            // cast is actually checked.
            @SuppressWarnings("unchecked")
            LazyDelayMessage<DelayMessage> message = (LazyDelayMessage<DelayMessage>) in.readObject();
            return message;
        } catch (IOException | ClassNotFoundException e) {
            // Wrapped as unchecked so the method signature stays free of checked exceptions.
            throw new IllegalStateException("Failed to deserialize delay message payload", e);
        }
    }
}
