package se.hiq.oss.spring.nats.consumer;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.MessageHandler;
import java.lang.reflect.Method;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.SmartLifecycle;
import se.hiq.oss.spring.nats.message.serde.NatsMessageSerDeFactory;
import se.hiq.oss.spring.nats.metrics.NatsMetricsRegistry;

/* loaded from: input_file:se/hiq/oss/spring/nats/consumer/NatsConsumerManagerImpl.class */
public class NatsConsumerManagerImpl implements SmartLifecycle, NatsConsumerManager {
    private static final String REGISTERING_MESSAGE = "Registering dispatcher %s";
    private static final Logger LOG = LoggerFactory.getLogger(NatsConsumerManagerImpl.class);
    private Connection natsConnection;
    private NatsMessageSerDeFactory serDeFactory;
    private Optional<NatsMetricsRegistry> natsMetricsRegistry;
    private Set<MessageConsumer> subscriptions;
    private volatile boolean running;
    private boolean autoStartup;
    private long drainTimeoutSeconds;

    public NatsConsumerManagerImpl(Connection connection, NatsMessageSerDeFactory natsMessageSerDeFactory, long j, boolean z) {
        this.natsMetricsRegistry = Optional.empty();
        this.subscriptions = new CopyOnWriteArraySet();
        this.natsConnection = connection;
        this.serDeFactory = natsMessageSerDeFactory;
        this.autoStartup = z;
        this.drainTimeoutSeconds = j;
    }

    public NatsConsumerManagerImpl(Connection connection, NatsMessageSerDeFactory natsMessageSerDeFactory, Optional<NatsMetricsRegistry> optional, long j, boolean z) {
        this.natsMetricsRegistry = Optional.empty();
        this.subscriptions = new CopyOnWriteArraySet();
        this.natsConnection = connection;
        this.serDeFactory = natsMessageSerDeFactory;
        this.natsMetricsRegistry = optional;
        this.autoStartup = z;
        this.drainTimeoutSeconds = j;
    }

    public void setNatsMetricsRegistry(NatsMetricsRegistry natsMetricsRegistry) {
        this.natsMetricsRegistry = Optional.ofNullable(natsMetricsRegistry);
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public <T> SmartLifecycle register(Consumer<T> consumer, Class<T> cls, String str) {
        return register(new ConsumerMessageHandler(consumer, cls, this.serDeFactory), str, "");
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public <T> SmartLifecycle register(Consumer<T> consumer, Class<T> cls, String str, String str2) {
        return register(new ConsumerMessageHandler(consumer, cls, this.serDeFactory), str, str2);
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public <T> SmartLifecycle register(BiConsumer<T, ? super Message> biConsumer, Class<T> cls, String str) {
        return register(new BiConsumerMessageHandler(biConsumer, cls, this.serDeFactory), str, "");
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public <T> SmartLifecycle register(BiConsumer<T, ? super Message> biConsumer, Class<T> cls, String str, String str2) {
        return register(new BiConsumerMessageHandler(biConsumer, cls, this.serDeFactory), str, str2);
    }

    private SmartLifecycle register(MessageHandler messageHandler, String str, String str2) {
        MessageConsumer messageConsumer = new MessageConsumer(this.natsConnection, str, str2, messageHandler, this.drainTimeoutSeconds, this.natsMetricsRegistry.orElse(null), true);
        register(messageConsumer);
        return messageConsumer;
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public SmartLifecycle register(Object obj, Method method) {
        se.hiq.oss.spring.nats.annotation.Consumer consumer = (se.hiq.oss.spring.nats.annotation.Consumer) method.getAnnotation(se.hiq.oss.spring.nats.annotation.Consumer.class);
        if (consumer == null) {
            throw new IllegalArgumentException("Method " + method + " must be annotated with @" + se.hiq.oss.spring.nats.annotation.Consumer.class.getName() + " in order to be registered as a NATS consumer");
        }
        return register(obj, method, consumer.subject(), consumer.queueName());
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public SmartLifecycle register(Object obj, Method method, String str) {
        return register(new MethodMessageHandler(this.serDeFactory, obj, method), str, "");
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public SmartLifecycle register(Object obj, Method method, String str, String str2) {
        return register(new MethodMessageHandler(this.serDeFactory, obj, method), str, str2);
    }

    @Override // se.hiq.oss.spring.nats.consumer.NatsConsumerManager
    public void register(MessageConsumer messageConsumer) {
        this.subscriptions.add(messageConsumer);
        LOG.info(String.format(REGISTERING_MESSAGE, messageConsumer));
        if (isRunning() && messageConsumer.isAutoStartup()) {
            messageConsumer.start();
        }
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    public void start() {
        this.subscriptions.stream().filter(messageConsumer -> {
            return messageConsumer.isAutoStartup();
        }).forEach(messageConsumer2 -> {
            messageConsumer2.start();
        });
        this.running = true;
    }

    public void stop() {
        this.subscriptions.stream().filter(messageConsumer -> {
            return messageConsumer.isRunning();
        }).forEach(messageConsumer2 -> {
            messageConsumer2.stop();
        });
        this.running = false;
    }

    public boolean isRunning() {
        return this.running;
    }
}
