package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import kz.greetgo.kafka.consumer.ConsumerConfigDefaults;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.ConsumerReactor;
import kz.greetgo.kafka.consumer.ConsumerReactorImpl;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.core.logger.LoggerType;
import kz.greetgo.kafka.errors.NotDefined;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.producer.ProducerConfigWorker;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.ConfigLines;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/* loaded from: input_file:kz/greetgo/kafka/core/KafkaReactorImpl.class */
/**
 * {@link KafkaReactorAbstract} implementation that starts one {@link ConsumerReactorImpl} per
 * accumulated {@link ConsumerDefinition} and exposes a {@link ProducerSource} which builds
 * {@link KafkaProducer} instances from the configuration served by a {@link ProducerConfigWorker}.
 */
public class KafkaReactorImpl extends KafkaReactorAbstract {
    /** Consumer reactors created by {@link #startConsumers()}, in creation order. */
    private final List<ConsumerReactorImpl> consumerReactorList = new ArrayList<>();

    /** Defaults handed (through a supplier) to every consumer reactor created by this class. */
    public ConsumerConfigDefaults consumerConfigDefaults = ConsumerConfigDefaults.withDefaults();

    /**
     * Serves producer configuration. The storage is read through a supplier, so the current value
     * of {@code producerConfigStorage} is looked up each time the supplier is invoked.
     */
    private final ProducerConfigWorker producerConfigWorker =
        new ProducerConfigWorker(() -> this.producerConfigStorage, this::putProducerDefaultValues);

    /** Producer-side view of this reactor; every method delegates to state of the outer instance. */
    private final ProducerSource producerSource = new ProducerSource() {
        @Override
        public Logger logger() {
            return KafkaReactorImpl.this.logger;
        }

        @Override
        public StrConverter getStrConverter() {
            return KafkaReactorImpl.this.strConverterSupplier().get();
        }

        @Override
        public byte[] extractKey(Object obj) {
            return KeyUtil.extractKey(obj);
        }

        /** Returns the author from {@code authorGetter}, or {@code null} when no getter is set. */
        @Override
        public String author() {
            if (KafkaReactorImpl.this.authorGetter == null) {
                return null;
            }
            return KafkaReactorImpl.this.authorGetter.get();
        }

        @Override
        public long getProducerConfigUpdateTimestamp(String str) {
            return KafkaReactorImpl.this.producerConfigWorker.getConfigUpdateTimestamp(str);
        }

        @Override
        public Map<String, Object> getConfigFor(String str) {
            return KafkaReactorImpl.this.producerConfigWorker.getConfigFor(str);
        }

        /**
         * Creates a Kafka producer from the configuration obtained for {@code str}, with
         * {@code bootstrap.servers} set from the reactor's {@code bootstrapServers}.
         *
         * @throws NotDefined if {@code bootstrapServers} has not been set on the reactor
         */
        @Override
        public Producer<byte[], Box> createProducer(String str, ByteArraySerializer byteArraySerializer, BoxSerializer boxSerializer) {
            // Report a missing bootstrapServers the same way startConsumers() does,
            // instead of failing with a bare NullPointerException.
            if (KafkaReactorImpl.this.bootstrapServers == null) {
                throw new NotDefined("bootstrapServers in "
                    + KafkaReactorImpl.this.getClass().getSimpleName() + ".createProducer()");
            }
            Map<String, Object> configFor = getConfigFor(str);
            configFor.put("bootstrap.servers", KafkaReactorImpl.this.bootstrapServers.get());
            if (KafkaReactorImpl.this.logger.isShow(LoggerType.SHOW_PRODUCER_CONFIG)) {
                KafkaReactorImpl.this.logger.logProducerConfigOnCreating(str, configFor);
            }
            return new KafkaProducer<>(configFor, byteArraySerializer, boxSerializer);
        }
    };

    /**
     * Creates, configures and starts one {@link ConsumerReactorImpl} for every accumulated
     * consumer definition. Each reactor is added to the internal list before it is started.
     *
     * @throws NotDefined if {@code consumerConfigStorage} or {@code bootstrapServers} is not set
     */
    @Override
    public void startConsumers() {
        verifyControllerList();
        List<ConsumerDefinition> accumulateConsumerDefinitionList = accumulateConsumerDefinitionList();
        if (this.consumerConfigStorage == null) {
            throw new NotDefined("consumerConfigStorage in " + getClass().getSimpleName() + ".startConsumers()");
        }
        if (this.bootstrapServers == null) {
            throw new NotDefined("bootstrapServers in " + getClass().getSimpleName() + ".startConsumers()");
        }
        for (ConsumerDefinition consumerDefinition : accumulateConsumerDefinitionList) {
            ConsumerReactorImpl consumerReactorImpl = new ConsumerReactorImpl();
            this.consumerReactorList.add(consumerReactorImpl);
            consumerReactorImpl.logger = this.logger;
            consumerReactorImpl.strConverterSupplier = strConverterSupplier();
            consumerReactorImpl.bootstrapServers = this.bootstrapServers;
            consumerReactorImpl.configStorage = this.consumerConfigStorage;
            consumerReactorImpl.consumerDefinition = consumerDefinition;
            consumerReactorImpl.producerSource = getProducerSource();
            consumerReactorImpl.hostId = this.hostId;
            // Supplied lazily so that a later reassignment of the public field is observed.
            consumerReactorImpl.consumerConfigDefaults = () -> this.consumerConfigDefaults;
            consumerReactorImpl.start();
        }
    }

    /**
     * Finds a started consumer reactor by the name of its consumer definition.
     *
     * @param str consumer name to look for
     * @return the first reactor whose definition reports that name, or an empty optional
     */
    @Override
    public Optional<ConsumerReactor> consumer(String str) {
        for (ConsumerReactorImpl consumerReactorImpl : this.consumerReactorList) {
            if (consumerReactorImpl.consumerDefinition.getConsumerName().equals(str)) {
                return Optional.of(consumerReactorImpl);
            }
        }
        return Optional.empty();
    }

    /** Stops every consumer reactor created so far, then closes the producer config worker. */
    @Override
    public void stopConsumers() {
        this.consumerReactorList.forEach(ConsumerReactorImpl::stop);
        this.producerConfigWorker.close();
    }

    /** Calls {@code join()} on every consumer reactor created so far, in creation order. */
    public void joinToConsumers() {
        this.consumerReactorList.forEach(ConsumerReactorImpl::join);
    }

    /**
     * Registers the default producer settings in the given config lines.
     *
     * <p>The padding spaces inside the key literals are kept exactly as they were; presumably
     * {@code putValue} trims them and they only serve alignment (TODO confirm).
     *
     * @param configLines target that receives the default key/value pairs
     */
    protected void putProducerDefaultValues(ConfigLines configLines) {
        // The Kafka producer option is "acks"; the former key "prod.acts" was a typo,
        // so the intended acks=all default was stored under a name Kafka does not know.
        configLines.putValue("prod.acks                    ", "all");
        configLines.putValue("prod.buffer.memory           ", "33554432");
        configLines.putValue("prod.compression.type        ", "none");
        // "prod.batch.size" used to be registered twice with the same value; once is enough.
        configLines.putValue("prod.batch.size              ", "16384");
        configLines.putValue("prod.connections.max.idle.ms ", "540000");
        configLines.putValue("prod.request.timeout.ms      ", "30000");
        configLines.putValue("prod.linger.ms               ", "1");
        configLines.putValue("prod.retries                               ", "2147483647");
        configLines.putValue("prod.max.in.flight.requests.per.connection ", "1");
        configLines.putValue("prod.delivery.timeout.ms                   ", "35000");
    }

    /** Returns the producer source backed by this reactor. */
    @Override
    public ProducerSource getProducerSource() {
        return this.producerSource;
    }
}
