package com.hortonworks.registries.schemaregistry.avro;

import com.hortonworks.registries.common.test.IntegrationTest;
import com.hortonworks.registries.schemaregistry.avro.conf.SchemaRegistryTestProfileType;
import com.hortonworks.registries.schemaregistry.avro.helper.SchemaRegistryTestServerClientWrapper;
import com.hortonworks.registries.schemaregistry.avro.util.AvroSchemaRegistryClientUtil;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import com.hortonworks.registries.util.CustomParameterizedRunner;
import com.hortonworks.registries.util.SchemaRegistryTestName;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(CustomParameterizedRunner.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:com/hortonworks/registries/schemaregistry/avro/KafkaAvroSerDesWithKafkaServerTest.class */
public class KafkaAvroSerDesWithKafkaServerTest {
    // Logger shared by the test methods and the nested ProducerCallback.
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAvroSerDesWithKafkaServerTest.class);
    // Embedded Kafka cluster; assigned and started in beforeParam, stopped in afterParam.
    private static LocalKafkaCluster CLUSTER;
    // Schema registry test server/client wrapper; assigned and started in beforeParam, stopped in afterParam.
    private static SchemaRegistryTestServerClientWrapper SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER;

    // Rule whose getMethodName() value is used by the tests as the base of their topic names.
    @Rule
    public SchemaRegistryTestName TEST_NAME_RULE = new SchemaRegistryTestName();

    /**
     * Embedded Kafka cluster whose lifecycle can be driven explicitly through
     * {@link #startCluster()} and {@link #stopCluster()}, which delegate to the
     * inherited {@code before()} and {@code after()} hooks.
     *
     * <p>Every constructor forwards its arguments unchanged to the matching
     * {@link EmbeddedKafkaCluster} constructor.
     * NOTE(review): parameter names mirror the Kafka superclass as the author
     * remembers it; confirm against the Kafka version on the classpath.
     */
    public static class LocalKafkaCluster extends EmbeddedKafkaCluster {
        private LocalKafkaCluster(int numBrokers) {
            super(numBrokers);
        }

        public LocalKafkaCluster(int numBrokers, Properties brokerConfig) {
            super(numBrokers, brokerConfig);
        }

        public LocalKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart) {
            super(numBrokers, brokerConfig, mockTimeMillisStart);
        }

        public LocalKafkaCluster(int numBrokers, Properties brokerConfig, long mockTimeMillisStart, long mockTimeNanoStart) {
            super(numBrokers, brokerConfig, mockTimeMillisStart, mockTimeNanoStart);
        }

        /**
         * Invokes the inherited {@code before()} hook.
         *
         * @throws RuntimeException wrapping any {@link Throwable} raised by {@code before()}
         */
        void startCluster() {
            try {
                before();
            } catch (Throwable startupFailure) {
                // before() may raise any Throwable; surface it unchecked with the cause preserved.
                throw new RuntimeException(startupFailure);
            }
        }

        /** Invokes the inherited {@code after()} hook. */
        void stopCluster() {
            after();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hortonworks/registries/schemaregistry/avro/KafkaAvroSerDesWithKafkaServerTest$ProducerCallback.class */
    public static class ProducerCallback implements Callback {
        private ProducerCallback() {
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            KafkaAvroSerDesWithKafkaServerTest.LOG.info("Received notification: [{}] and ex: [{}]", recordMetadata, exc);
        }
    }

    /**
     * Supplies the profile types this parameterized test is run with.
     *
     * @return the {@code DEFAULT} and {@code SSL} profiles, in that order
     */
    @CustomParameterizedRunner.Parameters
    public static Iterable<SchemaRegistryTestProfileType> profiles() {
        SchemaRegistryTestProfileType[] profileTypes = {
            SchemaRegistryTestProfileType.DEFAULT,
            SchemaRegistryTestProfileType.SSL
        };
        return Arrays.asList(profileTypes);
    }

    /**
     * Per-profile setup: creates and starts the schema registry test server for the
     * given profile, then creates and starts the embedded Kafka cluster.
     *
     * @param schemaRegistryTestProfileType profile passed to the test server wrapper
     * @throws Exception if either component fails to start
     */
    @CustomParameterizedRunner.BeforeParam
    public static void beforeParam(SchemaRegistryTestProfileType schemaRegistryTestProfileType) throws Exception {
        SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER = new SchemaRegistryTestServerClientWrapper(schemaRegistryTestProfileType);
        SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.startTestServer();
        // The argument is presumably the broker count (single-broker cluster) - confirm against EmbeddedKafkaCluster.
        CLUSTER = new LocalKafkaCluster(1);
        CLUSTER.startCluster();
    }

    /**
     * Per-profile teardown: stops the embedded Kafka cluster and then the schema
     * registry test server.
     *
     * <p>The server is stopped in a {@code finally} block so that a failure while
     * stopping the cluster does not leave the test server running.
     *
     * @throws Exception if stopping either component fails
     */
    @CustomParameterizedRunner.AfterParam
    public static void afterParam() throws Exception {
        try {
            CLUSTER.stopCluster();
        } finally {
            SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.stopTestServer();
        }
    }

    /**
     * Constructor taking the profile type for the current run. The argument is not
     * used here; the per-profile setup is done in {@code beforeParam}.
     *
     * @param schemaRegistryTestProfileType profile of the current run (ignored)
     */
    public KafkaAvroSerDesWithKafkaServerTest(SchemaRegistryTestProfileType schemaRegistryTestProfileType) {
    }

    /**
     * Round-trips the payloads returned by
     * {@code AvroSchemaRegistryClientUtil.generatePrimitivePayloads()} through the
     * cluster, using the test method name as the topic name prefix.
     */
    @Test
    public void testPrimitivesInKafkaCluster() throws Exception {
        final String topicPrefix = TEST_NAME_RULE.getMethodName();
        _testByStoringSchemaIdInHeaderOrPayload(topicPrefix, AvroSchemaRegistryClientUtil.generatePrimitivePayloads());
    }

    /**
     * Round-trips a generic device record and a specific record through the
     * cluster, each on its own topic derived from the test method name.
     */
    @Test
    public void testAvroRecordsInKafkaCluster() throws Exception {
        final String topicPrefix = TEST_NAME_RULE.getMethodName();
        final String genericTopic = topicPrefix + "-generic";
        final String specificTopic = topicPrefix + "-specific";

        _testByStoringSchemaIdInHeaderOrPayload(genericTopic, AvroSchemaRegistryClientUtil.createGenericRecordForDevice());
        _testByStoringSchemaIdInHeaderOrPayload(specificTopic, AvroSchemaRegistryClientUtil.createSpecificRecord());
    }

    /**
     * Runs the header/payload round trip once per element of {@code objArr}.
     * Each element gets its own topic: {@code str} followed by the element's class
     * name with {@code $} replaced by {@code _}, or by {@code "null"} for a null element.
     *
     * @param str    topic name prefix
     * @param objArr payloads to send, one round trip each
     * @throws InterruptedException if a round trip is interrupted
     */
    private void _testByStoringSchemaIdInHeaderOrPayload(String str, Object[] objArr) throws InterruptedException {
        for (Object payload : objArr) {
            String typeSuffix = "null";
            if (payload != null) {
                typeSuffix = payload.getClass().getName().replace("$", "_");
            }
            _testByStoringSchemaIdInHeaderOrPayload(str + typeSuffix, payload);
        }
    }

    /**
     * Runs the round trip for {@code obj} twice on topic {@code str}: first with
     * the flag {@code false}, then with the flag {@code true}.
     *
     * @param str topic name
     * @param obj payload to send
     * @throws InterruptedException if a round trip is interrupted
     */
    private void _testByStoringSchemaIdInHeaderOrPayload(String str, Object obj) throws InterruptedException {
        final boolean[] storeInHeaderFlags = {false, true};
        for (boolean storeInHeader : storeInHeaderFlags) {
            _testByStoringSchemaIdInHeaderOrPayload(str, obj, storeInHeader);
        }
    }

    /**
     * Creates topic {@code str}, produces {@code obj} to it, consumes it back and
     * verifies the result, then deletes the topic.
     *
     * <p>Verified: exactly one record is received; the
     * {@code key.schema.version.id} and {@code value.schema.version.id} headers are
     * present exactly when {@code z} is {@code true}; the key equals
     * {@code getKey(obj)}; and the value matches {@code obj} according to
     * {@code AvroSchemaRegistryClientUtil.assertAvroObjs}.
     *
     * @param str topic name
     * @param obj payload to send
     * @param z   value passed to the producer as {@code store.schema.version.id.in.header}
     * @throws InterruptedException if topic creation or deletion is interrupted
     */
    private void _testByStoringSchemaIdInHeaderOrPayload(String str, Object obj, boolean z) throws InterruptedException {
        createTopic(str);
        try {
            String bootstrapServers = produceMessage(str, obj, Boolean.valueOf(z));
            // A random group id per run, as in the original code.
            String groupId = str + "-group-" + new Random().nextLong();
            ConsumerRecords<String, Object> received = consumeMessage(str, bootstrapServers, groupId);
            Assert.assertEquals(1L, received.count());

            ConsumerRecord<String, Object> first = received.iterator().next();
            Headers headers = first.headers();
            Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(headers.lastHeader("key.schema.version.id") != null));
            Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(headers.lastHeader("value.schema.version.id") != null));

            Assert.assertEquals(getKey(obj), first.key());
            AvroSchemaRegistryClientUtil.assertAvroObjs(obj, first.value());
        } finally {
            // Single cleanup path: the topic is deleted exactly once, whether the checks pass or fail.
            CLUSTER.deleteTopicAndWait(str);
        }
    }

    /**
     * Creates topic {@code str} on the embedded cluster, retrying for as long as
     * creation fails with {@link TopicExistsException}.
     *
     * <p>Retries are spaced 2 seconds apart. Once more than 30 seconds have passed
     * since the first attempt, the latest {@link TopicExistsException} is rethrown.
     * NOTE(review): the retry presumably covers a topic of the same name that is
     * still being deleted by an earlier run - confirm.
     *
     * @param str topic name
     * @throws InterruptedException if interrupted while waiting between attempts
     */
    private void createTopic(String str) throws InterruptedException {
        final long startedAt = System.currentTimeMillis();
        for (;;) {
            try {
                CLUSTER.createTopic(str);
                return;
            } catch (TopicExistsException alreadyExists) {
                final long waitedMillis = System.currentTimeMillis() - startedAt;
                if (waitedMillis > 30000) {
                    throw alreadyExists;
                }
                Thread.sleep(2000L);
            }
        }
    }

    /**
     * Reads topic {@code str} from the beginning and returns the first non-empty
     * batch of records.
     *
     * <p>The consumer is configured with the schema registry client settings and
     * {@link KafkaAvroDeserializer} for keys and values. All partitions of the
     * topic are assigned manually and rewound to the beginning. Polling is repeated
     * up to 100 times with a 500 ms timeout, stopping as soon as a poll returns
     * records, and the offsets are then committed synchronously.
     *
     * <p>The consumer is opened in try-with-resources so it is closed even when a
     * metadata call, poll or commit throws.
     *
     * @param str  topic to read
     * @param str2 value for {@code bootstrap.servers}
     * @param str3 value for {@code group.id}
     * @return the result of the last poll; empty if no poll returned any records
     */
    private ConsumerRecords<String, Object> consumeMessage(String str, String str2, String str3) {
        HashMap<String, Object> consumerConfig = new HashMap<>();
        consumerConfig.put("bootstrap.servers", str2);
        consumerConfig.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
        consumerConfig.put("group.id", str3);
        consumerConfig.put("enable.auto.commit", "true");
        consumerConfig.put("auto.commit.interval.ms", "1000");
        consumerConfig.put("session.timeout.ms", "30000");
        consumerConfig.put("key.deserializer", KafkaAvroDeserializer.class.getName());
        consumerConfig.put("value.deserializer", KafkaAvroDeserializer.class.getName());

        try (KafkaConsumer<String, Object> consumer = new KafkaConsumer<>(consumerConfig)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo partitionInfo : consumer.partitionsFor(str)) {
                partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
            LOG.info("partitions [{}]", partitions);
            LOG.info("subscribed topis: [{}] ", consumer.listTopics());
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);

            // Always polls at least once; stops at the first non-empty batch or after 100 attempts.
            ConsumerRecords<String, Object> records = null;
            for (int attempt = 0; attempt < 100 && (records == null || records.isEmpty()); attempt++) {
                LOG.info("Polling for consuming messages");
                records = consumer.poll(Duration.ofMillis(500L));
            }
            consumer.commitSync();
            return records;
        }
    }

    /**
     * Sends {@code obj} to topic {@code str}, keyed by {@code getKey(obj)}, using
     * {@link KafkaAvroSerializer} for key and value.
     *
     * <p>The producer is closed in a {@code finally} block so it is released even
     * when {@code send} or {@code flush} throws.
     *
     * @param str  destination topic
     * @param obj  payload to send
     * @param bool value written to the {@code store.schema.version.id.in.header}
     *             producer setting; must not be {@code null}
     * @return the bootstrap servers of the embedded cluster, for use by the consumer
     */
    private String produceMessage(String str, Object obj, Boolean bool) {
        String bootstrapServers = CLUSTER.bootstrapServers();
        HashMap<String, Object> producerConfig = new HashMap<>();
        producerConfig.put("bootstrap.servers", bootstrapServers);
        producerConfig.putAll(SCHEMA_REGISTRY_TEST_SERVER_CLIENT_WRAPPER.exportClientConf(true));
        producerConfig.put("key.serializer", KafkaAvroSerializer.class.getName());
        producerConfig.put("value.serializer", KafkaAvroSerializer.class.getName());
        producerConfig.put("store.schema.version.id.in.header", bool.toString());

        KafkaProducer<String, Object> producer = new KafkaProducer<>(producerConfig);
        try {
            LOG.info("Sending message: [{}] to topic: [{}]", obj, str);
            producer.send(new ProducerRecord<>(str, getKey(obj), obj), new ProducerCallback());
            // flush() blocks until the buffered record has been sent.
            producer.flush();
            LOG.info("Message successfully sent to topic: [{}]", str);
        } finally {
            producer.close(5L, TimeUnit.SECONDS);
        }
        return bootstrapServers;
    }

    /**
     * Derives the record key for a payload.
     *
     * @param obj payload, may be {@code null}
     * @return {@code obj.toString()}, or the string {@code "null"} when {@code obj} is {@code null}
     */
    private String getKey(Object obj) {
        // String.valueOf(Object) returns "null" for a null argument and obj.toString() otherwise.
        return String.valueOf(obj);
    }

    /**
     * Checks schema compatibility enforcement on one topic: a device record is
     * accepted, an incompatible device record must fail with an exception, and a
     * compatible device record is then accepted.
     *
     * <p>{@code Assert.fail} throws {@link AssertionError}, which the
     * {@code catch (Exception ...)} clause does not intercept, so a missing failure
     * still fails the test.
     */
    @Test
    public void testSchemasCompatibility() throws Exception {
        String str = this.TEST_NAME_RULE.getMethodName() + "-" + System.currentTimeMillis();
        _testByStoringSchemaIdInHeaderOrPayload(str, AvroSchemaRegistryClientUtil.createGenericRecordForDevice());
        try {
            _testByStoringSchemaIdInHeaderOrPayload(str, AvroSchemaRegistryClientUtil.createGenericRecordForIncompatDevice());
            Assert.fail("An error should have been received here because of incompatible schemas");
        } catch (InterruptedException interrupted) {
            // An interruption is not the failure this test expects: restore the flag and propagate.
            Thread.currentThread().interrupt();
            throw interrupted;
        } catch (Exception expected) {
            // Expected path; logged so the actual failure stays visible in the test output.
            LOG.info("Received expected failure for incompatible schema", expected);
        }
        _testByStoringSchemaIdInHeaderOrPayload(str, AvroSchemaRegistryClientUtil.createGenericRecordForCompatDevice());
    }
}
