package com.hortonworks.registries.streams.examples;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import com.hortonworks.registries.streams.examples.dtos.PageView;
import com.hortonworks.registries.streams.examples.dtos.UserProfile;
import java.io.PrintStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;

/* loaded from: input_file:com/hortonworks/registries/streams/examples/ClickStreamEnrichmentDriver.class */
/**
 * Example driver for the click-stream enrichment demo.
 *
 * <p>It publishes a fixed set of {@link UserProfile} records to the {@code user-profile} topic and
 * {@link PageView} records to the {@code page-views} topic, then subscribes to the
 * {@code user-activity} topic and prints every record it receives to standard output.
 *
 * <p>Broker and schema registry addresses are the hard-coded constants declared below.
 */
public class ClickStreamEnrichmentDriver {
    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String SCHEMA_REGISTRY_URL = "http://localhost:9090/api/v1";
    static final String USER_PROFILE_TOPIC = "user-profile";
    static final String PAGE_VIEWS_TOPIC = "page-views";
    static final String USER_ACTIVITY_TOPIC = "user-activity";
    static final String STORE_SCHEMA_VERSION_ID_IN_HEADER_POLICY = "true";

    private ClickStreamEnrichmentDriver() {
    }

    /**
     * Builds the client properties shared by the producer and the consumer: the Kafka bootstrap
     * servers and the schema registry URL.
     *
     * @return a new, mutable {@link Properties} instance holding the shared settings
     */
    private static Properties baseClientProperties() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        properties.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SCHEMA_REGISTRY_URL);
        return properties;
    }

    /**
     * Sends a fixed set of user profiles and page views to their respective topics, keyed by user
     * id. Send failures are reported on standard error; the producer is always closed on exit.
     */
    private void generateFakeProfilesAndPageViews() {
        UserProfile[] userProfiles = {
            new UserProfile(0L, "Neo", 25, "asia"),
            new UserProfile(1L, "Morpheus", 28, "africa"),
            new UserProfile(2L, "Smith", 40, "america"),
            new UserProfile(3L, "Trinity", 35, "america"),
            new UserProfile(4L, "Dozer", 30, "south america")
        };
        PageView[] pageViews = {
            new PageView(0L, "the one"),
            new PageView(1L, "trainer"),
            new PageView(2L, "agent"),
            new PageView(4L, "submarine"),
            new PageView(10L, "optical")
        };

        // Profiles are queued ahead of page views, matching the original send order.
        ArrayList<ProducerRecord<Long, Object>> records =
            new ArrayList<>(userProfiles.length + pageViews.length);
        for (UserProfile userProfile : userProfiles) {
            records.add(new ProducerRecord<Long, Object>(
                USER_PROFILE_TOPIC, Long.valueOf(userProfile.getId()), userProfile));
        }
        for (PageView pageView : pageViews) {
            records.add(new ProducerRecord<Long, Object>(
                PAGE_VIEWS_TOPIC, Long.valueOf(pageView.getUserId()), pageView));
        }

        Properties properties = baseClientProperties();
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
        properties.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, STORE_SCHEMA_VERSION_ID_IN_HEADER_POLICY);

        // try-with-resources closes the producer on both the normal and the exceptional path.
        try (KafkaProducer<Long, Object> producer = new KafkaProducer<>(properties)) {
            for (ProducerRecord<Long, Object> record : records) {
                producer.send(record, (recordMetadata, exc) -> {
                    if (exc != null) {
                        System.err.println("Error while sending record : " + recordMetadata + " with message : " + exc.getMessage());
                    }
                });
            }
        }
    }

    /**
     * Subscribes to the {@code user-activity} topic and prints each received record to standard
     * output. The poll loop has no exit condition, so this method only ends when an exception is
     * thrown; the consumer is closed in that case.
     */
    private void consumeUserActivity() {
        Properties properties = baseClientProperties();
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "user-activity-reader");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);

        try (KafkaConsumer<Long, Object> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singleton(USER_ACTIVITY_TOPIC));
            while (true) {
                ConsumerRecords<Long, Object> polled = consumer.poll(Duration.ofSeconds(1L));
                polled.forEach(System.out::println);
            }
        }
    }

    /**
     * Entry point: publishes the sample data, then consumes and prints the user-activity topic.
     *
     * @param strArr command-line arguments (unused)
     */
    public static void main(String[] strArr) {
        ClickStreamEnrichmentDriver driver = new ClickStreamEnrichmentDriver();
        driver.generateFakeProfilesAndPageViews();
        driver.consumeUserActivity();
    }
}
