package com.bq.corbel.eworker.internal;

import com.bq.corbel.evci.converter.DomainObjectJsonMessageConverterFactory;
import com.bq.corbel.evci.eworker.Eworker;
import com.bq.corbel.evci.eworker.EworkerRegistry;
import com.bq.corbel.evci.service.EvciMQ;
import com.bq.corbel.lib.rabbitmq.config.AmqpConfigurer;
import com.bq.corbel.lib.rabbitmq.config.BackoffOptions;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;

/* loaded from: input_file:com/bq/corbel/eworker/internal/AmqpEworkerRegistry.class */
public class AmqpEworkerRegistry implements EworkerRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpEworkerRegistry.class);
    private final DomainObjectJsonMessageConverterFactory converterFactory;
    private final AmqpConfigurer configurer;
    private final BackoffOptions backoffOptions;
    private final int maxAttempts;
    private final UnaryOperator<String> routingPatternFunction;

    public AmqpEworkerRegistry(DomainObjectJsonMessageConverterFactory domainObjectJsonMessageConverterFactory, AmqpConfigurer amqpConfigurer, BackoffOptions backoffOptions, int i, UnaryOperator<String> unaryOperator) {
        this.converterFactory = domainObjectJsonMessageConverterFactory;
        this.configurer = amqpConfigurer;
        this.backoffOptions = backoffOptions;
        this.maxAttempts = i;
        this.routingPatternFunction = unaryOperator;
    }

    public <E> void registerEworker(Eworker<E> eworker, String str, String str2, boolean z, int i) {
        LOG.error("threads number in evci: " + i);
        Type type = ((ParameterizedType) eworker.getClass().getGenericInterfaces()[0]).getActualTypeArguments()[0];
        String str3 = "evci.eworker." + str2 + ".queue";
        String str4 = "evci.eworker." + str2 + ".dlq";
        this.configurer.bind(EvciMQ.EVENTS_EXCHANGE, this.configurer.queue(str3, new Function[]{this.configurer.setDeadLetterExchange(EvciMQ.EVENTS_DEAD_LETTER_EXCHANGE)}), Optional.of(this.routingPatternFunction.apply(str)), Optional.empty());
        this.configurer.bind(EvciMQ.EVENTS_DEAD_LETTER_EXCHANGE, this.configurer.queue(str4), Optional.of(this.routingPatternFunction.apply(str)), Optional.empty());
        MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(eworker, this.converterFactory.createConverter(type));
        LOG.info("Registering eworker " + str3 + " with " + i + " threads");
        SimpleMessageListenerContainer listenerContainer = this.configurer.listenerContainer(Executors.newFixedThreadPool(i), this.configurer.setRetryOpertations(Optional.of(Integer.valueOf(this.maxAttempts)), Optional.ofNullable(this.backoffOptions)), new String[]{str3});
        listenerContainer.setMessageListener(messageListenerAdapter);
        if (z) {
            MessageListenerAdapter messageListenerAdapter2 = new MessageListenerAdapter(eworker, this.converterFactory.createConverter(type));
            messageListenerAdapter2.setDefaultListenerMethod("handleFailedMessage");
            SimpleMessageListenerContainer listenerContainer2 = this.configurer.listenerContainer(Executors.newFixedThreadPool(i), new String[]{str4});
            listenerContainer2.setMessageListener(messageListenerAdapter2);
            listenerContainer2.start();
        }
        listenerContainer.start();
    }
}
