package com.cloudera.oryx.lambda.serving;

import com.cloudera.oryx.api.KeyMessage;
import com.cloudera.oryx.api.TopicProducer;
import com.cloudera.oryx.api.serving.ScalaServingModelManager;
import com.cloudera.oryx.api.serving.ServingModelManager;
import com.cloudera.oryx.common.collection.CloseableIterator;
import com.cloudera.oryx.common.lang.ClassUtils;
import com.cloudera.oryx.common.lang.LoggingCallable;
import com.cloudera.oryx.common.settings.ConfigUtils;
import com.cloudera.oryx.kafka.util.ConsumeDataIterator;
import com.cloudera.oryx.kafka.util.KafkaUtils;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import java.io.Closeable;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.ServletContext;
import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;
import javax.servlet.annotation.WebListener;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Servlet context listener that wires up the serving layer's model manager.
 *
 * <p>On context start-up it reads the serialized configuration from the servlet context,
 * optionally creates a producer for the input topic (when the API is not read-only), creates a
 * Kafka consumer subscribed to the update topic, instantiates the configured model manager and
 * starts a thread that feeds updates from the consumer to that manager. The manager and the
 * input producer are published as servlet context attributes.
 *
 * <p>{@link #close()} is {@code synchronized} and may be called from both the container thread
 * and the update consumer thread; it is safe to call repeatedly.
 *
 * @param <K> key type of messages sent to the input topic
 * @param <M> message type of messages sent to the input topic
 * @param <U> message type read from the update topic
 */
@WebListener
public final class ModelManagerListener<K, M, U> implements ServletContextListener, Closeable {
    private static final Logger log = LoggerFactory.getLogger(ModelManagerListener.class);
    /** Servlet context attribute under which the model manager is published. */
    static final String MANAGER_KEY = ModelManagerListener.class.getName() + ".ModelManager";
    /** Servlet context attribute under which the input topic producer is published. */
    private static final String INPUT_PRODUCER_KEY = ModelManagerListener.class.getName() + ".InputProducer";
    private Config config;
    private String updateTopic;
    private int maxMessageSize;
    private String updateTopicLockMaster;
    private String updateTopicBroker;
    private boolean readOnly;
    // The three input-topic fields are only populated when the API is not read-only.
    private String inputTopic;
    private String inputTopicLockMaster;
    private String inputTopicBroker;
    private String modelManagerClassName;
    private Class<? extends Deserializer<U>> updateDecoderClass;
    private CloseableIterator<KeyMessage<String, U>> consumerIterator;
    private ServingModelManager<U> modelManager;
    private TopicProducer<K, M> inputProducer;

    /**
     * Reads the serialized configuration from the servlet context init parameters and caches
     * the settings this listener needs.
     *
     * @param servletContext context holding the serialized configuration init parameter
     * @throws NullPointerException if the serialized configuration init parameter is absent
     * @throws IllegalArgumentException if the configured maximum message size is not positive
     */
    void init(ServletContext servletContext) {
        String serializedConfigKey = ConfigUtils.class.getName() + ".serialized";
        String initParameter = servletContext.getInitParameter(serializedConfigKey);
        Objects.requireNonNull(initParameter, "Missing servlet context init parameter " + serializedConfigKey);
        this.config = ConfigUtils.deserialize(initParameter);
        this.updateTopic = this.config.getString("oryx.update-topic.message.topic");
        this.maxMessageSize = this.config.getInt("oryx.update-topic.message.max-size");
        this.updateTopicLockMaster = this.config.getString("oryx.update-topic.lock.master");
        this.updateTopicBroker = this.config.getString("oryx.update-topic.broker");
        this.readOnly = this.config.getBoolean("oryx.serving.api.read-only");
        if (!this.readOnly) {
            this.inputTopic = this.config.getString("oryx.input-topic.message.topic");
            this.inputTopicLockMaster = this.config.getString("oryx.input-topic.lock.master");
            this.inputTopicBroker = this.config.getString("oryx.input-topic.broker");
        }
        this.modelManagerClassName = this.config.getString("oryx.serving.model-manager-class");
        this.updateDecoderClass = ClassUtils.loadClass(this.config.getString("oryx.update-topic.message.decoder-class"), Deserializer.class);
        Preconditions.checkArgument(this.maxMessageSize > 0, "oryx.update-topic.message.max-size must be positive: %s", this.maxMessageSize);
    }

    /**
     * Initializes this listener, starts consuming the update topic on a background thread and
     * publishes the model manager (and, unless read-only, the input producer) as servlet
     * context attributes.
     *
     * @param servletContextEvent event carrying the servlet context being initialized
     * @throws IllegalArgumentException if a required topic does not exist or the configured
     *     manager class is not a supported model manager type
     */
    public void contextInitialized(ServletContextEvent servletContextEvent) {
        log.info("ModelManagerListener initializing");
        ServletContext servletContext = servletContextEvent.getServletContext();
        init(servletContext);
        if (!this.readOnly) {
            // NOTE(review): the update topic is only verified when not read-only, although it is
            // consumed in both modes; kept as-is, confirm whether that is intentional.
            Preconditions.checkArgument(KafkaUtils.topicExists(this.inputTopicLockMaster, this.inputTopic), "Topic %s does not exist; did you create it?", this.inputTopic);
            Preconditions.checkArgument(KafkaUtils.topicExists(this.updateTopicLockMaster, this.updateTopic), "Topic %s does not exist; did you create it?", this.updateTopic);
            this.inputProducer = new TopicProducerImpl(this.inputTopicBroker, this.inputTopic);
            servletContext.setAttribute(INPUT_PRODUCER_KEY, this.inputProducer);
        }
        this.consumerIterator = new ConsumeDataIterator(createUpdateConsumer());
        try {
            this.modelManager = loadManagerInstance();
        } catch (RuntimeException e) {
            // Do not leave the producer/consumer created above open if the manager cannot be built.
            close();
            throw e;
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        new Thread(LoggingCallable.log(() -> {
            try {
                Configuration configuration = new Configuration();
                // Signal the initializing thread that the consumer thread is up and about to consume.
                countDownLatch.countDown();
                this.modelManager.consume(this.consumerIterator, configuration);
            } catch (Throwable th) {
                log.error("Error while consuming updates", th);
            } finally {
                // Consumption has ended, normally or not; release everything.
                close();
            }
        }).asRunnable(), "OryxServingLayerUpdateConsumerThread").start();
        try {
            if (!countDownLatch.await(10L, TimeUnit.SECONDS)) {
                log.warn("Timed out waiting for OryxServingLayerUpdateConsumerThread to start; continue");
            }
        } catch (InterruptedException e) {
            log.warn("Failed to wait for OryxServingLayerUpdateConsumerThread; continue");
            // Preserve the interrupt for the container thread instead of swallowing it.
            Thread.currentThread().interrupt();
        }
        servletContext.setAttribute(MANAGER_KEY, this.modelManager);
    }

    /**
     * Removes all servlet context attributes, closes the resources held by this listener and
     * then pauses briefly.
     *
     * @param servletContextEvent event carrying the servlet context being destroyed
     */
    public void contextDestroyed(ServletContextEvent servletContextEvent) {
        log.info("ModelManagerListener destroying");
        ServletContext servletContext = servletContextEvent.getServletContext();
        // Snapshot the names first: removing attributes while walking a live enumeration
        // is not guaranteed to be safe across servlet containers.
        for (String attributeName : Collections.list(servletContext.getAttributeNames())) {
            servletContext.removeAttribute(attributeName);
        }
        close();
        try {
            // NOTE(review): presumably gives background cleanup threads time to exit
            // before the container inspects the application — confirm.
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            // Shutdown continues regardless, but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Shuts down the model manager, the input producer and the update consumer, in that order.
     * Each resource is attempted even if closing an earlier one throws; a resource that closed
     * successfully is cleared so repeated calls are no-ops for it.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public synchronized void close() {
        log.info("ModelManagerListener closing");
        try {
            closeModelManager();
        } finally {
            try {
                closeInputProducer();
            } finally {
                closeConsumer();
            }
        }
    }

    /** Closes and clears the model manager, if one is present. */
    private void closeModelManager() {
        if (this.modelManager != null) {
            log.info("Shutting down model manager");
            this.modelManager.close();
            this.modelManager = null;
        }
    }

    /** Closes and clears the input topic producer, if one is present. */
    private void closeInputProducer() {
        if (this.inputProducer != null) {
            log.info("Shutting down input producer");
            this.inputProducer.close();
            this.inputProducer = null;
        }
    }

    /** Closes and clears the update topic consumer iterator, if one is present. */
    private void closeConsumer() {
        if (this.consumerIterator != null) {
            log.info("Shutting down consumer");
            this.consumerIterator.close();
            this.consumerIterator = null;
        }
    }

    /**
     * Creates a Kafka consumer for the update topic, using a unique consumer group and reading
     * from the earliest offset, and subscribes it to the topic.
     */
    private KafkaConsumer createUpdateConsumer() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(ConfigUtils.keyValueToProperties(new Object[]{
            "group.id", "OryxGroup-ServingLayer-" + UUID.randomUUID(),
            "bootstrap.servers", this.updateTopicBroker,
            "max.partition.fetch.bytes", Integer.valueOf(this.maxMessageSize),
            "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer",
            "value.deserializer", this.updateDecoderClass.getName(),
            "auto.offset.reset", "earliest",
            "reconnect.backoff.ms", "1000",
            "reconnect.backoff.max.ms", "10000"}));
        kafkaConsumer.subscribe(Collections.singletonList(this.updateTopic));
        return kafkaConsumer;
    }

    /**
     * Instantiates the configured model manager class. Both {@link ServingModelManager} and
     * {@link ScalaServingModelManager} implementations are accepted; the latter is wrapped in an
     * adapter. A constructor taking a {@link Config} is tried first, falling back to the no-arg
     * form when that attempt raises {@link IllegalArgumentException}.
     *
     * @return the model manager instance
     * @throws IllegalArgumentException if the class implements neither supported interface
     */
    private ServingModelManager<U> loadManagerInstance() {
        Class loadClass = ClassUtils.loadClass(this.modelManagerClassName);
        if (ServingModelManager.class.isAssignableFrom(loadClass)) {
            try {
                return (ServingModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ServingModelManager.class, new Class[]{Config.class}, new Object[]{this.config});
            } catch (IllegalArgumentException ignored) {
                // Presumably no Config-accepting constructor; retry without arguments.
                return (ServingModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ServingModelManager.class);
            }
        }
        if (!ScalaServingModelManager.class.isAssignableFrom(loadClass)) {
            throw new IllegalArgumentException("Bad manager class: " + loadClass);
        }
        try {
            return new ScalaServingModelManagerAdapter((ScalaServingModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ScalaServingModelManager.class, new Class[]{Config.class}, new Object[]{this.config}));
        } catch (IllegalArgumentException ignored) {
            // Presumably no Config-accepting constructor; retry without arguments.
            return new ScalaServingModelManagerAdapter((ScalaServingModelManager) ClassUtils.loadInstanceOf(this.modelManagerClassName, ScalaServingModelManager.class));
        }
    }
}
