package com.cloudera.oryx.lambda.batch;

import com.cloudera.oryx.api.TopicProducer;
import com.cloudera.oryx.api.batch.BatchLayerUpdate;
import com.cloudera.oryx.common.settings.ConfigUtils;
import com.cloudera.oryx.lambda.TopicProducerImpl;
import com.typesafe.config.Config;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/cloudera/oryx/lambda/batch/BatchUpdateFunction.class */
/**
 * Callback invoked with each new batch of data (a {@link JavaPairRDD}) and its batch {@link Time}.
 * It gathers any previously persisted data found under the data directory and hands both the
 * new and the past data to the configured {@link BatchLayerUpdate}, optionally together with a
 * producer for the update topic.
 *
 * @param <K> type of key read from the input
 * @param <M> type of message read from the input
 * @param <U> type parameter of the {@link BatchLayerUpdate} implementation
 */
final class BatchUpdateFunction<K, M, U> implements VoidFunction2<JavaPairRDD<K, M>, Time> {
    private static final Logger log = LoggerFactory.getLogger(BatchUpdateFunction.class);

    private final Class<K> keyClass;
    private final Class<M> messageClass;
    private final Class<? extends Writable> keyWritableClass;
    private final Class<? extends Writable> messageWritableClass;
    private final String dataDirString;
    private final String modelDirString;
    private final BatchLayerUpdate<K, M, U> updateInstance;
    private final String updateBroker;
    private final String updateTopic;
    private final JavaSparkContext sparkContext;

    /**
     * Creates the update function.
     *
     * @param config configuration; {@code oryx.update-topic.broker} and
     *     {@code oryx.update-topic.message.topic} are read from it as optional strings
     * @param keyClass class of keys in the data
     * @param messageClass class of messages in the data
     * @param keyWritableClass {@link Writable} class used for keys in persisted data
     * @param messageWritableClass {@link Writable} class used for messages in persisted data
     * @param dataDirString directory under which past data is looked up
     * @param modelDirString directory passed through to the update implementation
     * @param updateInstance update implementation to invoke for every non-empty batch
     * @param streamingContext streaming context whose {@link JavaSparkContext} is retained
     */
    public BatchUpdateFunction(Config config,
                               Class<K> keyClass,
                               Class<M> messageClass,
                               Class<? extends Writable> keyWritableClass,
                               Class<? extends Writable> messageWritableClass,
                               String dataDirString,
                               String modelDirString,
                               BatchLayerUpdate<K, M, U> updateInstance,
                               JavaStreamingContext streamingContext) {
        this.keyClass = keyClass;
        this.messageClass = messageClass;
        this.keyWritableClass = keyWritableClass;
        this.messageWritableClass = messageWritableClass;
        this.dataDirString = dataDirString;
        this.modelDirString = modelDirString;
        this.updateBroker = ConfigUtils.getOptionalString(config, "oryx.update-topic.broker");
        this.updateTopic = ConfigUtils.getOptionalString(config, "oryx.update-topic.message.topic");
        this.updateInstance = updateInstance;
        this.sparkContext = streamingContext.sparkContext();
    }

    /**
     * Runs one update over {@code newData}, unless it is empty, in which case nothing happens.
     *
     * <p>If both the update broker and update topic are set, a {@link TopicProducerImpl} is opened
     * for the duration of the update and closed afterwards, whether or not the update succeeds.
     * Otherwise the update runs with a {@code null} producer.
     *
     * @param newData data belonging to the current batch
     * @param timestamp time of the current batch; its millisecond value is given to the update
     * @throws IOException if past data cannot be located, or propagated from the update or from
     *     closing the producer
     * @throws InterruptedException declared for the benefit of the update implementation
     */
    @Override
    public void call(JavaPairRDD<K, M> newData, Time timestamp) throws IOException, InterruptedException {
        if (newData.isEmpty()) {
            log.info("No data in current generation's RDD; nothing to do");
            return;
        }
        log.info("Beginning update at {}", timestamp);

        Configuration hadoopConf = sparkContext.hadoopConfiguration();
        if (hadoopConf.getResource("core-site.xml") == null) {
            log.warn("Hadoop config like core-site.xml was not found; is the Hadoop config directory on the classpath?");
        }

        JavaPairRDD<K, M> pastData = loadPastData(hadoopConf);

        if (updateTopic == null || updateBroker == null) {
            log.info("Not producing updates to update topic since none was configured");
            // The cast is retained so overload resolution stays exactly as it was.
            updateInstance.runUpdate(
                sparkContext, timestamp.milliseconds(), newData, pastData, modelDirString, (TopicProducer) null);
            return;
        }

        // try-with-resources closes the producer on both the normal and the exceptional path and
        // attaches a failure from close() as a suppressed exception when the update itself threw.
        // NOTE(review): TopicProducerImpl is left raw because its type parameters (if any) are not
        // visible from this file - parameterize once confirmed.
        try (TopicProducerImpl producer = new TopicProducerImpl(updateBroker, updateTopic, false)) {
            updateInstance.runUpdate(
                sparkContext, timestamp.milliseconds(), newData, pastData, modelDirString, producer);
        }
    }

    /**
     * Builds an RDD over every file matching {@code <dataDir>/*}{@code /part-*}, converting the
     * stored {@link Writable} pairs through {@link WritableToValueFunction}.
     *
     * @param hadoopConf Hadoop configuration of the Spark context; it is copied, not modified
     * @return RDD over the past data, or {@code null} if no file matches the pattern
     * @throws IOException if the file system cannot be obtained or the glob fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // element types are only known via Class tokens here
    private JavaPairRDD<K, M> loadPastData(Configuration hadoopConf) throws IOException {
        Path inputPathPattern = new Path(dataDirString + "/*/part-*");
        FileSystem fs = FileSystem.get(inputPathPattern.toUri(), hadoopConf);
        FileStatus[] inputPathStatuses = fs.globStatus(inputPathPattern);
        if (inputPathStatuses == null || inputPathStatuses.length == 0) {
            log.info("No past data at path(s) {}", inputPathPattern);
            return null;
        }

        log.info("Found past data at path(s) like {}", inputPathStatuses[0].getPath());
        // Work on a copy so the input directory setting does not leak into the shared configuration.
        Configuration updatedConf = new Configuration(hadoopConf);
        updatedConf.set("mapreduce.input.fileinputformat.inputdir", joinFSPaths(fs, inputPathStatuses));
        return sparkContext
            .newAPIHadoopRDD(updatedConf, SequenceFileInputFormat.class, keyWritableClass, messageWritableClass)
            .mapToPair(new WritableToValueFunction(keyClass, messageClass, keyWritableClass, messageWritableClass));
    }

    /**
     * Joins the fully qualified, escaped paths of the given statuses into one comma-separated string.
     *
     * @param fs file system used to qualify each path
     * @param statuses files whose paths are joined
     * @return comma-separated list of escaped, qualified paths; empty if {@code statuses} is empty
     */
    private static String joinFSPaths(FileSystem fs, FileStatus[] statuses) {
        StringBuilder joined = new StringBuilder();
        for (FileStatus status : statuses) {
            if (joined.length() > 0) {
                joined.append(',');
            }
            // Escape each path so that a comma inside a path is not taken for a separator.
            joined.append(StringUtils.escapeString(fs.makeQualified(status.getPath()).toString()));
        }
        return joined.toString();
    }
}
