package ai.konduit.serving.vertx.protocols.kafka.verticle;

import ai.konduit.serving.pipeline.api.data.Data;
import ai.konduit.serving.pipeline.settings.KonduitSettings;
import ai.konduit.serving.vertx.config.KafkaConfiguration;
import ai.konduit.serving.vertx.verticle.InferenceVerticle;
import com.google.common.base.Strings;
import io.vertx.core.AsyncResult;
import io.vertx.core.Promise;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.HttpVersion;
import io.vertx.core.json.JsonObject;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.core.net.SelfSignedCertificate;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.RecordMetadata;
import io.vertx.kafka.client.serialization.BufferSerializer;
import io.vertx.kafka.client.serialization.JsonObjectSerializer;
import java.io.File;
import java.sql.Date;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:ai/konduit/serving/vertx/protocols/kafka/verticle/InferenceVerticleKafka.class */
/**
 * Verticle that serves a pipeline over Kafka.
 *
 * <p>On start it initializes the verticle on a worker thread, creates a Kafka consumer and
 * producer from the inference configuration, and subscribes the consumer to the configured
 * topic. Every consumed record is run through the pipeline executor and the result is sent to
 * the producer topic. Unless disabled through the settings, an HTTP server exposing a
 * {@code /health} route is started as well.
 */
public class InferenceVerticleKafka extends InferenceVerticle {
    private static final Logger log = LoggerFactory.getLogger(InferenceVerticleKafka.class);

    /** Environment variable that, when set, takes precedence over the configured port. */
    private static final String PORT_ENV_VARIABLE = "KONDUIT_SERVING_PORT";

    /**
     * Starts the verticle.
     *
     * @param promise completed once the consumer is subscribed (and the HTTP server, if enabled,
     *                is listening); failed with the cause if any start-up step fails
     * @throws Exception declared for compatibility with the overridden signature
     */
    public void start(Promise<Void> promise) throws Exception {
        this.vertx.executeBlocking(initPromise -> {
            try {
                initialize();
                initPromise.complete();
            } catch (Exception e) {
                // Fail only the blocking promise: the result handler below forwards the failure to
                // the start promise. Failing the start promise here as well would make the second
                // fail(...) throw IllegalStateException because the promise is already completed.
                initPromise.fail(e);
            }
        }, initResult -> {
            if (initResult.failed()) {
                if (initResult.cause() != null) {
                    promise.fail(initResult.cause());
                } else {
                    promise.fail("Failed to start. Unknown cause.");
                }
                return;
            }
            try {
                startKafka(promise);
            } catch (RuntimeException e) {
                // Invalid port or a client that cannot be constructed: report it instead of
                // leaving the start promise uncompleted.
                promise.tryFail(e);
            }
        });
    }

    /**
     * Creates the Kafka consumer and producer, wires the record handler and subscribes to the
     * consumer topic. Completes or fails {@code promise} from the subscription callback.
     */
    private void startKafka(Promise<Void> promise) {
        int port = resolvePort();
        KafkaConfiguration kafkaConfiguration = this.inferenceConfiguration.kafkaConfiguration();
        String bootstrapServers = String.format("%s:%s", this.inferenceConfiguration.host(), port);
        String producerValueSerializer = KonduitSettings.getKafkaProducerValueSerializerClass(kafkaConfiguration != null ? kafkaConfiguration.producerValueSerializerClass() : null);

        KafkaConsumer<Object, Object> consumer = KafkaConsumer.create(this.vertx, consumerConfig(kafkaConfiguration, bootstrapServers));
        KafkaProducer<Object, Object> producer = KafkaProducer.create(this.vertx, producerConfig(kafkaConfiguration, bootstrapServers, producerValueSerializer));

        consumer.handler(record -> handleRecord(record, producer, kafkaConfiguration, producerValueSerializer));

        String consumerTopicName = KonduitSettings.getConsumerTopicName(kafkaConfiguration != null ? kafkaConfiguration.consumerTopicName() : null);
        consumer.subscribe(consumerTopicName, subscribeResult -> {
            if (!subscribeResult.succeeded()) {
                log.error("Could not subscribe to topic: {}", consumerTopicName, subscribeResult.cause());
                promise.fail(subscribeResult.cause());
                return;
            }
            log.info("Subscribed to topic: {}", consumerTopicName);
            try {
                if (!KonduitSettings.getStartHttpServerForKafka(kafkaConfiguration != null ? kafkaConfiguration.startHttpServerForKafka() : true)) {
                    saveInspectionDataIfRequired(getPid());
                    promise.complete();
                    return;
                }
                startHealthServer(promise, kafkaConfiguration);
            } catch (RuntimeException e) {
                promise.tryFail(e);
            }
        });
    }

    /**
     * Resolves the port used for the Kafka bootstrap server address: the value of
     * {@code KONDUIT_SERVING_PORT} when that environment variable is set, otherwise the port from
     * the inference configuration.
     *
     * @return a port in the range 0..65535
     * @throws NumberFormatException    if the environment variable is not an integer
     * @throws IllegalArgumentException if the port is outside the valid range
     */
    private int resolvePort() {
        String envPort = System.getenv(PORT_ENV_VARIABLE);
        int port;
        if (envPort != null) {
            try {
                port = Integer.parseInt(envPort);
            } catch (NumberFormatException e) {
                log.error("Environment variable \"{}={}\" isn't a valid port number.", PORT_ENV_VARIABLE, envPort);
                throw e;
            }
        } else {
            port = this.inferenceConfiguration.port();
        }
        if (port < 0 || port > 65535) {
            throw new IllegalArgumentException("Valid port range is 0 <= port <= 65535. The given port was " + port);
        }
        return port;
    }

    /** Builds the Kafka consumer properties; {@code kafkaConfiguration} may be {@code null}. */
    private static Map<String, String> consumerConfig(KafkaConfiguration kafkaConfiguration, String bootstrapServers) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.deserializer", KonduitSettings.getKafkaConsumerKeyDeserializerClass(kafkaConfiguration != null ? kafkaConfiguration.consumerKeyDeserializerClass() : null));
        config.put("value.deserializer", KonduitSettings.getKafkaConsumerValueDeserializerClass(kafkaConfiguration != null ? kafkaConfiguration.consumerValueDeserializerClass() : null));
        config.put("group.id", KonduitSettings.getConsumerGroupId(kafkaConfiguration != null ? kafkaConfiguration.consumerGroupId() : null));
        config.put("auto.offset.reset", KonduitSettings.getConsumerAutoOffsetReset(kafkaConfiguration != null ? kafkaConfiguration.consumerAutoOffsetReset() : null));
        config.put("enable.auto.commit", KonduitSettings.getConsumerAutoCommit(kafkaConfiguration != null ? kafkaConfiguration.consumerAutoCommit() : null));
        return config;
    }

    /** Builds the Kafka producer properties; {@code kafkaConfiguration} may be {@code null}. */
    private static Map<String, String> producerConfig(KafkaConfiguration kafkaConfiguration, String bootstrapServers, String valueSerializer) {
        Map<String, String> config = new HashMap<>();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("key.serializer", KonduitSettings.getKafkaProducerKeySerializerClass(kafkaConfiguration != null ? kafkaConfiguration.producerKeySerializerClass() : null));
        config.put("value.serializer", valueSerializer);
        config.put("acks", KonduitSettings.getProducerAcks(kafkaConfiguration != null ? kafkaConfiguration.producerAcks() : null));
        return config;
    }

    /**
     * Runs one consumed record through the pipeline and sends the result to the producer topic.
     *
     * @throws IllegalStateException if the record value or the configured producer value
     *                               serializer has no supported conversion
     */
    private void handleRecord(KafkaConsumerRecord<Object, Object> record, KafkaProducer<Object, Object> producer,
                              KafkaConfiguration kafkaConfiguration, String producerValueSerializer) {
        Object value = record.value();
        log.debug("Processing input from topic: {} at {}. Headers={}, Key={}, Value={}, Partition={}, Offset={}",
                record.topic(), asDate(record.timestamp()), record.headers(), record.key(), value, record.partition(), record.offset());

        Data output = this.pipelineExecutor.exec(toData(value));

        String producerTopicName = KonduitSettings.getProducerTopicName(kafkaConfiguration != null ? kafkaConfiguration.producerTopicName() : null);
        KafkaProducerRecord<Object, Object> outputRecord = toProducerRecord(producerTopicName, output, producerValueSerializer);

        producer.send(outputRecord, sendResult -> {
            if (sendResult.succeeded()) {
                RecordMetadata metadata = sendResult.result();
                // The record is created without a timestamp, so fall back to the one reported in
                // the send metadata rather than unboxing a null.
                Long timestamp = outputRecord.timestamp();
                if (timestamp == null) {
                    timestamp = metadata.getTimestamp();
                }
                log.debug("Sent output to topic: {} at {}. Headers={}, Key={}, Value={}, Partition={}, Offset={}",
                        outputRecord.topic(), asDate(timestamp), outputRecord.headers(), outputRecord.key(), outputRecord.value(), outputRecord.partition(), metadata.getOffset());
            } else {
                // Trailing Throwable argument is logged by SLF4J as the exception.
                log.error("Failed to send output to topic: {} at {}. Headers={}, Key={}, Value={}, Partition={}",
                        outputRecord.topic(), asDate(outputRecord.timestamp()), outputRecord.headers(), outputRecord.key(), outputRecord.value(), outputRecord.partition(), sendResult.cause());
            }
        });
    }

    /**
     * Converts a consumed record value into pipeline input.
     *
     * @throws IllegalStateException if the value is {@code null} or not a {@link Buffer},
     *                               {@link JsonObject} or {@link String}
     */
    private static Data toData(Object value) {
        if (value instanceof Buffer) {
            return Data.fromBytes(((Buffer) value).getBytes());
        }
        if (value instanceof JsonObject) {
            return Data.fromJson(((JsonObject) value).encode());
        }
        if (value instanceof String) {
            return Data.fromJson((String) value);
        }
        // A null value (e.g. a tombstone record) is reported the same way as an unsupported type
        // instead of failing with a NullPointerException on getClass().
        String typeName = value == null ? "null" : value.getClass().getCanonicalName();
        throw new IllegalStateException("No conversion format exist for input value class type: " + typeName);
    }

    /**
     * Wraps pipeline output in a producer record whose value type matches the configured producer
     * value serializer.
     *
     * @throws IllegalStateException if the serializer is not the buffer, JSON object or string one
     */
    private static KafkaProducerRecord<Object, Object> toProducerRecord(String topic, Data output, String valueSerializer) {
        if (valueSerializer.equals(BufferSerializer.class.getCanonicalName())) {
            return KafkaProducerRecord.<Object, Object>create(topic, Buffer.buffer(output.asBytes()));
        }
        if (valueSerializer.equals(JsonObjectSerializer.class.getCanonicalName())) {
            return KafkaProducerRecord.<Object, Object>create(topic, new JsonObject(output.toJson()));
        }
        if (valueSerializer.equals(StringSerializer.class.getCanonicalName())) {
            return KafkaProducerRecord.<Object, Object>create(topic, output.toJson());
        }
        throw new IllegalStateException("No conversion format exist for output value class type: " + valueSerializer);
    }

    /** Converts epoch milliseconds to a date for logging; returns {@code null} for {@code null}. */
    private static Object asDate(Long epochMillis) {
        return epochMillis == null ? null : Date.from(Instant.ofEpochMilli(epochMillis));
    }

    /**
     * Starts the HTTP server that answers {@code /health} and completes {@code promise} once it is
     * listening. The port actually bound is written back into the Kafka configuration.
     */
    private void startHealthServer(Promise<Void> promise, KafkaConfiguration kafkaConfiguration) {
        String httpHost = KonduitSettings.getHttpKafkaHost(kafkaConfiguration != null ? kafkaConfiguration.httpKafkaHost() : "localhost");
        int httpPort = KonduitSettings.getHttpKafkaPort(kafkaConfiguration != null ? kafkaConfiguration.httpKafkaPort() : 0);
        log.info("Starting HTTP server for kafka on host {} and port {}", httpHost, httpPort);

        // NOTE(review): SSL is explicitly disabled here even when useSsl is true, so the key/cert
        // options configured below presumably have no effect - confirm whether the health
        // endpoint is meant to be served over TLS.
        HttpServerOptions serverOptions = new HttpServerOptions()
                .setPort(httpPort)
                .setHost(httpHost)
                .setSsl(false)
                .setSslHandshakeTimeout(0L)
                .setCompressionSupported(true)
                .setTcpKeepAlive(true)
                .setTcpNoDelay(true)
                .setAlpnVersions(Arrays.asList(HttpVersion.HTTP_1_0, HttpVersion.HTTP_1_1))
                .setUseAlpn(false);

        if (this.inferenceConfiguration.useSsl()) {
            configureKeyAndCertificate(serverOptions);
        }

        this.vertx.createHttpServer(serverOptions).requestHandler(request -> {
            if (request.path().equals("/health")) {
                request.response().end("Kafka server running");
            } else {
                request.response().setStatusCode(404).end("Route not implemented");
            }
        }).exceptionHandler(error -> {
            log.error("Error occurred during http request.", error);
        }).listen(httpPort, httpHost, listenResult -> {
            if (listenResult.failed()) {
                promise.fail(listenResult.cause());
                return;
            }
            // With a configured port of 0 the bound port differs, so record the actual one.
            int actualPort = listenResult.result().actualPort();
            if (this.inferenceConfiguration.kafkaConfiguration() == null) {
                this.inferenceConfiguration.kafkaConfiguration(new KafkaConfiguration());
            }
            this.inferenceConfiguration.kafkaConfiguration().httpKafkaPort(actualPort);
            try {
                this.context.getDeployment().deploymentOptions().setConfig(new JsonObject(this.inferenceConfiguration.toJson()));
                saveInspectionDataIfRequired(getPid());
                // Log the host the HTTP server is bound to, not the Kafka bootstrap host.
                log.info("HTTP server for kafka is listening on host: '{}'", httpHost);
                log.info("HTTP server for kafka started on port {}", actualPort);
                promise.complete();
            } catch (Throwable t) {
                promise.fail(t);
            }
        });
    }

    /**
     * Sets the PEM key and certificate from the inference configuration on the server options, or
     * a generated self-signed pair when either path is missing.
     */
    private void configureKeyAndCertificate(HttpServerOptions serverOptions) {
        String sslKeyPath = this.inferenceConfiguration.sslKeyPath();
        String sslCertificatePath = this.inferenceConfiguration.sslCertificatePath();
        boolean keyMissing = Strings.isNullOrEmpty(sslKeyPath);
        boolean certificateMissing = Strings.isNullOrEmpty(sslCertificatePath);

        if (keyMissing || certificateMissing) {
            if (keyMissing) {
                log.warn("No pem key file specified for SSL.");
            }
            if (certificateMissing) {
                log.warn("No pem certificate file specified for SSL.");
            }
            log.info("Using an auto generated self signed pem key and certificate with SSL.");
            serverOptions.setKeyCertOptions(SelfSignedCertificate.create().keyCertOptions());
            return;
        }

        String keyPath = new File(sslKeyPath).getAbsolutePath();
        String certificatePath = new File(sslCertificatePath).getAbsolutePath();
        log.info("Using SSL with PEM Key: {} and certificate {}.", keyPath, certificatePath);
        serverOptions.setPemKeyCertOptions(new PemKeyCertOptions().setKeyPath(keyPath).setCertPath(certificatePath));
    }
}
