package kz.greetgo.kafka.core;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import kz.greetgo.kafka.consumer.ConsumerDefinition;
import kz.greetgo.kafka.consumer.Invoker;
import kz.greetgo.kafka.core.logger.Logger;
import kz.greetgo.kafka.model.Box;
import kz.greetgo.kafka.model.BoxHolder;
import kz.greetgo.kafka.producer.ProducerFacadeBridge;
import kz.greetgo.kafka.producer.ProducerSource;
import kz.greetgo.kafka.serializer.BoxSerializer;
import kz.greetgo.kafka.util.BoxUtil;
import kz.greetgo.kafka.util.GenericUtil;
import kz.greetgo.kafka.util.KeyUtil;
import kz.greetgo.strconverter.StrConverter;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;

/* loaded from: input_file:kz/greetgo/kafka/core/KafkaSimulator.class */
/**
 * In-memory variant of the Kafka reactor: producers are backed by {@link MockProducerHolder}
 * instances and nothing is sent to a broker.
 *
 * <p>Records sent through the producers are only delivered when {@link #push()} is called. Each
 * delivered record is handed to every consumer registered by {@link #startConsumers()} and then
 * remembered, so it can be inspected through {@link #allPushed()} or {@link #pushedOf(Class)}.
 */
public class KafkaSimulator extends KafkaReactorAbstract {

    /** Mock producers created so far, keyed by producer name. */
    private final ConcurrentHashMap<String, MockProducerHolder> producers = new ConcurrentHashMap<>();

    /** Producer source handing out mock producers that are registered in {@link #producers}. */
    private final ProducerSource producerSource = new ProducerSource() {

        /** Returns the logger of the enclosing simulator. */
        @Override
        public Logger logger() {
            return KafkaSimulator.this.logger;
        }

        /** Returns the converter currently supplied by the enclosing simulator. */
        @Override
        public StrConverter getStrConverter() {
            return KafkaSimulator.this.strConverterSupplier().get();
        }

        /** Delegates key extraction to {@link KeyUtil#extractKey}. */
        @Override
        public byte[] extractKey(Object obj) {
            return KeyUtil.extractKey(obj);
        }

        /** Returns the author from the simulator's author getter, or {@code null} when no getter is set. */
        @Override
        public String author() {
            if (KafkaSimulator.this.authorGetter == null) {
                return null;
            }
            return KafkaSimulator.this.authorGetter.get();
        }

        /** Always returns {@code 0}: the simulator keeps no producer configuration timestamps. */
        @Override
        public long getProducerConfigUpdateTimestamp(String str) {
            return 0L;
        }

        /** Always returns a new, empty, mutable map. */
        @Override
        public Map<String, Object> getConfigFor(String str) {
            return new HashMap<>();
        }

        /**
         * Creates a mock producer and registers it under its name.
         *
         * @param str name of the producer to create
         * @throws RuntimeException if a producer with this name is already registered
         */
        @Override
        public Producer<byte[], Box> createProducer(String str, ByteArraySerializer byteArraySerializer, BoxSerializer boxSerializer) {
            // Cheap early check so that no holder is built for a name that is obviously taken.
            if (KafkaSimulator.this.producers.containsKey(str)) {
                throw alreadyCreated(str);
            }
            MockProducerHolder holder = new MockProducerHolder(str, byteArraySerializer, boxSerializer, KafkaSimulator.this.getCluster());
            // Atomic registration: a concurrent creator of the same name must not be overwritten silently.
            if (KafkaSimulator.this.producers.putIfAbsent(holder.getProducerName(), holder) != null) {
                throw alreadyCreated(str);
            }
            return holder.getProducer();
        }

        /** Builds the exception reported when a producer name is already in use. */
        private RuntimeException alreadyCreated(String name) {
            return new RuntimeException("Producer with name = " + name + " already created. Please select another name");
        }
    };

    /** Records delivered by {@link #push()}, in delivery order. */
    private final List<ConsumerRecord<byte[], Box>> pushedRecords = Collections.synchronizedList(new ArrayList<>());

    /**
     * Consumers that receive pushed records; {@code null} until {@link #startConsumers()} is called.
     * Volatile so that a list published by one thread is seen by a thread calling {@link #push()}.
     */
    private volatile List<ConsumerDefinition> consumerDefinitionList;

    /** Does nothing. */
    @Override
    public void stopConsumers() {
    }

    /** Returns the producer source backed by mock producers. */
    @Override
    protected ProducerSource getProducerSource() {
        return this.producerSource;
    }

    /**
     * Returns the cluster passed to every newly created {@link MockProducerHolder}.
     * This implementation returns {@link Cluster#empty()}.
     */
    protected Cluster getCluster() {
        return Cluster.empty();
    }

    /**
     * Delivers everything the mock producers have recorded so far.
     *
     * <p>For every registered producer: flushes it, takes its history, clears the producer and then
     * pushes each record of that history to the consumers.
     *
     * @throws RuntimeException if a consumer reports that it could not process a record
     */
    public void push() {
        for (MockProducerHolder holder : this.producers.values()) {
            holder.getProducer().flush();
            List<ProducerRecord<byte[], Box>> history = holder.getProducer().history();
            holder.getProducer().clear();
            for (ProducerRecord<byte[], Box> producerRecord : history) {
                pushRecord(producerRecord, holder);
            }
        }
    }

    /**
     * Converts one produced record into a consumer record, feeds it to every registered consumer
     * and finally stores it in {@link #pushedRecords}.
     *
     * <p>The record is stored only after all consumers have accepted it; when a consumer fails, the
     * exception propagates and the record is not stored.
     *
     * @param producerRecord record taken from the producer history
     * @param mockProducerHolder holder of the producer that sent the record; used to resolve the
     *     topic partition
     * @throws RuntimeException if a consumer's session reports an unsuccessful invocation
     */
    private void pushRecord(ProducerRecord<byte[], Box> producerRecord, MockProducerHolder mockProducerHolder) {
        TopicPartition topicPartition = mockProducerHolder.topicPartition(producerRecord);
        // Offset, checksum and serialized sizes are fixed placeholder values (1).
        ConsumerRecord<byte[], Box> consumerRecord = new ConsumerRecord<>(
                topicPartition.topic(),
                topicPartition.partition(),
                1L,
                GenericUtil.longNullAsZero(producerRecord.timestamp()),
                TimestampType.CREATE_TIME,
                1L,
                1,
                1,
                producerRecord.key(),
                serialization(producerRecord.value()),
                producerRecord.headers());

        // Read the field once: startConsumers() may replace it while records are being pushed.
        List<ConsumerDefinition> definitions = this.consumerDefinitionList;
        if (definitions != null) {
            Map<TopicPartition, List<ConsumerRecord<byte[], Box>>> byPartition = new HashMap<>();
            byPartition.put(topicPartition, Collections.singletonList(consumerRecord));
            ConsumerRecords<byte[], Box> consumerRecords = new ConsumerRecords<>(byPartition);

            for (ConsumerDefinition definition : definitions) {
                Invoker invoker = definition.getInvoker();
                Set<String> usingProducerNames = invoker.getUsingProducerNames();
                // try-with-resources closes the session on every path and keeps a failure of
                // close() as a suppressed exception instead of masking the primary one.
                try (Invoker.InvokeSession session = invoker.createSession()) {
                    for (String producerName : usingProducerNames) {
                        session.putProducer(producerName, ProducerFacadeBridge.createPermanentBridge(producerName, this.producerSource));
                    }
                    if (!session.invoke(consumerRecords)) {
                        throw new RuntimeException("Cannot invoke consumer " + definition.logDisplay() + " of record " + producerRecord.value());
                    }
                }
            }
        }

        this.pushedRecords.add(consumerRecord);
    }

    /**
     * Passes the box through the configured {@link StrConverter} (to string and back), so consumers
     * receive a converted copy rather than the instance that was sent.
     */
    private Box serialization(Box box) {
        StrConverter strConverter = strConverterSupplier().get();
        return (Box) strConverter.fromStr(strConverter.toStr(box));
    }

    /** Clears every registered mock producer without delivering its records. */
    public void clearAllProducers() {
        for (MockProducerHolder holder : this.producers.values()) {
            holder.getProducer().clear();
        }
    }

    /** Forgets all records delivered so far. */
    public void clearPushed() {
        this.pushedRecords.clear();
    }

    /**
     * Returns the records delivered so far.
     *
     * @return an unmodifiable snapshot; later pushes do not change the returned list
     */
    public List<ConsumerRecord<byte[], Box>> allPushed() {
        return Collections.unmodifiableList(new ArrayList<>(this.pushedRecords));
    }

    /**
     * Returns holders for those delivered records for which {@link BoxUtil#hold} yields a value
     * for the given class.
     *
     * @param cls class passed to {@link BoxUtil#hold} for every delivered record
     * @param <T> type described by {@code cls}
     * @return the matching holders, in delivery order
     */
    public <T> List<BoxHolder<T>> pushedOf(Class<T> cls) {
        // Stream over a snapshot: iterating a synchronizedList directly is not safe while
        // another thread is pushing or clearing records.
        List<ConsumerRecord<byte[], Box>> snapshot = new ArrayList<>(this.pushedRecords);
        return snapshot.stream()
                .map(pushed -> BoxUtil.hold(pushed.value(), cls))
                .filter(held -> held.isPresent())
                .map(held -> held.get())
                .collect(Collectors.toList());
    }

    /**
     * Verifies the controller list and registers the accumulated consumer definitions, so that
     * subsequent {@link #push()} calls deliver records to them.
     */
    @Override
    public void startConsumers() {
        verifyControllerList();
        this.consumerDefinitionList = accumulateConsumerDefinitionList();
    }
}
