package com.butor.notif.kafka;

import com.butor.kafka.KafkaConsumer;
import com.butor.kafka.KafkaMessageListener;
import com.butor.kafka.KafkaProducer;
import com.butor.notif.AbstractNotifRelay;
import com.google.api.client.util.Strings;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.eventbus.EventBus;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import kafka.message.MessageAndOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:com/butor/notif/kafka/KafkaTopicNotifRelay.class */
/**
 * Notification relay that exchanges messages through Kafka.
 *
 * <p>Outbound text is handed to the configured {@link KafkaProducer}. Inbound Kafka messages
 * arrive through {@link #onMessage(MessageAndOffset)}, are decoded as UTF-8 and forwarded to
 * {@link #processInboundMessage(String)}.
 *
 * <p>The consumer, producer and executor service are injected through setters and validated in
 * {@link #afterPropertiesSet()}.
 *
 * <p>NOTE(review): the type parameters {@code K} and {@code V} are not used anywhere in this
 * class; they are retained so existing declarations that reference them keep compiling.
 */
public class KafkaTopicNotifRelay<K, V> extends AbstractNotifRelay implements InitializingBean, KafkaMessageListener {
    /** Logger named after the runtime class, so subclasses log under their own name. */
    private final Logger logger;
    /** Source of inbound messages; run on {@link #executorService}. */
    private KafkaConsumer consumer;
    /** Sink for outbound messages; messages are sent with a null key. */
    private KafkaProducer<Void, String> producer;
    /** Executor the consumer is submitted to in {@link #afterPropertiesSet()}. */
    private ExecutorService executorService;

    /**
     * Creates the relay.
     *
     * @param eventBus event bus passed to the {@link AbstractNotifRelay} constructor
     */
    public KafkaTopicNotifRelay(EventBus eventBus) {
        super(eventBus);
        this.logger = LoggerFactory.getLogger(getClass());
    }

    /**
     * Sends a message through the producer with a null key.
     *
     * @param str text to send; {@code null} or empty values are silently dropped
     */
    public void postMessage(String str) {
        if (Strings.isNullOrEmpty(str)) {
            return;
        }
        this.producer.send((Object) null, str);
    }

    /**
     * Validates the injected collaborators and starts the relay.
     *
     * <p>After the null checks, the consumer validates its own configuration, this relay is
     * registered as the consumer's message listener, the producer is initialised and the
     * consumer is submitted to the executor service.
     *
     * @throws NullPointerException if the executor service, consumer or producer was not set
     * @throws Exception declared for the callee contract; anything raised by the consumer or
     *     producer during setup propagates unchanged
     */
    public void afterPropertiesSet() throws Exception {
        Preconditions.checkNotNull(this.executorService, "Executor Service is mandatory");
        Preconditions.checkNotNull(this.consumer, "Consumer is mandatory");
        Preconditions.checkNotNull(this.producer, "Producer is mandatory");
        this.consumer.checkIfAllPropertiesSet();
        // Register the listener before the consumer is submitted, so it is in place when the task runs.
        this.consumer.setMessageListener(this);
        this.producer.init();
        this.executorService.submit((Runnable) this.consumer);
        this.logger.info("Kafka Notif relay ready to start!");
    }

    /**
     * Forwards an inbound message to the superclass implementation.
     *
     * <p>NOTE(review): this override adds no logic; presumably it exists to widen the visibility
     * of the inherited method — confirm against {@link AbstractNotifRelay}.
     *
     * @param str decoded message text
     */
    public void processInboundMessage(String str) {
        super.processInboundMessage(str);
    }

    /**
     * Decodes the payload of a Kafka message as UTF-8 and forwards it to
     * {@link #processInboundMessage(String)}.
     *
     * <p>A message whose payload is {@code null} is skipped instead of being dereferenced.
     *
     * @param messageAndOffset message delivered by the consumer
     */
    public void onMessage(MessageAndOffset messageAndOffset) {
        ByteBuffer payload = messageAndOffset.message().payload();
        if (payload == null) {
            this.logger.debug("Ignoring Kafka message without payload");
            return;
        }
        // Size by remaining(), not limit(): get(byte[]) reads from the current position, so a
        // buffer with a non-zero position would otherwise raise BufferUnderflowException.
        byte[] bytes = new byte[payload.remaining()];
        payload.get(bytes);
        processInboundMessage(new String(bytes, Charsets.UTF_8));
    }

    /**
     * Injects the consumer that delivers inbound messages.
     *
     * @param kafkaConsumer consumer to use; must be set before {@link #afterPropertiesSet()}
     */
    public void setConsumer(KafkaConsumer kafkaConsumer) {
        this.consumer = kafkaConsumer;
    }

    /**
     * Injects the producer used by {@link #postMessage(String)}.
     *
     * @param kafkaProducer producer to use; must be set before {@link #afterPropertiesSet()}
     */
    public void setProducer(KafkaProducer<Void, String> kafkaProducer) {
        this.producer = kafkaProducer;
    }

    /**
     * Injects the executor service the consumer is submitted to.
     *
     * @param executorService executor to use; must be set before {@link #afterPropertiesSet()}
     */
    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }
}
