package pl.touk.nussknacker.engine.kafka;

import com.typesafe.scalalogging.LazyLogging;
import com.typesafe.scalalogging.Logger;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import pl.touk.nussknacker.engine.util.ThreadUtils$;
import scala.Function1;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.StringContext;
import scala.collection.IterableLike;
import scala.collection.JavaConversions$;
import scala.collection.immutable.List;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.concurrent.Await$;
import scala.concurrent.ExecutionContext$Implicits$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.concurrent.duration.Duration$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;

/* compiled from: KafkaEspUtils.scala */
/* loaded from: input_file:pl/touk/nussknacker/engine/kafka/KafkaEspUtils$.class */
public final class KafkaEspUtils$ implements LazyLogging {
    // Singleton instance holder; the private constructor stores `this` here.
    // NOTE(review): declared final with a null initializer yet assigned in the constructor —
    // this looks like a decompiler artifact and will not be accepted by javac as written.
    public static final KafkaEspUtils$ MODULE$ = null;
    // Value returned by defaultTimeoutMillis(); the constructor sets it to 10000.
    private final int defaultTimeoutMillis;
    // Lazily created logger; assigned by logger$lzycompute() on first use.
    // NOTE(review): final but written outside the constructor — same decompiler caveat as MODULE$.
    private final Logger logger;
    // Flipped to true by logger$lzycompute() once `logger` has been assigned.
    private volatile boolean bitmap$0;

    // Creates the single instance during static initialisation; the constructor publishes it via MODULE$.
    static {
        new KafkaEspUtils$();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private Logger logger$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (!this.bitmap$0) {
                this.logger = LazyLogging.Cclass.logger(this);
                this.bitmap$0 = true;
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.logger;
        }
    }

    /**
     * Returns the logger, creating it on first access.
     *
     * @return the cached logger when {@code bitmap$0} is set, otherwise the result of
     *         {@code logger$lzycompute()}
     */
    @Override // com.typesafe.scalalogging.LazyLogging
    public Logger logger() {
        if (!this.bitmap$0) {
            // Not initialised yet: take the synchronized slow path.
            return logger$lzycompute();
        }
        return this.logger;
    }

    /**
     * Accessor for the {@code defaultTimeoutMillis} field.
     *
     * @return the value assigned in the constructor (10000)
     */
    public int defaultTimeoutMillis() {
        return defaultTimeoutMillis;
    }

    /**
     * Logs the request at INFO level, then runs the offset-reset task on the global
     * execution context and waits for it to finish.
     *
     * <p>The wait is bounded by the timeout obtained from {@code readTimeout(kafkaConfig)},
     * interpreted as milliseconds.
     *
     * @param str         topic name (as used in the log message)
     * @param str2        consumer group id (as used in the log message)
     * @param kafkaConfig Kafka configuration handed to the task and used to read the timeout
     */
    public void setOffsetToLatest(String str, String str2, KafkaConfig kafkaConfig) {
        final long timeoutMillis = readTimeout(kafkaConfig);
        if (logger().underlying().isInfoEnabled()) {
            final StringContext template = new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Setting offset to latest for topic: ", ", groupId: ", ""}));
            final String message = template.s(Predef$.MODULE$.genericWrapArray(new Object[]{str, str2}));
            logger().underlying().info(message);
        }
        // NOTE(review): the closure body is not visible here; presumably it resets the group's offsets.
        final Future<?> pending = Future$.MODULE$.apply(new KafkaEspUtils$$anonfun$1(str, str2, kafkaConfig), ExecutionContext$Implicits$.MODULE$.global());
        Await$.MODULE$.result(pending, Duration$.MODULE$.apply(timeoutMillis, TimeUnit.MILLISECONDS));
    }

    /**
     * Builds a {@link Properties} object from the given Kafka configuration.
     *
     * <p>Sets the ZooKeeper and bootstrap-server addresses and an {@code earliest}
     * offset-reset policy, then applies the two closures to the same object.
     *
     * @param kafkaConfig source of the ZooKeeper address, Kafka address and extra properties
     * @param option      optional value handed to a closure that receives the properties
     *                    (presumably the consumer group id — TODO confirm)
     * @return a freshly created properties object
     */
    public Properties toProperties(KafkaConfig kafkaConfig, Option<String> option) {
        final String zookeeper = kafkaConfig.zkAddress();
        final String brokers = kafkaConfig.kafkaAddress();

        final Properties props = new Properties();
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.setProperty("bootstrap.servers", brokers);
        props.setProperty("zookeeper.connect", zookeeper);

        // The closures run after the fixed entries above; presumably they may override them.
        option.foreach(new KafkaEspUtils$$anonfun$toProperties$1(props));
        kafkaConfig.kafkaProperties().map(new KafkaEspUtils$$anonfun$toProperties$2()).foreach(new KafkaEspUtils$$anonfun$toProperties$3(props));
        return props;
    }

    /**
     * Builds producer properties for a byte-array key/value producer.
     *
     * <p>Fixed settings: byte-array serializers, {@code acks=all}, {@code retries=0},
     * batch size 16384, linger 1 ms and buffer memory 33554432. The extra properties
     * from the configuration (or the fallback value when absent) are then passed to a
     * closure that receives the same properties object.
     *
     * @param kafkaConfig source of the Kafka address and extra properties
     * @return a freshly created properties object
     */
    public Properties toProducerProperties(KafkaConfig kafkaConfig) {
        final String serializerName = ByteArraySerializer.class.getCanonicalName();

        final Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaConfig.kafkaAddress());
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, serializerName);
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, serializerName);
        props.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        props.setProperty("retries", "0");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        props.setProperty(ProducerConfig.LINGER_MS_CONFIG, "1");
        props.setProperty(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        // Applied last; presumably user-supplied entries override the fixed settings above.
        final IterableLike extraEntries = (IterableLike) kafkaConfig.kafkaProperties().getOrElse(new KafkaEspUtils$$anonfun$toProducerProperties$1());
        extraEntries.foreach(new KafkaEspUtils$$anonfun$toProducerProperties$2(props));
        return props;
    }

    /**
     * Extends the result of {@code toProperties} with byte-array key/value deserializers
     * and a session timeout equal to {@code readTimeout(kafkaConfig)}.
     *
     * @param kafkaConfig Kafka configuration
     * @param option      optional value forwarded unchanged to {@code toProperties}
     * @return the properties object returned by {@code toProperties}, with the extra entries added
     */
    public Properties pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$toPropertiesForTempConsumer(KafkaConfig kafkaConfig, Option<String> option) {
        final Properties consumerProps = toProperties(kafkaConfig, option);
        // The deserializers are stored as Class objects rather than class names.
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        final String sessionTimeout = Long.toString(readTimeout(kafkaConfig));
        consumerProps.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        return consumerProps;
    }

    /**
     * Runs the message-reading closure against a temporary consumer created without
     * the optional value ({@code None}).
     *
     * @param str         passed to the closure (presumably the topic name — TODO confirm)
     * @param i           passed to the closure (presumably the number of messages — TODO confirm)
     * @param kafkaConfig Kafka configuration for the temporary consumer
     * @return whatever the closure produces, cast to a list of byte arrays
     */
    public List<byte[]> readLastMessages(String str, int i, KafkaConfig kafkaConfig) {
        final KafkaEspUtils$$anonfun$readLastMessages$1 reader = new KafkaEspUtils$$anonfun$readLastMessages$1(str, i);
        final Object messages = pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer(kafkaConfig, None$.MODULE$, reader);
        return (List) messages;
    }

    /**
     * Executes a closure built from the arguments while the class loader of
     * {@code KafkaClient} is supplied to {@code ThreadUtils.withThisAsContextClassLoader}.
     *
     * @param kafkaConfig Kafka configuration handed to the closure
     * @param option      optional value handed to the closure
     * @param function1   function taking a byte-array consumer, handed to the closure
     * @param <T>         result type of {@code function1}
     * @return the value returned by {@code withThisAsContextClassLoader}
     */
    public <T> T pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer(KafkaConfig kafkaConfig, Option<String> option, Function1<KafkaConsumer<byte[], byte[]>, T> function1) {
        final ClassLoader kafkaClassLoader = KafkaClient.class.getClassLoader();
        // NOTE(review): closure body not visible; presumably it creates the consumer, applies function1 and closes it.
        final KafkaEspUtils$$anonfun$pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer$1 action =
                new KafkaEspUtils$$anonfun$pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$doWithTempKafkaConsumer$1(kafkaConfig, option, function1);
        return (T) ThreadUtils$.MODULE$.withThisAsContextClassLoader(kafkaClassLoader, action);
    }

    /**
     * Resolves a timeout from the optional Kafka properties of the configuration,
     * using a fallback closure when no value is produced.
     *
     * @param kafkaConfig configuration whose {@code kafkaProperties()} are inspected
     * @return the resolved value unboxed to {@code long}
     *         (the fallback is presumably {@code defaultTimeoutMillis} — TODO confirm)
     */
    private long readTimeout(KafkaConfig kafkaConfig) {
        final Object resolved = kafkaConfig.kafkaProperties()
                .flatMap(new KafkaEspUtils$$anonfun$readTimeout$2())
                .getOrElse(new KafkaEspUtils$$anonfun$readTimeout$1());
        return BoxesRunTime.unboxToLong(resolved);
    }

    /**
     * Assigns the consumer to the mapped partitions of the topic, seeks each of them
     * to the end, applies a per-partition closure and commits synchronously.
     *
     * @param str           topic whose partitions are looked up via {@code partitionsFor}
     * @param kafkaConsumer consumer to reposition and commit
     */
    public void pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$setOffsetToLatest(String str, KafkaConsumer<?, ?> kafkaConsumer) {
        // NOTE(review): the mapping closure is not visible; presumably it converts PartitionInfo to TopicPartition.
        final Buffer partitions = (Buffer) JavaConversions$.MODULE$
                .asScalaBuffer(kafkaConsumer.partitionsFor(str))
                .map(new KafkaEspUtils$$anonfun$2(), Buffer$.MODULE$.canBuildFrom());

        kafkaConsumer.assign(JavaConversions$.MODULE$.bufferAsJavaList(partitions));
        kafkaConsumer.seekToEnd(JavaConversions$.MODULE$.bufferAsJavaList(partitions));

        // Per-partition step executed after seekToEnd and before the commit.
        partitions.foreach(new KafkaEspUtils$$anonfun$pl$touk$nussknacker$engine$kafka$KafkaEspUtils$$setOffsetToLatest$1(kafkaConsumer));
        kafkaConsumer.commitSync();
    }

    /**
     * Sends a single record using a producer that is created for this call and closed
     * before the method returns.
     *
     * <p>The producer is closed exactly once, whether the send call returns normally or
     * throws. If creating the producer fails, nothing is closed and the exception propagates.
     *
     * @param str         topic to send to
     * @param bArr        record key
     * @param bArr2       record value
     * @param kafkaConfig configuration used to create the temporary producer
     * @return future completed by the producer callback with the record metadata or the failure
     */
    public Future<RecordMetadata> sendToKafkaWithTempProducer(String str, byte[] bArr, byte[] bArr2, KafkaConfig kafkaConfig) {
        final KafkaProducer<byte[], byte[]> tempProducer = createProducer(kafkaConfig);
        try {
            return sendToKafka(str, bArr, bArr2, tempProducer);
        } finally {
            // Single cleanup point: avoids calling close() a second time when close() itself fails.
            // NOTE(review): per the KafkaProducer Javadoc close() waits for in-flight sends — confirm for the Kafka version in use.
            tempProducer.close();
        }
    }

    /**
     * Sends one record through the given producer and exposes the outcome as a Scala future.
     *
     * @param str           topic to send to
     * @param k             record key
     * @param v             record value
     * @param kafkaProducer producer used for the send
     * @param <K>           key type
     * @param <V>           value type
     * @return future of the promise that the producer callback completes
     */
    public <K, V> Future<RecordMetadata> sendToKafka(String str, K k, V v, KafkaProducer<K, V> kafkaProducer) {
        final Promise<RecordMetadata> outcome = Promise$.MODULE$.apply();
        final ProducerRecord<K, V> record = new ProducerRecord<>(str, k, v);
        final Callback onCompletion = producerCallback(outcome);
        kafkaProducer.send(record, onCompletion);
        return outcome.future();
    }

    /**
     * Creates a byte-array producer configured via {@code toProducerProperties}.
     *
     * @param kafkaConfig Kafka configuration
     * @return a new producer; closing it is left to the caller
     */
    public KafkaProducer<byte[], byte[]> createProducer(KafkaConfig kafkaConfig) {
        final Properties producerProps = toProducerProperties(kafkaConfig);
        return new KafkaProducer<>(producerProps);
    }

    /**
     * Creates a producer callback that completes the given promise.
     *
     * <p>When the callback is invoked without an exception the promise is completed with
     * {@code Success(recordMetadata)}; otherwise it is completed with {@code Failure(exc)}.
     *
     * @param promise promise to complete when the send finishes
     * @return callback suitable for {@code KafkaProducer.send}
     */
    public Callback producerCallback(final Promise<RecordMetadata> promise) {
        // Callback is an interface, so the anonymous class takes no constructor arguments;
        // the final parameter is captured directly instead of being copied into a raw field.
        return new Callback() {
            @Override // org.apache.kafka.clients.producer.Callback
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (exc == null) {
                    promise.complete(new Success<RecordMetadata>(recordMetadata));
                } else {
                    promise.complete(new Failure<RecordMetadata>(exc));
                }
            }
        };
    }

    /**
     * Private constructor; within this file it is invoked only from the static initializer.
     * Statement order is kept as emitted: publish the instance, run the trait initializer,
     * then set the timeout field.
     */
    private KafkaEspUtils$() {
        // Publish the singleton instance.
        MODULE$ = this;
        // Initializer of the LazyLogging trait implementation class.
        LazyLogging.Cclass.$init$(this);
        // Value later returned by defaultTimeoutMillis().
        this.defaultTimeoutMillis = 10000;
    }
}
