package com.cloudera.oryx.kafka.util;

import com.cloudera.oryx.api.KeyMessage;
import com.cloudera.oryx.common.OryxTest;
import com.cloudera.oryx.common.io.IOUtils;
import com.cloudera.oryx.common.lang.LoggingCallable;
import java.util.Iterator;
import java.util.List;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/oryx/kafka/util/ProduceConsumeIT.class */
/**
 * Integration test that starts a {@link LocalZKServer} and a {@link LocalKafkaBroker} on
 * free local ports, produces {@link #NUM_DATA} messages to a test topic with
 * {@link ProduceData}, consumes them on a background thread, and verifies that every
 * expected key was received.
 */
public final class ProduceConsumeIT extends OryxTest {
    private static final Logger log = LoggerFactory.getLogger(ProduceConsumeIT.class);
    private static final String TOPIC = ProduceConsumeIT.class.getSimpleName() + "Topic";
    private static final int NUM_DATA = 100;

    /**
     * Produces {@link #NUM_DATA} messages and checks that the consumer saw exactly that many
     * keys, and that the keys include the decimal strings {@code "0"} through
     * {@code NUM_DATA - 1}.
     *
     * <p>Cleanup order: the consuming iterator is closed first, then the topic is deleted,
     * then (after the assertions) the broker and finally the ZK server are closed. Each
     * cleanup step runs exactly once, and a failure while closing a resource is attached to
     * the primary failure as a suppressed exception rather than replacing it.
     *
     * @throws Exception if the servers cannot be started, if producing or consuming fails,
     *     or if cleanup fails
     */
    @Test
    public void testProduceConsume() throws Exception {
        int zkPort = IOUtils.chooseFreePort();
        // Passed as the first LocalKafkaBroker argument and to ConsumeData;
        // presumably the broker's listen port -- TODO confirm against those classes.
        int brokerPort = IOUtils.chooseFreePort();

        // Resources are closed in reverse order: broker first, then the ZK server.
        try (LocalZKServer localZKServer = new LocalZKServer(zkPort);
             LocalKafkaBroker localKafkaBroker = new LocalKafkaBroker(brokerPort, zkPort)) {
            localZKServer.start();
            localKafkaBroker.start();

            String zkHostPort = "localhost:" + zkPort;
            // Start from a clean slate in case the topic is left over from an earlier run.
            KafkaUtils.deleteTopic(zkHostPort, TOPIC);
            KafkaUtils.maybeCreateTopic(zkHostPort, TOPIC, 4);

            ProduceData produceData = new ProduceData(
                    new DefaultCSVDatumGenerator(), localKafkaBroker.getPort(), TOPIC, NUM_DATA, 0);

            List<String> keys;
            try {
                // NOTE(review): "iterator2" looks like a decompiler-renamed method; confirm the
                // real name on ConsumeData before building against the original sources.
                Iterator<KeyMessage<String, String>> data =
                        new ConsumeData(TOPIC, brokerPort).iterator2();
                // The returned iterator must be closed, but java.util.Iterator declares no
                // close(); manage it through AutoCloseable so it is closed exactly once.
                try (AutoCloseable closeData = (AutoCloseable) data) {
                    log.info("Starting consumer thread");
                    ConsumeTopicRunnable consumeTopic = new ConsumeTopicRunnable(data, NUM_DATA);
                    new Thread(LoggingCallable.log(consumeTopic).asRunnable(), "ConsumeTopicThread")
                            .start();
                    // Do not produce until the consumer thread has signalled it is running.
                    consumeTopic.awaitRun();

                    log.info("Producing data");
                    produceData.start();

                    consumeTopic.awaitMessages();
                    keys = consumeTopic.getKeys();
                }
            } finally {
                // Runs once, after the iterator is closed, whether or not consumption succeeded.
                KafkaUtils.deleteTopic(zkHostPort, TOPIC);
            }

            // Assertions run after iterator/topic cleanup so a failed assertion cannot
            // trigger that cleanup a second time.
            if (keys.size() != NUM_DATA) {
                // Log the received keys only on mismatch, to help diagnose the failure.
                log.info("keys = {}", keys);
                assertEquals(NUM_DATA, keys.size());
            }
            for (int i = 0; i < NUM_DATA; i++) {
                assertContains(keys, Integer.toString(i));
            }
        }
    }
}
