package com.cloudera.oryx.kafka.util;

import com.cloudera.oryx.common.collection.Pair;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.admin.AdminUtils;
import kafka.api.OffsetRequest$;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.common.TopicExistsException;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.OffsetRequest;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.utils.ZKGroupTopicDirs;
import kafka.utils.ZkUtils;
import kafka.utils.ZkUtils$;
import org.eclipse.persistence.internal.oxm.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Tuple2;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/* loaded from: input_file:com/cloudera/oryx/kafka/util/KafkaUtils.class */
public final class KafkaUtils {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaUtils.class);
    private static final int ZK_TIMEOUT_MSEC = (int) TimeUnit.MILLISECONDS.convert(30, TimeUnit.SECONDS);

    private KafkaUtils() {
    }

    public static void maybeCreateTopic(String str, String str2, int i) {
        maybeCreateTopic(str, str2, i, new Properties());
    }

    public static void maybeCreateTopic(String str, String str2, int i, Properties properties) {
        ZkUtils apply = ZkUtils.apply(str, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
        try {
            if (AdminUtils.topicExists(apply, str2)) {
                log.info("No need to create topic {} as it already exists", str2);
            } else {
                log.info("Creating topic {}", str2);
                try {
                    AdminUtils.createTopic(apply, str2, i, 1, properties);
                    log.info("Created Zookeeper topic {}", str2);
                } catch (TopicExistsException e) {
                    log.info("Zookeeper topic {} already exists", str2);
                }
            }
        } finally {
            apply.close();
        }
    }

    public static boolean topicExists(String str, String str2) {
        ZkUtils apply = ZkUtils.apply(str, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
        try {
            boolean z = AdminUtils.topicExists(apply, str2);
            apply.close();
            return z;
        } catch (Throwable th) {
            apply.close();
            throw th;
        }
    }

    public static void deleteTopic(String str, String str2) {
        ZkUtils apply = ZkUtils.apply(str, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
        try {
            if (AdminUtils.topicExists(apply, str2)) {
                log.info("Deleting topic {}", str2);
                AdminUtils.deleteTopic(apply, str2);
                log.info("Deleted Zookeeper topic {}", str2);
            } else {
                log.info("No need to delete topic {} as it does not exist", str2);
            }
        } finally {
            apply.close();
        }
    }

    public static Map<Pair<String, Integer>, Long> getOffsets(String str, String str2, String str3) {
        ZKGroupTopicDirs zKGroupTopicDirs = new ZKGroupTopicDirs(str2, str3);
        HashMap hashMap = new HashMap();
        ZkUtils apply = ZkUtils.apply(str, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
        try {
            JavaConversions.seqAsJavaList((Seq) ((Tuple2) apply.getPartitionsForTopics(JavaConversions.asScalaBuffer(Collections.singletonList(str3))).head())._2()).forEach(obj -> {
                Option option = (Option) apply.readDataMaybeNull(zKGroupTopicDirs.consumerOffsetDir() + Constants.XPATH_SEPARATOR + obj)._1();
                hashMap.put(new Pair(str3, Integer.valueOf(Integer.parseInt(obj.toString()))), option.isDefined() ? Long.valueOf(Long.parseLong((String) option.get())) : null);
            });
            apply.close();
            return hashMap;
        } catch (Throwable th) {
            apply.close();
            throw th;
        }
    }

    public static void setOffsets(String str, String str2, Map<Pair<String, Integer>, Long> map) {
        ZkUtils apply = ZkUtils.apply(str, ZK_TIMEOUT_MSEC, ZK_TIMEOUT_MSEC, false);
        try {
            map.forEach((pair, l) -> {
                apply.updatePersistentPath(new ZKGroupTopicDirs(str2, (String) pair.getFirst()).consumerOffsetDir() + Constants.XPATH_SEPARATOR + ((Integer) pair.getSecond()).intValue(), Long.toString(l.longValue()), ZkUtils$.MODULE$.DefaultAcls(false));
            });
            apply.close();
        } catch (Throwable th) {
            apply.close();
            throw th;
        }
    }

    public static void fillInLatestOffsets(Map<Pair<String, Integer>, Long> map, Map<String, String> map2) {
        Properties properties = new Properties();
        properties.getClass();
        map2.forEach((v1, v2) -> {
            r1.put(v1, v2);
        });
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        map.keySet().forEach(pair -> {
            TopicAndPartition topicAndPartition = new TopicAndPartition((String) pair.getFirst(), ((Integer) pair.getSecond()).intValue());
            hashMap.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest$.MODULE$.LatestTime(), 1));
            hashMap2.put(topicAndPartition, new PartitionOffsetRequestInfo(OffsetRequest$.MODULE$.EarliestTime(), 1));
        });
        OffsetRequest offsetRequest = new OffsetRequest(hashMap, OffsetRequest$.MODULE$.CurrentVersion(), consumerConfig.clientId());
        OffsetRequest offsetRequest2 = new OffsetRequest(hashMap2, OffsetRequest$.MODULE$.CurrentVersion(), consumerConfig.clientId());
        SimpleConsumer simpleConsumer = null;
        for (String str : map2.get("bootstrap.servers").split(",")) {
            log.info("Connecting to broker {}", str);
            String[] split = str.split(":");
            String str2 = split[0];
            int parseInt = Integer.parseInt(split[1]);
            try {
                simpleConsumer = new SimpleConsumer(str2, parseInt, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), consumerConfig.clientId());
                break;
            } catch (Exception e) {
                log.warn("Error while connecting to broker {}:{}", str2, Integer.valueOf(parseInt), e);
            }
        }
        Objects.requireNonNull(simpleConsumer, "No available brokers");
        try {
            OffsetResponse requestOffsets = requestOffsets(simpleConsumer, offsetRequest);
            OffsetResponse requestOffsets2 = requestOffsets(simpleConsumer, offsetRequest2);
            map.keySet().forEach(pair2 -> {
                long offset = getOffset(requestOffsets, pair2);
                Long l = (Long) map.get(pair2);
                if (l == null) {
                    log.info("No initial offsets for {}; using latest offset {} from topic", pair2, Long.valueOf(offset));
                    map.put(pair2, Long.valueOf(offset));
                } else {
                    if (l.longValue() > offset) {
                        log.warn("Initial offset {} for {} after latest offset {} from topic! using topic offset", l, pair2, Long.valueOf(offset));
                        map.put(pair2, Long.valueOf(offset));
                        return;
                    }
                    long offset2 = getOffset(requestOffsets2, pair2);
                    if (l.longValue() < offset2) {
                        log.warn("Initial offset {} for {} before earliest offset {} from topic! using topic offset", l, pair2, Long.valueOf(offset2));
                        map.put(pair2, Long.valueOf(offset2));
                    }
                }
            });
            simpleConsumer.close();
        } catch (Throwable th) {
            simpleConsumer.close();
            throw th;
        }
    }

    private static long getOffset(OffsetResponse offsetResponse, Pair<String, Integer> pair) {
        String first = pair.getFirst();
        int intValue = pair.getSecond().intValue();
        long[] offsets = offsetResponse.offsets(first, intValue);
        if (offsets.length > 0) {
            return offsets[0];
        }
        short errorCode = offsetResponse.errorCode(first, intValue);
        if (errorCode == ErrorMapping.UnknownTopicOrPartitionCode()) {
            return 0L;
        }
        throw new IllegalStateException("Error reading offset for " + first + " / " + intValue + ": " + ErrorMapping.exceptionNameFor(errorCode));
    }

    private static OffsetResponse requestOffsets(SimpleConsumer simpleConsumer, OffsetRequest offsetRequest) {
        return simpleConsumer.getOffsetsBefore(offsetRequest);
    }
}
