package kz.greetgo.kafka.core;

import com.esotericsoftware.kryo.Kryo;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.errors.NotDefined;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.ProducerConfigWorker;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.KeyUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/* loaded from: input_file:kz/greetgo/kafka/core/KafkaReactorImpl.class */
/**
 * {@link KafkaReactorAbstract} implementation that starts one {@link ConsumerReactor} per
 * accumulated {@link ConsumerDefinition} and exposes a {@link ProducerSource} that creates
 * {@link KafkaProducer} instances.
 */
public class KafkaReactorImpl extends KafkaReactorAbstract {
    /** Reactors created by {@link #startConsumers()} that have not yet been stopped and cleared. */
    private final List<ConsumerReactor> consumerReactorList = new ArrayList<>();

    /**
     * Producer source bound to this reactor: it reads kryo, author, config storage and
     * bootstrap servers from the enclosing instance at call time.
     */
    private final ProducerSource producerSource = new ProducerSource() {
        // Both arguments are suppliers, so the enclosing fields are read when the worker
        // invokes them rather than when this object is constructed.
        private final ProducerConfigWorker producerConfigWorker = new ProducerConfigWorker(
            () -> KafkaReactorImpl.this.producerConfigRootPath,
            () -> KafkaReactorImpl.this.configStorage);

        @Override
        public Kryo getKryo() {
            return KafkaReactorImpl.this.kryo;
        }

        @Override
        public byte[] extractKey(Object obj) {
            return KeyUtil.extractKey(obj);
        }

        /**
         * Returns the value supplied by the reactor's author getter.
         *
         * @return the author, or {@code null} when no author getter is set
         */
        @Override
        public String author() {
            if (KafkaReactorImpl.this.authorGetter == null) {
                return null;
            }
            return KafkaReactorImpl.this.authorGetter.get();
        }

        /**
         * Creates a producer from the configuration returned by the config worker for
         * {@code str}, with {@code bootstrap.servers} set from the reactor.
         *
         * @throws NotDefined if the reactor's {@code bootstrapServers} is not set
         */
        @Override
        public Producer<byte[], Box> createProducer(String str, ByteArraySerializer byteArraySerializer, BoxSerializer boxSerializer) {
            // Fail with the same error type startConsumers() uses instead of a bare NPE.
            if (KafkaReactorImpl.this.bootstrapServers == null) {
                throw new NotDefined("bootstrapServers in " + KafkaReactor.class.getSimpleName() + ".createProducer()");
            }
            Map<String, Object> configFor = this.producerConfigWorker.getConfigFor(str);
            configFor.put("bootstrap.servers", KafkaReactorImpl.this.bootstrapServers.get());
            return new KafkaProducer<>(configFor, byteArraySerializer, boxSerializer);
        }
    };

    /**
     * Verifies the controller list, then creates, registers and starts one
     * {@link ConsumerReactor} for every accumulated consumer definition.
     *
     * @throws NotDefined if {@code configStorage} or {@code bootstrapServers} is not set
     */
    @Override
    public void startConsumers() {
        verifyControllerList();
        List<ConsumerDefinition> accumulateConsumerDefinitionList = accumulateConsumerDefinitionList();
        if (this.configStorage == null) {
            throw new NotDefined("configStorage in " + KafkaReactor.class.getSimpleName() + ".start()");
        }
        if (this.bootstrapServers == null) {
            throw new NotDefined("bootstrapServers in " + KafkaReactor.class.getSimpleName() + ".start()");
        }
        for (ConsumerDefinition consumerDefinition : accumulateConsumerDefinitionList) {
            ConsumerReactor consumerReactor = new ConsumerReactor();
            // Registered before start() so stopConsumers() also reaches a reactor whose
            // start() did not complete.
            this.consumerReactorList.add(consumerReactor);
            consumerReactor.kryo = this.kryo;
            consumerReactor.bootstrapServers = this.bootstrapServers;
            consumerReactor.configStorage = this.configStorage;
            consumerReactor.consumerDefinition = consumerDefinition;
            consumerReactor.consumerLogger = this.consumerLogger;
            consumerReactor.start();
        }
    }

    /** Calls {@code stop()} on every registered consumer reactor, then clears the registry. */
    @Override
    public void stopConsumers() {
        this.consumerReactorList.forEach(ConsumerReactor::stop);
        this.consumerReactorList.clear();
    }

    /** Returns the producer source bound to this reactor. */
    @Override
    public ProducerSource getProducerSource() {
        return this.producerSource;
    }
}
