package com.hortonworks.registries.schemaregistry.examples.avro;

import java.io.FileInputStream;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.DecoderFactory;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/hortonworks/registries/schemaregistry/examples/avro/KafkaAvroSerDesApp.class */
public class KafkaAvroSerDesApp {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaAvroSerDesApp.class);
    public static final String MSGS_LIMIT_OLD = "msgsLimit";
    public static final String MSGS_LIMIT = "msgs.limit";
    public static final String TOPIC = "topic";
    public static final int DEFAULT_MSGS_LIMIT = 50;
    public static final String IGNORE_INVALID_MSGS = "ignore.invalid.messages";
    private String producerProps;
    private String schemaFile;
    private String consumerProps;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hortonworks/registries/schemaregistry/examples/avro/KafkaAvroSerDesApp$MyProducerCallback.class */
    public static class MyProducerCallback implements Callback {
        private MyProducerCallback() {
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            KafkaAvroSerDesApp.LOG.info("#### received [{}], ex: [{}]", recordMetadata, exc);
        }
    }

    public KafkaAvroSerDesApp(String str, String str2) {
        this.producerProps = str;
        this.schemaFile = str2;
    }

    public KafkaAvroSerDesApp(String str) {
        this.consumerProps = str;
    }

    /* JADX WARN: Code restructure failed: missing block: B:49:0x0164, code lost:
    
        if (r0 == null) goto L68;
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x0169, code lost:
    
        if (0 == 0) goto L53;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x0180, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x016c, code lost:
    
        r0.close();
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x0174, code lost:
    
        r18 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x0176, code lost:
    
        r0.addSuppressed(r18);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void sendMessages(java.lang.String r7) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 514
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.hortonworks.registries.schemaregistry.examples.avro.KafkaAvroSerDesApp.sendMessages(java.lang.String):void");
    }

    private Object jsonToAvro(String str, Schema schema) throws Exception {
        Object read = new GenericDatumReader(schema).read((Object) null, DecoderFactory.get().jsonDecoder(schema, str));
        if (schema.getType().equals(Schema.Type.STRING)) {
            read = read.toString();
        }
        return read;
    }

    public void consumeMessages() throws Exception {
        Properties properties = new Properties();
        FileInputStream fileInputStream = new FileInputStream(this.consumerProps);
        Throwable th = null;
        try {
            try {
                properties.load(fileInputStream);
                if (fileInputStream != null) {
                    if (0 != 0) {
                        try {
                            fileInputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileInputStream.close();
                    }
                }
                String property = properties.getProperty(TOPIC);
                KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
                kafkaConsumer.subscribe(Collections.singletonList(property));
                while (true) {
                    ConsumerRecords poll = kafkaConsumer.poll(Duration.ofMillis(1000L));
                    LOG.info("records size " + poll.count());
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord consumerRecord = (ConsumerRecord) it.next();
                        LOG.info("Received message: (" + ((String) consumerRecord.key()) + ", " + consumerRecord.value() + ") at offset " + consumerRecord.offset() + " with headers : " + Arrays.toString(consumerRecord.headers().toArray()));
                    }
                }
            } finally {
            }
        } catch (Throwable th3) {
            if (fileInputStream != null) {
                if (th != null) {
                    try {
                        fileInputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileInputStream.close();
                }
            }
            throw th3;
        }
    }

    public static void main(String[] strArr) {
        Option build = Option.builder("sm").longOpt("send-messages").desc("Send Messages to Kafka").type(Boolean.class).build();
        Option build2 = Option.builder("cm").longOpt("consume-messages").desc("Consume Messages from Kafka").type(Boolean.class).build();
        Option build3 = Option.builder("d").longOpt("data-file").hasArg().desc("Provide a data file").type(String.class).build();
        Option build4 = Option.builder("p").longOpt("producer-config").hasArg().desc("Provide a Kafka producer config file").type(String.class).build();
        Option build5 = Option.builder("s").longOpt("schema-file").hasArg().desc("Provide a schema file").type(String.class).build();
        Option build6 = Option.builder("c").longOpt("consumer-config").hasArg().desc("Provide a Kafka Consumer config file").type(String.class).build();
        OptionGroup optionGroup = new OptionGroup();
        optionGroup.addOption(build);
        optionGroup.addOption(build2);
        optionGroup.setRequired(true);
        Options options = new Options();
        options.addOptionGroup(optionGroup);
        options.addOption(build3);
        options.addOption(build4);
        options.addOption(build5);
        options.addOption(build6);
        try {
            CommandLine parse = new DefaultParser().parse(options, strArr);
            if (parse.hasOption("sm")) {
                if (parse.hasOption("p") && parse.hasOption("d") && parse.hasOption("s")) {
                    new KafkaAvroSerDesApp(parse.getOptionValue("p"), parse.getOptionValue("s")).sendMessages(parse.getOptionValue("d"));
                } else {
                    LOG.error("please provide following options for sending messages to Kafka");
                    LOG.error("-d or --data-file");
                    LOG.error("-s or --schema-file");
                    LOG.error("-p or --producer-config");
                }
            } else if (parse.hasOption("cm")) {
                if (parse.hasOption("c")) {
                    new KafkaAvroSerDesApp(parse.getOptionValue("c")).consumeMessages();
                } else {
                    LOG.error("please provide following options for consuming messages from Kafka");
                    LOG.error("-c or --consumer-config");
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to send/receive messages ", e);
        } catch (ParseException e2) {
            LOG.error("Please provide all the options ", e2);
        }
    }
}
