package com.cloudera.oryx.lambda;

import com.cloudera.oryx.common.lang.ClassUtils;
import com.cloudera.oryx.common.settings.ConfigUtils;
import com.cloudera.oryx.kafka.util.KafkaUtils;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigValue;
import java.io.Closeable;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/oryx/lambda/AbstractSparkLayer.class */
public abstract class AbstractSparkLayer<K, M> implements Closeable {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) AbstractSparkLayer.class);
    private final Config config;
    private final String id;
    private final String streamingMaster;
    private final String inputTopic;
    private final String inputTopicLockMaster;
    private final String inputBroker;
    private final String updateTopic;
    private final String updateTopicLockMaster;
    private final Class<K> keyClass;
    private final Class<M> messageClass;
    private final Class<? extends Deserializer<K>> keyDecoderClass;
    private final Class<? extends Deserializer<M>> messageDecoderClass;
    private final int generationIntervalSec;
    private final Map<String, Object> extraSparkConfig;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractSparkLayer(Config config) {
        Objects.requireNonNull(config);
        log.info("Configuration:\n{}", ConfigUtils.prettyPrint(config));
        String configGroup = getConfigGroup();
        this.config = config;
        String optionalString = ConfigUtils.getOptionalString(config, "oryx.id");
        this.id = optionalString == null ? UUID.randomUUID().toString() : optionalString;
        this.streamingMaster = config.getString("oryx." + configGroup + ".streaming.master");
        this.inputTopic = config.getString("oryx.input-topic.message.topic");
        this.inputTopicLockMaster = config.getString("oryx.input-topic.lock.master");
        this.inputBroker = config.getString("oryx.input-topic.broker");
        this.updateTopic = ConfigUtils.getOptionalString(config, "oryx.update-topic.message.topic");
        this.updateTopicLockMaster = ConfigUtils.getOptionalString(config, "oryx.update-topic.lock.master");
        this.keyClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.key-class"));
        this.messageClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.message-class"));
        this.keyDecoderClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.key-decoder-class"), Deserializer.class);
        this.messageDecoderClass = ClassUtils.loadClass(config.getString("oryx.input-topic.message.message-decoder-class"), Deserializer.class);
        this.generationIntervalSec = config.getInt("oryx." + configGroup + ".streaming.generation-interval-sec");
        this.extraSparkConfig = new HashMap();
        config.getConfig("oryx." + configGroup + ".streaming.config").entrySet().forEach(entry -> {
            this.extraSparkConfig.put(entry.getKey(), ((ConfigValue) entry.getValue()).unwrapped());
        });
        Preconditions.checkArgument(this.generationIntervalSec > 0);
    }

    protected abstract String getConfigGroup();

    protected abstract String getLayerName();

    /* JADX INFO: Access modifiers changed from: protected */
    public final Config getConfig() {
        return this.config;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getID() {
        return this.id;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getGroupID() {
        return "OryxGroup-" + getLayerName() + "-" + getID();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final String getInputTopicLockMaster() {
        return this.inputTopicLockMaster;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Class<K> getKeyClass() {
        return this.keyClass;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final Class<M> getMessageClass() {
        return this.messageClass;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final JavaStreamingContext buildStreamingContext() {
        log.info("Starting SparkContext with interval {} seconds", Integer.valueOf(this.generationIntervalSec));
        SparkConf sparkConf = new SparkConf();
        if (sparkConf.getOption("spark.master").isEmpty()) {
            log.info("Overriding master to {} for tests", this.streamingMaster);
            sparkConf.setMaster(this.streamingMaster);
        }
        if (sparkConf.getOption("spark.app.name").isEmpty()) {
            String str = "Oryx" + getLayerName();
            if (this.id != null) {
                str = str + "-" + this.id;
            }
            log.info("Overriding app name to {} for tests", str);
            sparkConf.setAppName(str);
        }
        this.extraSparkConfig.forEach((str2, obj) -> {
            sparkConf.setIfMissing(str2, obj.toString());
        });
        sparkConf.setIfMissing("spark.streaming.gracefulStopTimeout", Long.toString(TimeUnit.MILLISECONDS.convert(this.generationIntervalSec, TimeUnit.SECONDS)));
        sparkConf.setIfMissing("spark.cleaner.ttl", Integer.toString(20 * this.generationIntervalSec));
        return new JavaStreamingContext(JavaSparkContext.fromSparkContext(SparkContext.getOrCreate(sparkConf)), new Duration(TimeUnit.MILLISECONDS.convert(this.generationIntervalSec, TimeUnit.SECONDS)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public final JavaInputDStream<ConsumerRecord<K, M>> buildInputDStream(JavaStreamingContext javaStreamingContext) {
        Preconditions.checkArgument(KafkaUtils.topicExists(this.inputTopicLockMaster, this.inputTopic), "Topic %s does not exist; did you create it?", this.inputTopic);
        if (this.updateTopic != null && this.updateTopicLockMaster != null) {
            Preconditions.checkArgument(KafkaUtils.topicExists(this.updateTopicLockMaster, this.updateTopic), "Topic %s does not exist; did you create it?", this.updateTopic);
        }
        String groupID = getGroupID();
        HashMap hashMap = new HashMap();
        hashMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        hashMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        hashMap.put("bootstrap.servers", this.inputBroker);
        hashMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, this.keyDecoderClass.getName());
        hashMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, this.messageDecoderClass.getName());
        return org.apache.spark.streaming.kafka010.KafkaUtils.createDirectStream(javaStreamingContext, LocationStrategies.PreferConsistent(), ConsumerStrategies.Subscribe(Collections.singleton(this.inputTopic), hashMap, (Map<TopicPartition, Long>) Collections.emptyMap()));
    }
}
