package com.hortonworks.registries.streams.examples;

import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.examples.avro.SampleSchemaRegistryClientApp;
import com.hortonworks.registries.schemaregistry.serdes.avro.AbstractAvroSnapshotDeserializer;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerde;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroSerializer;
import com.hortonworks.registries.streams.examples.dtos.UserActivity;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.rocksdb.RateLimiter;

/* loaded from: input_file:com/hortonworks/registries/streams/examples/ClickStreamEnrichment.class */
public class ClickStreamEnrichment {
    public static void main(String[] strArr) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "click_stream_enrichment");
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put(SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name(), SampleSchemaRegistryClientApp.DEFAULT_SCHEMA_REG_URL);
        properties.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.LongSerde.class);
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, KafkaAvroSerde.class);
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(AbstractAvroSnapshotDeserializer.SPECIFIC_AVRO_READER, true);
        properties.put(KafkaAvroSerializer.STORE_SCHEMA_VERSION_ID_IN_HEADER, "true");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("page-views").leftJoin(streamsBuilder.table("user-profile"), (pageView, userProfile) -> {
            return userProfile != null ? new UserActivity(Long.valueOf(userProfile.getId()), userProfile.getName(), Integer.valueOf(userProfile.getAge()), userProfile.getRegion(), pageView.getPage()) : new UserActivity(Long.valueOf(pageView.getUserId()), "anonymous", -1, "unknown", pageView.getPage());
        }).to("user-activity");
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(), properties);
        try {
            kafkaStreams.start();
            Thread.sleep(RateLimiter.DEFAULT_REFILL_PERIOD_MICROS);
            kafkaStreams.close();
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }
}
