package com.xunmo.utils;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.exceptions.ExceptionUtil;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.core.util.StrUtil;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.xunmo.rabbitmq.MqConnectionPool;
import com.xunmo.rabbitmq.MqConnectionPoolConfig;
import com.xunmo.rabbitmq.MqConnectionPoolObjectFactory;
import com.xunmo.rabbitmq.entity.DeadConfig;
import com.xunmo.rabbitmq.entity.MqConfig;
import com.xunmo.rabbitmq.enums.ConsumeAction;
import com.xunmo.rabbitmq.enums.ExchangeType;
import com.xunmo.rabbitmq.enums.SendAction;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Supplier;
import org.noear.solon.Solon;
import org.noear.solon.SolonProps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helper for publishing to and consuming from RabbitMQ over pooled connections.
 *
 * <p>Call {@link #initSolon()} or {@link #initArgs(String, String, String, int)} once before
 * using the send / consume / count methods. Channels are cached per {@link MqConfig} in
 * {@code channelMap}; a cached channel whose recorded use count exceeds
 * {@code channelUseMaxCount} is replaced by a fresh one and queued in {@code stayCloseList},
 * which a background timer drains every ten seconds.
 */
public class MqHelper {
    /** Attempts made by {@code tryDo} / {@code tryReturn}; also the number of polls in {@code countChannel}. */
    private static final int tryCountMax = 3;
    /** Recorded use count above which a cached channel is replaced. */
    private static final int channelUseMaxCount = 30;
    /** Connection pool, assigned once by {@code initSolon} or {@code initArgs}. */
    private static MqConnectionPool mqConnectionPool;
    private static final Logger log = LoggerFactory.getLogger(MqHelper.class);
    /** Becomes {@code true} once the connection pool has been created. */
    private static final AtomicBoolean isInit = new AtomicBoolean(false);
    /** Counter of send channels; starts at 1, incremented on creation, decremented by {@code countChannel}. */
    private static final AtomicInteger sendChannelCount = new AtomicInteger(1);
    /** Counter of consumer channels; starts at 1, incremented on creation, decremented by {@code countChannel}. */
    private static final AtomicInteger consumerChannelCount = new AtomicInteger(1);
    /** Most recently created channel list per (normalised) configuration. */
    private static final Map<MqConfig, List<Channel>> channelMap = new ConcurrentHashMap<>();
    /** Number of completed send / consume calls recorded per channel. */
    private static final Map<Channel, Integer> channelUseCountMap = new ConcurrentHashMap<>();
    /** Replaced channels waiting to be closed by the background timer. */
    private static final ConcurrentLinkedQueue<Channel> stayCloseList = new ConcurrentLinkedQueue<>();
    /** {@code toString()} of every send channel ever created. */
    private static final List<String> sendExistsChannelNames = new CopyOnWriteArrayList<>();
    /** {@code toString()} of every consumer channel ever created. */
    private static final List<String> consumerExistsChannelNames = new CopyOnWriteArrayList<>();
    /** {@code toString()} of every channel passed through {@code closeChannel}. */
    private static final List<String> closeChannelNames = new CopyOnWriteArrayList<>();

    /**
     * An action that may fail with an {@link IOException}; the unit of work retried by
     * {@code tryDo}.
     */
    @FunctionalInterface
    public interface TryDoing {
        /**
         * Runs the action once.
         *
         * @throws IOException if the action fails
         */
        void tryDo() throws IOException;
    }

    /** Not instantiable: every member is static. */
    private MqHelper() {
    }

    /**
     * Creates the connection pool from the Solon configuration keys {@code xm.mq.host},
     * {@code xm.mq.username}, {@code xm.mq.password} and {@code xm.mq.port}.
     *
     * <p>Does nothing when the pool already exists. When no Solon application is running the
     * pool is not created and a warning is logged. The port defaults to 5672, the standard
     * AMQP port.
     */
    public static synchronized void initSolon() {
        if (isInit.get()) {
            return;
        }
        if (Solon.app() == null) {
            log.warn("Solon 应用未启动, 跳过 MQ 连接池初始化");
            return;
        }
        SolonProps cfg = Solon.cfg();
        initPool(cfg.get("xm.mq.host"), cfg.get("xm.mq.username"), cfg.get("xm.mq.password"), cfg.getInt("xm.mq.port", 5672));
    }

    /**
     * Creates the connection pool from explicit settings. Does nothing when the pool already
     * exists.
     *
     * @param str  broker host
     * @param str2 user name
     * @param str3 password
     * @param i    broker port
     */
    public static synchronized void initArgs(String str, String str2, String str3, int i) {
        if (isInit.get()) {
            return;
        }
        initPool(str, str2, str3, i);
    }

    /** Builds the pool (max idle 10, max total 20, min idle 1) and marks the helper initialised. */
    private static void initPool(String host, String username, String password, int port) {
        MqConnectionPoolConfig poolConfig = new MqConnectionPoolConfig();
        poolConfig.setHost(host);
        poolConfig.setUsername(username);
        poolConfig.setPassword(password);
        poolConfig.setPort(port);
        poolConfig.setMaxIdle(10);
        poolConfig.setMaxTotal(20);
        poolConfig.setMinIdle(1);
        mqConnectionPool = new MqConnectionPool(new MqConnectionPoolObjectFactory(poolConfig));
        isInit.set(true);
    }

    /**
     * Publishes one persistent message and reports the broker confirmation through
     * {@code biConsumer}.
     *
     * <p>Missing settings are defaulted: exchange type {@code direct}, durable {@code true},
     * auto-close {@code false}, routing key {@code "#"}, delay 1000 when delaying is enabled.
     * With an exchange name the message goes to that exchange under the routing key; without
     * one it goes to the default exchange ({@code ""}) with the queue name as routing key.
     * The connection is always handed back to the pool, including when validation or channel
     * acquisition fails.
     *
     * @param connection pooled connection to publish on
     * @param mqConfig   destination settings
     * @param str        message body, encoded as UTF-8
     * @param num        optional value for the message {@code expiration} property
     * @param biConsumer receives the channel and {@code SUCCESS} on ack or {@code MQ_FAIL} on nack
     * @throws IOException          if the channel cannot be obtained or set up
     * @throws NullPointerException if delaying is enabled but no exchange name is configured
     * @throws RuntimeException     wrapping any failure raised while publishing
     */
    private static void sendMsg(Connection connection, MqConfig mqConfig, String str, Integer num, final BiConsumer<Channel, SendAction> biConsumer) throws IOException {
        Channel acquired = null;
        boolean autoClose = false;
        try {
            final String changeName = mqConfig.getChangeName();
            final String queueName = mqConfig.getQueueName();
            final ExchangeType configuredType = mqConfig.getExchangeType();
            final ExchangeType exchangeType = configuredType == null ? ExchangeType.direct : configuredType;
            final Boolean configuredDurable = mqConfig.getDurable();
            final Boolean durable = configuredDurable == null ? Boolean.TRUE : configuredDurable;
            final String configuredKey = mqConfig.getRoutingKey();
            final String routingKey = StrUtil.isBlankOrUndefined(configuredKey) ? "#" : configuredKey;
            final boolean delayed = Boolean.TRUE.equals(mqConfig.getIsDelay());
            autoClose = Boolean.TRUE.equals(mqConfig.getIsAutoClose());
            Long delayTime = mqConfig.getDelayTime();
            if (delayed) {
                if (delayTime == null) {
                    delayTime = 1000L;
                }
                if (StrUtil.isBlankOrUndefined(changeName)) {
                    throw new NullPointerException("延迟队列, 交换机不能为空");
                }
            }

            // The normalised config is the cache key for the channel.
            final Channel sendChannel = getSendChannel(connection, MqConfig.of()
                    .title(mqConfig.getTitle())
                    .changeName(changeName)
                    .queueName(queueName)
                    .routingKey(routingKey)
                    .durable(durable)
                    .ttl(mqConfig.getTtl())
                    .max(mqConfig.getMax())
                    .exchangeType(exchangeType)
                    .isAutoClose(Boolean.valueOf(autoClose))
                    .isDelay(Boolean.valueOf(delayed))
                    .delayTime(delayTime)
                    .deadConfig(mqConfig.getDeadConfig())
                    .build());
            acquired = sendChannel;

            sendChannel.clearConfirmListeners();
            sendChannel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long j, boolean z) {
                    log.debug("已收到消息，标识：{}", Long.valueOf(j));
                    biConsumer.accept(sendChannel, SendAction.SUCCESS);
                }

                @Override
                public void handleNack(long j, boolean z) {
                    log.warn("未确认消息，标识：{}", Long.valueOf(j));
                    biConsumer.accept(sendChannel, SendAction.MQ_FAIL);
                }
            });

            try {
                AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
                if (num != null) {
                    builder.expiration(num.toString());
                }
                builder.deliveryMode(2);
                if (delayed) {
                    sendChannel.queueBind(queueName, changeName, routingKey);
                    Map<String, Object> headers = new HashMap<>();
                    headers.put("x-delay", delayTime);
                    builder.headers(headers);
                }
                final boolean viaExchange = delayed || StrUtil.isNotBlank(changeName);
                // Without a named exchange, route through the default exchange by queue name.
                final String exchange = viaExchange ? changeName : "";
                final String key = viaExchange ? routingKey : queueName;
                final AMQP.BasicProperties props = builder.build();
                final byte[] body = str.getBytes(StandardCharsets.UTF_8);
                tryDo(() -> sendChannel.basicPublish(exchange, key, props, body), () -> "发送消息异常");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } finally {
            releaseChannel(acquired, connection, autoClose, sendChannelCount);
        }
    }

    /**
     * Registers a manually acknowledging consumer on the configured queue.
     *
     * <p>Missing settings are defaulted as in {@code sendMsg}. For every delivery the body is
     * decoded as UTF-8 and passed to {@code biFunction}: {@code ACCEPT} acks the message, any
     * other result nacks it, requeueing only for {@code RETRY}. If the callback throws, the
     * error is logged and rethrown as a {@link RuntimeException} without ack or nack. The
     * connection is always handed back to the pool.
     *
     * @param connection pooled connection to consume on
     * @param mqConfig   source settings
     * @param biFunction message handler deciding the acknowledgement
     * @throws IOException if the channel cannot be obtained or the consumer cannot be registered
     */
    private static void consumeMsg(Connection connection, MqConfig mqConfig, final BiFunction<Channel, String, ConsumeAction> biFunction) throws IOException {
        Channel acquired = null;
        boolean autoClose = false;
        try {
            final String queueName = mqConfig.getQueueName();
            final ExchangeType configuredType = mqConfig.getExchangeType();
            final ExchangeType exchangeType = configuredType == null ? ExchangeType.direct : configuredType;
            final Boolean configuredDurable = mqConfig.getDurable();
            final Boolean durable = configuredDurable == null ? Boolean.TRUE : configuredDurable;
            final String configuredKey = mqConfig.getRoutingKey();
            final String routingKey = StrUtil.isBlankOrUndefined(configuredKey) ? "#" : configuredKey;
            autoClose = Boolean.TRUE.equals(mqConfig.getIsAutoClose());

            final Channel consumeChannel = getConsumeChannel(connection, MqConfig.of()
                    .title(mqConfig.getTitle())
                    .changeName(mqConfig.getChangeName())
                    .queueName(queueName)
                    .routingKey(routingKey)
                    .durable(durable)
                    .ttl(mqConfig.getTtl())
                    .max(mqConfig.getMax())
                    .exchangeType(exchangeType)
                    .isAutoClose(Boolean.valueOf(autoClose))
                    .deadConfig(mqConfig.getDeadConfig())
                    .build());
            acquired = consumeChannel;

            final DefaultConsumer consumer = new DefaultConsumer(consumeChannel) {
                @Override
                public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                    long deliveryTag = envelope.getDeliveryTag();
                    try {
                        ConsumeAction action = biFunction.apply(consumeChannel, new String(bArr, StandardCharsets.UTF_8));
                        if (action == ConsumeAction.ACCEPT) {
                            consumeChannel.basicAck(deliveryTag, false);
                        } else {
                            consumeChannel.basicNack(deliveryTag, false, action == ConsumeAction.RETRY);
                        }
                    } catch (Exception e) {
                        log.error("异常: {}", ExceptionUtil.stacktraceToString(e));
                        throw new RuntimeException(e);
                    }
                }
            };
            tryDo(() -> consumeChannel.basicConsume(queueName, false, consumer), () -> "消费回信应答失败");
        } catch (Exception e) {
            log.error("异常: {}", ExceptionUtil.stacktraceToString(e));
            throw e;
        } finally {
            releaseChannel(acquired, connection, autoClose, consumerChannelCount);
        }
    }

    /**
     * Post-call bookkeeping shared by {@code sendMsg} and {@code consumeMsg}: records one use
     * of an open channel, closes the channel when auto-close is requested, returns the
     * connection to the pool and schedules the counter check. With no channel (acquisition
     * failed) only the connection is returned.
     */
    private static void releaseChannel(Channel channel, Connection connection, boolean autoClose, AtomicInteger counter) {
        if (channel == null) {
            closeConnection(connection);
            return;
        }
        if (channel.isOpen()) {
            channelUseCountMap.merge(channel, 1, Integer::sum);
        }
        if (autoClose) {
            closeChannelAndConnection(channel, connection);
        } else {
            closeConnection(connection);
        }
        countChannel(counter, channel);
    }

    /**
     * Returns the cached open send channel for {@code mqConfig}, or creates one.
     *
     * <p>A cached channel used more than {@code channelUseMaxCount} times is replaced: the
     * new channel is returned and the old one is queued for closing.
     *
     * @throws IOException if a new channel cannot be created or set up
     */
    private static Channel getSendChannel(Connection connection, MqConfig mqConfig) throws IOException {
        Channel cached = findActiveChannel(mqConfig);
        if (cached == null) {
            return initSendChannel(connection, mqConfig);
        }
        if (!isOverused(cached)) {
            return cached;
        }
        logOverused("send", mqConfig, cached);
        Channel fresh = initSendChannel(connection, mqConfig);
        retire(cached);
        return fresh;
    }

    /**
     * Returns the cached open consumer channel for {@code mqConfig}, or creates one.
     *
     * <p>A cached channel used more than {@code channelUseMaxCount} times is replaced: the
     * new channel is returned and the old one is queued for closing.
     *
     * @throws IOException if a new channel cannot be created or set up
     */
    private static Channel getConsumeChannel(Connection connection, MqConfig mqConfig) throws IOException {
        Channel cached = findActiveChannel(mqConfig);
        if (cached == null) {
            return initConsumeChannel(connection, mqConfig);
        }
        if (!isOverused(cached)) {
            return cached;
        }
        logOverused("consume", mqConfig, cached);
        Channel fresh = initConsumeChannel(connection, mqConfig);
        retire(cached);
        return fresh;
    }

    /** Looks up the open cached channel for a configuration, or {@code null} when there is none. */
    private static Channel findActiveChannel(MqConfig mqConfig) {
        List<Channel> cached = channelMap.get(mqConfig);
        return cached == null ? null : getActiveChannel(cached);
    }

    /** Tells whether the channel's recorded use count exceeds {@code channelUseMaxCount}. */
    private static boolean isOverused(Channel channel) {
        Integer used = channelUseCountMap.get(channel);
        return used != null && used.intValue() > channelUseMaxCount;
    }

    /** Logs that a channel of the given kind ("send" / "consume") is being replaced. */
    private static void logOverused(String kind, MqConfig mqConfig, Channel channel) {
        String title = mqConfig.getTitle();
        if (channel.isOpen()) {
            log.warn("{} {} channel {} 超出使用次数限制, 重新制作新的 channel", title, kind, Integer.valueOf(channel.getChannelNumber()));
        } else {
            log.warn("{} {} channel 超出使用次数限制, 重新制作新的 channel", title, kind);
        }
    }

    /** Forgets the use count of a replaced channel and queues it for the background closer. */
    private static void retire(Channel channel) {
        channelUseCountMap.remove(channel);
        stayCloseList.add(channel);
    }

    /**
     * Returns the first open channel in {@code list}, removing the closed channels that
     * precede it; returns {@code null} when none is open.
     */
    private static Channel getActiveChannel(List<Channel> list) {
        for (Iterator<Channel> it = list.iterator(); it.hasNext(); ) {
            Channel candidate = it.next();
            if (candidate.isOpen()) {
                return candidate;
            }
            it.remove();
        }
        return null;
    }

    /**
     * Opens a channel on {@code connection}, retrying up to {@code tryCountMax} times. A
     * {@code null} result from the client (no channel available) counts as a failed attempt.
     */
    private static Channel createChannel(Connection connection) throws IOException {
        return tryReturn(() -> {
            try {
                Channel created = connection.createChannel();
                if (created == null) {
                    throw new IllegalStateException("connection has no free channel");
                }
                return created;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }, () -> "获取 channel 异常");
    }

    /**
     * Creates and registers a publisher channel: declares the exchange (an
     * {@code x-delayed-message} exchange when delaying is enabled), declares the queue with
     * the length / TTL / dead-letter arguments, binds queue to exchange for the non-delayed
     * case, and enables publisher confirms.
     *
     * <p>The queue is declared before it is bound, because binding an undeclared queue fails.
     *
     * @throws IOException if the channel cannot be created or any declaration fails
     */
    private static Channel initSendChannel(Connection connection, MqConfig mqConfig) throws IOException {
        final String changeName = mqConfig.getChangeName();
        final String queueName = mqConfig.getQueueName();
        final String routingKey = mqConfig.getRoutingKey();
        final Boolean configuredDurable = mqConfig.getDurable();
        final boolean durable = configuredDurable == null || configuredDurable.booleanValue();
        final boolean delayed = Boolean.TRUE.equals(mqConfig.getIsDelay());
        final ExchangeType exchangeType = mqConfig.getExchangeType();
        final Long ttl = mqConfig.getTtl();
        final Long max = mqConfig.getMax();
        final DeadConfig deadConfig = mqConfig.getDeadConfig();

        final Channel channel = createChannel(connection);
        List<Channel> channels = new ArrayList<>();
        channels.add(channel);
        channelMap.put(mqConfig, channels);
        sendExistsChannelNames.add(channel.toString());
        sendChannelCount.incrementAndGet();

        final Map<String, Object> arguments = new HashMap<>();
        if (max != null) {
            arguments.put("x-max-length", max);
        }
        if (ttl != null) {
            arguments.put("x-message-ttl", ttl);
        }
        if (deadConfig != null) {
            String deadRoutingKey = deadConfig.getRoutingKey();
            if (StrUtil.isBlankOrUndefined(deadRoutingKey)) {
                deadRoutingKey = "#";
            }
            arguments.put("x-dead-letter-exchange", deadConfig.getChangeName());
            arguments.put("x-dead-letter-routing-key", deadRoutingKey);
        }

        final boolean plainExchange = !delayed && StrUtil.isNotBlank(changeName);
        if (delayed) {
            arguments.put("x-delayed-type", exchangeType.name());
            channel.exchangeDeclare(changeName, "x-delayed-message", durable, false, arguments);
        } else if (plainExchange) {
            channel.exchangeDeclare(changeName, exchangeType.name(), durable, false, arguments);
        }
        tryDo(() -> channel.queueDeclare(queueName, durable, false, false, arguments), () -> "获取队列异常");
        if (plainExchange) {
            channel.queueBind(queueName, changeName, routingKey);
        }
        channel.confirmSelect();
        return channel;
    }

    /**
     * Creates and registers a consumer channel: declares the optional dead-letter queue and
     * exchange, declares the main queue, declares and binds the main exchange when one is
     * configured, and limits prefetch to one unacknowledged message.
     *
     * <p>The main queue is declared with the same length / TTL / dead-letter arguments that
     * {@code initSendChannel} uses, so both sides agree on the queue definition.
     *
     * @throws IOException if the channel cannot be created or any declaration fails
     */
    private static Channel initConsumeChannel(Connection connection, MqConfig mqConfig) throws IOException {
        final String changeName = mqConfig.getChangeName();
        final String queueName = mqConfig.getQueueName();
        final String routingKey = mqConfig.getRoutingKey();
        final Boolean configuredDurable = mqConfig.getDurable();
        final boolean durable = configuredDurable == null || configuredDurable.booleanValue();
        final ExchangeType exchangeType = mqConfig.getExchangeType();
        final Long ttl = mqConfig.getTtl();
        final Long max = mqConfig.getMax();
        final DeadConfig deadConfig = mqConfig.getDeadConfig();

        final Channel channel = createChannel(connection);
        List<Channel> channels = new ArrayList<>();
        channels.add(channel);
        channelMap.put(mqConfig, channels);
        consumerExistsChannelNames.add(channel.toString());
        consumerChannelCount.incrementAndGet();

        final Map<String, Object> queueArguments = new HashMap<>();
        if (max != null) {
            queueArguments.put("x-max-length", max);
        }
        if (ttl != null) {
            queueArguments.put("x-message-ttl", ttl);
        }
        if (deadConfig != null) {
            final String deadExchange = deadConfig.getChangeName();
            final String deadQueue = deadConfig.getQueueName();
            final String configuredDeadKey = deadConfig.getRoutingKey();
            final String deadRoutingKey = StrUtil.isBlankOrUndefined(configuredDeadKey) ? "#" : configuredDeadKey;
            // The dead-letter queue itself carries no extra arguments.
            final Map<String, Object> noArguments = new HashMap<>();
            tryDo(() -> channel.queueDeclare(deadQueue, durable, false, false, noArguments), () -> "获取队列异常");
            queueArguments.put("x-dead-letter-exchange", deadExchange);
            queueArguments.put("x-dead-letter-routing-key", deadRoutingKey);
            if (!StrUtil.isBlankOrUndefined(deadExchange)) {
                tryDo(() -> channel.exchangeDeclare(deadExchange, exchangeType.name(), durable), () -> "创建死信交换机失败");
                tryDo(() -> channel.queueBind(deadQueue, deadExchange, deadRoutingKey), () -> "绑定死信交换机和死信队列失败");
            }
        }
        tryDo(() -> channel.queueDeclare(queueName, durable, false, false, queueArguments), () -> "获取队列失败");
        if (StrUtil.isNotBlank(changeName)) {
            // NOTE(review): a delayed sender declares this exchange as "x-delayed-message";
            // declaring it here with the plain type may conflict - confirm with the delay use case.
            tryDo(() -> channel.exchangeDeclare(changeName, exchangeType.name(), durable), () -> "创建交换机失败");
            tryDo(() -> channel.queueBind(queueName, changeName, routingKey), () -> "绑定交换机和队列失败");
        }
        channel.basicQos(0, 1, false);
        return channel;
    }

    /**
     * Runs {@code tryDoing} up to {@code tryCountMax} times, stopping at the first success.
     *
     * @param tryDoing action to run
     * @param supplier description of the action, used in log and error messages
     * @throws IOException if every attempt failed; the last failure is the cause
     */
    private static void tryDo(TryDoing tryDoing, Supplier<String> supplier) throws IOException {
        IOException last = null;
        for (int attempt = 1; attempt <= tryCountMax; attempt++) {
            try {
                tryDoing.tryDo();
                return;
            } catch (IOException e) {
                last = e;
                if (attempt < tryCountMax) {
                    log.error("{}, 重试第{}次获取!", supplier.get(), Integer.valueOf(attempt), e);
                }
            }
        }
        throw new IOException(StrUtil.format("无法继续执行, 因为{} 错误!", supplier.get()), last);
    }

    /**
     * Calls {@code supplier} up to {@code tryCountMax} times and returns the first result
     * obtained without an exception.
     *
     * @param supplier  value source; may throw unchecked exceptions
     * @param supplier2 description of the action, used in log and error messages
     * @throws IOException if every attempt failed; the last failure is the cause
     */
    private static <T> T tryReturn(Supplier<T> supplier, Supplier<String> supplier2) throws IOException {
        String description = supplier2.get();
        Exception last = null;
        for (int attempt = 1; attempt <= tryCountMax; attempt++) {
            try {
                return supplier.get();
            } catch (Exception e) {
                last = e;
                if (attempt < tryCountMax) {
                    log.error("{}, 重试第{}次获取!", description, Integer.valueOf(attempt), e);
                }
            }
        }
        throw new IOException(StrUtil.format("无法继续执行, 因为{} 错误!", description), last);
    }

    /**
     * Asynchronously checks, once per second for up to {@code tryCountMax} checks, whether
     * {@code channel} is closed (or {@code null}) and decrements {@code atomicInteger} the
     * first time it is. Stops early, restoring the interrupt flag, if interrupted.
     */
    private static void countChannel(AtomicInteger atomicInteger, Channel channel) {
        ThreadUtil.execute(() -> {
            for (int remaining = tryCountMax; remaining > 0; remaining--) {
                if (channel == null || !channel.isOpen()) {
                    atomicInteger.decrementAndGet();
                    return;
                }
                try {
                    TimeUnit.SECONDS.sleep(1L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /**
     * Publishes {@code str} according to {@code mqConfig} on a pooled connection.
     *
     * @param mqConfig   destination settings
     * @param str        message body, encoded as UTF-8
     * @param biConsumer receives the channel and the broker's confirmation result
     * @throws IOException           if the channel cannot be obtained or set up
     * @throws IllegalStateException if the helper has not been initialised
     */
    public static void sendMsg(MqConfig mqConfig, String str, BiConsumer<Channel, SendAction> biConsumer) throws IOException {
        sendMsg(getMqPoolConnection(), mqConfig, str, null, biConsumer);
    }

    /**
     * Starts consuming the queue described by {@code mqConfig} on a pooled connection.
     *
     * @param mqConfig   source settings
     * @param biFunction handler returning the acknowledgement decision for each message
     * @throws IOException           if the channel cannot be obtained or the consumer registered
     * @throws IllegalStateException if the helper has not been initialised
     */
    public static void consumeMsg(MqConfig mqConfig, BiFunction<Channel, String, ConsumeAction> biFunction) throws IOException {
        consumeMsg(getMqPoolConnection(), mqConfig, biFunction);
    }

    /**
     * Borrows a connection from the pool.
     *
     * @throws IllegalStateException if neither {@code initSolon} nor {@code initArgs} created the pool
     */
    private static Connection getMqPoolConnection() {
        MqConnectionPool pool = mqConnectionPool;
        if (pool == null) {
            throw new IllegalStateException("MqHelper 尚未初始化, 请先调用 initSolon() 或 initArgs()");
        }
        return pool.getConnection();
    }

    /**
     * Returns the number of messages in queue {@code str}, using a pooled connection.
     *
     * @throws IOException      if declaring or querying the queue fails
     * @throws TimeoutException if closing the temporary channel times out
     */
    public static int getMessageCount(String str) throws IOException, TimeoutException {
        return getMessageCount(getMqPoolConnection(), str);
    }

    /**
     * Returns the number of messages in queue {@code str}.
     *
     * <p>Opens a temporary channel, declares the queue as durable, non-exclusive and
     * non-auto-delete with no arguments, and reads its message count. The channel is closed
     * and {@code connection} is returned to the pool in every case.
     *
     * @throws IOException      if declaring or querying the queue fails
     * @throws TimeoutException if closing the temporary channel times out
     */
    public static int getMessageCount(Connection connection, String str) throws IOException, TimeoutException {
        try (Channel channel = connection.createChannel()) {
            channel.queueDeclare(str, true, false, false, null);
            return (int) channel.messageCount(str);
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Closes {@code channel} (see {@link #closeChannel(Channel)}) and then returns
     * {@code connection} to the pool, even when closing the channel fails.
     */
    private static void closeChannelAndConnection(Channel channel, Connection connection) {
        try {
            closeChannel(channel);
        } finally {
            closeConnection(connection);
        }
    }

    /**
     * Closes {@code channel} if it is open and records its name in {@code closeChannelNames}.
     * A {@code null} channel is ignored.
     *
     * @throws RuntimeException wrapping an {@link IOException} or {@link TimeoutException}
     *                          from the close, or an interruption while waiting
     */
    public static void closeChannel(Channel channel) {
        if (channel != null) {
            String name = channel.toString();
            while (channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    throw new RuntimeException(e);
                }
                if (channel.isOpen()) {
                    // Still reported open: wait before asking again.
                    try {
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }
            closeChannelNames.add(name);
        }
        log.trace("关闭channel");
    }

    /** Returns {@code connection} to the pool; a {@code null} connection is ignored. */
    private static void closeConnection(Connection connection) {
        if (connection != null) {
            closeConnec(connection);
        }
    }

    /** Hands {@code connection} back to the pool. */
    private static void closeConnec(Connection connection) {
        mqConnectionPool.returnConnection(connection);
    }

    /** Returns the live consumer-channel counter (not a copy). */
    public static AtomicInteger getConsumerChannelCount() {
        return consumerChannelCount;
    }

    /** Returns the live list of names of channels passed through {@code closeChannel}. */
    public static List<String> getCloseChannelNames() {
        return closeChannelNames;
    }

    /** Returns the live list of names of all consumer channels created so far. */
    public static List<String> getConsumerExistsChannelNames() {
        return consumerExistsChannelNames;
    }

    /** Returns the live list of names of all send channels created so far. */
    public static List<String> getSendExistsChannelNames() {
        return sendExistsChannelNames;
    }

    /** Returns the live channel cache keyed by configuration. */
    public static Map<MqConfig, List<Channel>> getChannelMap() {
        return channelMap;
    }

    static {
        // Daemon thread, so this housekeeping timer never keeps the JVM alive on its own.
        new Timer("mq-channel-closer", true).schedule(new TimerTask() {
            @Override
            public void run() {
                // Keeps at most two replaced channels waiting; closes the oldest beyond that.
                while (stayCloseList.size() > 2) {
                    Channel retired = stayCloseList.poll();
                    if (retired == null) {
                        break;
                    }
                    try {
                        closeChannel(retired);
                    } catch (RuntimeException e) {
                        // An exception escaping run() would terminate the timer thread for good.
                        log.warn("关闭待回收 channel 失败", e);
                    }
                }
            }
        }, 1000L, 10000L);
    }
}
