package com.cloudera.oryx.lambda.batch;

import com.cloudera.oryx.api.batch.BatchLayerUpdate;
import com.cloudera.oryx.api.batch.ScalaBatchLayerUpdate;
import com.cloudera.oryx.common.lang.ClassUtils;
import com.cloudera.oryx.lambda.AbstractSparkLayer;
import com.cloudera.oryx.lambda.DeleteOldDataFn;
import com.cloudera.oryx.lambda.UpdateOffsetsFn;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import java.lang.invoke.SerializedLambda;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:com/cloudera/oryx/lambda/batch/BatchLayer.class */
public final class BatchLayer<K, M, U> extends AbstractSparkLayer<K, M> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) BatchLayer.class);
    private static final int NO_MAX_AGE = -1;
    private final Class<? extends Writable> keyWritableClass;
    private final Class<? extends Writable> messageWritableClass;
    private final String updateClassName;
    private final String dataDirString;
    private final String modelDirString;
    private final int maxDataAgeHours;
    private final int maxModelAgeHours;
    private JavaStreamingContext streamingContext;

    public BatchLayer(Config config) {
        super(config);
        this.keyWritableClass = ClassUtils.loadClass(config.getString("oryx.batch.storage.key-writable-class"), Writable.class);
        this.messageWritableClass = ClassUtils.loadClass(config.getString("oryx.batch.storage.message-writable-class"), Writable.class);
        this.updateClassName = config.getString("oryx.batch.update-class");
        this.dataDirString = config.getString("oryx.batch.storage.data-dir");
        this.modelDirString = config.getString("oryx.batch.storage.model-dir");
        this.maxDataAgeHours = config.getInt("oryx.batch.storage.max-age-data-hours");
        this.maxModelAgeHours = config.getInt("oryx.batch.storage.max-age-model-hours");
        Preconditions.checkArgument(!this.dataDirString.isEmpty());
        Preconditions.checkArgument(!this.modelDirString.isEmpty());
        Preconditions.checkArgument(this.maxDataAgeHours >= 0 || this.maxDataAgeHours == -1);
        Preconditions.checkArgument(this.maxModelAgeHours >= 0 || this.maxModelAgeHours == -1);
    }

    @Override // com.cloudera.oryx.lambda.AbstractSparkLayer
    protected String getConfigGroup() {
        return "batch";
    }

    @Override // com.cloudera.oryx.lambda.AbstractSparkLayer
    protected String getLayerName() {
        return "BatchLayer";
    }

    public synchronized void start() {
        String id = getID();
        if (id != null) {
            log.info("Starting Batch Layer {}", id);
        }
        this.streamingContext = buildStreamingContext();
        JavaSparkContext sparkContext = this.streamingContext.sparkContext();
        Configuration hadoopConfiguration = sparkContext.hadoopConfiguration();
        Path path = new Path(new Path(this.modelDirString), ".checkpoint");
        log.info("Setting checkpoint dir to {}", path);
        sparkContext.setCheckpointDir(path.toString());
        log.info("Creating message stream from topic");
        JavaInputDStream<ConsumerRecord<K, M>> buildInputDStream = buildInputDStream(this.streamingContext);
        JavaPairDStream mapToPair = buildInputDStream.mapToPair(consumerRecord -> {
            return new Tuple2(consumerRecord.key(), consumerRecord.value());
        });
        Class<K> keyClass = getKeyClass();
        Class<M> messageClass = getMessageClass();
        mapToPair.foreachRDD(new BatchUpdateFunction(getConfig(), keyClass, messageClass, this.keyWritableClass, this.messageWritableClass, this.dataDirString, this.modelDirString, loadUpdateInstance(), this.streamingContext));
        mapToPair.foreachRDD(new SaveToHDFSFunction(this.dataDirString + "/oryx", "data", keyClass, messageClass, this.keyWritableClass, this.messageWritableClass, hadoopConfiguration));
        buildInputDStream.foreachRDD(new UpdateOffsetsFn(getGroupID(), getInputTopicLockMaster()));
        if (this.maxDataAgeHours != -1) {
            mapToPair.foreachRDD(new DeleteOldDataFn(hadoopConfiguration, this.dataDirString, Pattern.compile("-(\\d+)\\."), this.maxDataAgeHours));
        }
        if (this.maxModelAgeHours != -1) {
            mapToPair.foreachRDD(new DeleteOldDataFn(hadoopConfiguration, this.modelDirString, Pattern.compile("(\\d+)"), this.maxModelAgeHours));
        }
        log.info("Starting Spark Streaming");
        this.streamingContext.start();
    }

    public void await() throws InterruptedException {
        JavaStreamingContext javaStreamingContext;
        synchronized (this) {
            javaStreamingContext = this.streamingContext;
            Preconditions.checkState(javaStreamingContext != null);
        }
        log.info("Spark Streaming is running");
        javaStreamingContext.awaitTermination();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        if (this.streamingContext != null) {
            log.info("Shutting down Spark Streaming; this may take some time");
            this.streamingContext.stop(true, true);
            this.streamingContext = null;
        }
    }

    private BatchLayerUpdate<K, M, U> loadUpdateInstance() {
        Class loadClass = ClassUtils.loadClass(this.updateClassName);
        if (BatchLayerUpdate.class.isAssignableFrom(loadClass)) {
            try {
                return (BatchLayerUpdate) ClassUtils.loadInstanceOf(this.updateClassName, BatchLayerUpdate.class, new Class[]{Config.class}, new Object[]{getConfig()});
            } catch (IllegalArgumentException e) {
                return (BatchLayerUpdate) ClassUtils.loadInstanceOf(this.updateClassName, BatchLayerUpdate.class);
            }
        }
        if (!ScalaBatchLayerUpdate.class.isAssignableFrom(loadClass)) {
            throw new IllegalArgumentException("Bad update class: " + this.updateClassName);
        }
        try {
            return new ScalaBatchLayerUpdateAdapter((ScalaBatchLayerUpdate) ClassUtils.loadInstanceOf(this.updateClassName, ScalaBatchLayerUpdate.class, new Class[]{Config.class}, new Object[]{getConfig()}));
        } catch (IllegalArgumentException e2) {
            return new ScalaBatchLayerUpdateAdapter((ScalaBatchLayerUpdate) ClassUtils.loadInstanceOf(this.updateClassName, ScalaBatchLayerUpdate.class));
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -8675165:
                if (implMethodName.equals("lambda$start$61112581$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/PairFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Lscala/Tuple2;") && serializedLambda.getImplClass().equals("com/cloudera/oryx/lambda/batch/BatchLayer") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/kafka/clients/consumer/ConsumerRecord;)Lscala/Tuple2;")) {
                    return consumerRecord -> {
                        return new Tuple2(consumerRecord.key(), consumerRecord.value());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
