package it.agilelab.bigdata.wasp.core.kafka;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger$;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import it.agilelab.bigdata.wasp.models.configuration.KafkaEntryConfig;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import scala.Function1;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$any2stringadd$;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.reflect.ScalaSignature;
import scala.runtime.BooleanRef;
import scala.runtime.BoxedUnit;

/* compiled from: NewKafkaAdminActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005\u001dw!B\u000f\u001f\u0011\u0003Yc!B\u0017\u001f\u0011\u0003q\u0003\"B\u001b\u0002\t\u00031\u0004bB\u001c\u0002\u0005\u0004%\t\u0001\u000f\u0005\u0007\u0003\u0006\u0001\u000b\u0011B\u001d\t\u000f\t\u000b!\u0019!C\u0001q!11)\u0001Q\u0001\neBq\u0001R\u0001C\u0002\u0013\u0005Q\t\u0003\u0004J\u0003\u0001\u0006IA\u0012\u0005\b\u0015\u0006\u0011\r\u0011\"\u0001F\u0011\u0019Y\u0015\u0001)A\u0005\r\"9A*\u0001b\u0001\n\u0003)\u0005BB'\u0002A\u0003%a\tC\u0004O\u0003\t\u0007I\u0011A#\t\r=\u000b\u0001\u0015!\u0003G\r\u0011ic\u0004\u0001)\t\u000bUzA\u0011A0\t\u0013\u0005|\u0001\u0019!a\u0001\n\u0003\u0011\u0007\"\u00039\u0010\u0001\u0004\u0005\r\u0011\"\u0001r\u0011%9x\u00021A\u0001B\u0003&1\rC\u0003y\u001f\u0011\u0005\u0013\u0010C\u0004\u0002\u0004=!\t!!\u0002\t\u000f\u0005]q\u0002\"\u0011\u0002\u001a!9\u00111D\b\u0005\n\u0005u\u0001bBA%\u001f\u0011%\u00111\n\u0005\b\u0003+zA\u0011BA,\u0011\u001d\t\tg\u0004C\u0005\u0003GBq!!\u001c\u0010\t\u0013\ty\u0007C\u0004\u0002z=!I!a\u001f\u0002%9+woS1gW\u0006\fE-\\5o\u0003\u000e$xN\u001d\u0006\u0003?\u0001\nQa[1gW\u0006T!!\t\u0012\u0002\t\r|'/\u001a\u0006\u0003G\u0011\nAa^1ta*\u0011QEJ\u0001\bE&<G-\u0019;b\u0015\t9\u0003&\u0001\u0005bO&dW\r\\1c\u0015\u0005I\u0013AA5u\u0007\u0001\u0001\"\u0001L\u0001\u000e\u0003y\u0011!CT3x\u0017\u000647.Y!e[&t\u0017i\u0019;peN\u0011\u0011a\f\t\u0003aMj\u0011!\r\u0006\u0002e\u0005)1oY1mC&\u0011A'\r\u0002\u0007\u0003:L(+\u001a4\u0002\rqJg.\u001b;?)\u0005Y\u0013\u0001\u00028b[\u0016,\u0012!\u000f\t\u0003u}j\u0011a\u000f\u0006\u0003yu\nA\u0001\\1oO*\ta(\u0001\u0003kCZ\f\u0017B\u0001!<\u0005\u0019\u0019FO]5oO\u0006)a.Y7fA\u0005)Ao\u001c9jG\u00061Ao\u001c9jG\u0002\nab]3tg&|g\u000eV5nK>,H/F\u0001G!\t\u0001t)\u0003\u0002Ic\t\u0019\u0011J\u001c;\u0002\u001fM,7o]5p]RKW.Z8vi\u0002\n\u0011cY8o]\u0016\u001cG/[8o)&lWm\\;u\u0003I\u0019wN\u001c8fGRLwN\u001c+j[\u0016|W\u000f\u001e\u0011\u0002\u0015A\f'\u000f^5uS>t7/A\u0006qCJ$\u0018\u000e^5p]N\u0004\u0013\u0001\u0003:fa2L7-Y:\u0002\u0013I,\u0007\u000f\\5dCN\u00043\u0003B\b0#f\u0003\"AU,\u000e\u0003MS!\u0001V+\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0003Y\u000bA!Y6lC&\u0011\u0001l\u0015\u0002\u0006\u0003\u000e$xN\u001d\t\u00035vk\u0011a\u0017\u0006\u00039\u0002\nq\u0001\\8hO&tw-\u0003\u0002_7\n9Aj\\4hS:<G#\u00011\u0011\u00051z\u0011aC1e[&t7\t\\5f]R,\u0012a\u0019\t\u0003I:l\u0011!\u001a\u0006\u0003M\u001e\fQ!\u00193nS:T!\u0001[5\u0002\u000f\rd\u0017.\u001a8ug*\u0011qD\u001b\u0006\u0003W2\fa!\u00199bG\",'\"A7\u0002\u0007=\u0014x-\u0003\u0002pK\nY\u0011\tZ7j]\u000ec\u0017.\u001a8u\u0003=\tG-\\5o\u00072LWM\u001c;`I\u0015\fHC\u0001:v!\t\u00014/\u0003\u0002uc\t!QK\\5u\u0011\u001d1(#!AA\u0002\r\f1\u0001\u001f\u00132\u00031\tG-\\5o\u00072LWM\u001c;!\u0003\u001d\u0011XmY3jm\u0016,\u0012A\u001f\t\u0003wzt!A\u0015?\n\u0005u\u001c\u0016!B!di>\u0014\u0018bA@\u0002\u0002\t9!+Z2fSZ,'BA?T\u00039Ig.\u001b;jC2L'0\u0019;j_:$B!a\u0002\u0002\u000eA\u0019\u0001'!\u0003\n\u0007\u0005-\u0011GA\u0004C_>dW-\u00198\t\u000f\u0005=Q\u00031\u0001\u0002\u0012\u00059Q.Z:tC\u001e,\u0007c\u0001\u0017\u0002\u0014%\u0019\u0011Q\u0003\u0010\u0003\u001d%s\u0017\u000e^5bY&T\u0018\r^5p]\u0006A\u0001o\\:u'R|\u0007\u000fF\u0001s\u0003\u0011\u0019\u0017\r\u001c7\u0016\t\u0005}\u0011q\u0005\u000b\u0006e\u0006\u0005\u0012\u0011\b\u0005\b\u0003\u001f9\u0002\u0019AA\u0012!\u0011\t)#a\n\r\u0001\u00119\u0011\u0011F\fC\u0002\u0005-\"!\u0001+\u0012\t\u00055\u00121\u0007\t\u0004a\u0005=\u0012bAA\u0019c\t9aj\u001c;iS:<\u0007c\u0001\u0017\u00026%\u0019\u0011q\u0007\u0010\u0003#-\u000bgm[1BI6Lg.T3tg\u0006<W\rC\u0004\u0002<]\u0001\r!!\u0010\u0002\u0003\u0019\u0004r\u0001MA \u0003G\t\u0019%C\u0002\u0002BE\u0012\u0011BR;oGRLwN\\\u0019\u0011\u0007A\n)%C\u0002\u0002HE\u00121!\u00118z\u0003I\u0019\u0007.Z2l\u001fJ\u001c%/Z1uKR{\u0007/[2\u0015\t\u0005\u001d\u0011Q\n\u0005\b\u0003\u001fA\u0002\u0019AA(!\ra\u0013\u0011K\u0005\u0004\u0003'r\"AE\"iK\u000e\\wJ]\"sK\u0006$X\rV8qS\u000e\f\u0001\"\u00193e)>\u0004\u0018n\u0019\u000b\u0005\u0003\u000f\tI\u0006C\u0004\u0002\u0010e\u0001\r!a\u0017\u0011\u00071\ni&C\u0002\u0002`y\u0011\u0001\"\u00113e)>\u0004\u0018nY\u0001\u000bG\",7m\u001b+pa&\u001cG\u0003BA\u0004\u0003KBq!a\u0004\u001b\u0001\u0004\t9\u0007E\u0002-\u0003SJ1!a\u001b\u001f\u0005)\u0019\u0005.Z2l)>\u0004\u0018nY\u0001\fe\u0016lwN^3U_BL7\r\u0006\u0003\u0002\b\u0005E\u0004bBA\b7\u0001\u0007\u00111\u000f\t\u0004Y\u0005U\u0014bAA<=\tY!+Z7pm\u0016$v\u000e]5d\u00031\u0019'/Z1uK\u000e{gNZ5h)\u0019\ti(!#\u0002(B!\u0011qPAC\u001b\t\t\tIC\u0002\u0002\u0004v\nA!\u001e;jY&!\u0011qQAA\u0005)\u0001&o\u001c9feRLWm\u001d\u0005\b\u0003\u0017c\u0002\u0019AAG\u0003\u001d\u0011'o\\6feN\u0004b!a$\u0002\u001e\u0006\rf\u0002BAI\u00033\u00032!a%2\u001b\t\t)JC\u0002\u0002\u0018*\na\u0001\u0010:p_Rt\u0014bAANc\u00051\u0001K]3eK\u001aLA!a(\u0002\"\n\u00191+\u001a;\u000b\u0007\u0005m\u0015\u0007\u0005\u0003\u0002\u0010\u0006\u0015\u0016b\u0001!\u0002\"\"9\u0011\u0011\u0016\u000fA\u0002\u0005-\u0016AB8uQ\u0016\u00148\u000f\u0005\u0004\u0002.\u0006M\u0016qW\u0007\u0003\u0003_S1!!-2\u0003)\u0019w\u000e\u001c7fGRLwN\\\u0005\u0005\u0003k\u000byKA\u0002TKF\u0004B!!/\u0002D6\u0011\u00111\u0018\u0006\u0005\u0003{\u000by,A\u0007d_:4\u0017nZ;sCRLwN\u001c\u0006\u0004\u0003\u0003\u0014\u0013AB7pI\u0016d7/\u0003\u0003\u0002F\u0006m&\u0001E&bM.\fWI\u001c;ss\u000e{gNZ5h\u0001")
/* loaded from: input_file:it/agilelab/bigdata/wasp/core/kafka/NewKafkaAdminActor.class */
public class NewKafkaAdminActor implements Actor, Logging {
    private AdminClient adminClient;
    private final WaspLogger logger;
    private final ActorContext context;
    private final ActorRef self;

    public static int replicas() {
        return NewKafkaAdminActor$.MODULE$.replicas();
    }

    public static int partitions() {
        return NewKafkaAdminActor$.MODULE$.partitions();
    }

    public static int connectionTimeout() {
        return NewKafkaAdminActor$.MODULE$.connectionTimeout();
    }

    public static int sessionTimeout() {
        return NewKafkaAdminActor$.MODULE$.sessionTimeout();
    }

    public static String topic() {
        return NewKafkaAdminActor$.MODULE$.topic();
    }

    public static String name() {
        return NewKafkaAdminActor$.MODULE$.name();
    }

    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    public void preStart() throws Exception {
        Actor.preStart$(this);
    }

    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public WaspLogger logger() {
        return this.logger;
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public ActorContext context() {
        return this.context;
    }

    public final ActorRef self() {
        return this.self;
    }

    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    public AdminClient adminClient() {
        return this.adminClient;
    }

    public void adminClient_$eq(AdminClient adminClient) {
        this.adminClient = adminClient;
    }

    public PartialFunction<Object, BoxedUnit> receive() {
        return new NewKafkaAdminActor$$anonfun$receive$1(this);
    }

    public boolean initialization(Initialization initialization) {
        KafkaConfigModel kafkaConfigModel = initialization.kafkaConfigModel();
        logger().info(() -> {
            return new StringBuilder(46).append("Before create a zookeeper client with config: ").append(kafkaConfigModel).toString();
        });
        try {
            adminClient_$eq(AdminClient.create(createConfig(((TraversableOnce) kafkaConfigModel.connections().map(connectionConfig -> {
                return connectionConfig.toString();
            }, Seq$.MODULE$.canBuildFrom())).toSet(), kafkaConfigModel.others())));
            logger().info(() -> {
                return new StringBuilder(25).append("New kafka client created ").append(this.adminClient()).toString();
            });
            return true;
        } catch (Throwable th) {
            logger().error(() -> {
                return new StringBuilder(23).append("KafkaAdminClient error ").append(th).toString();
            });
            package$.MODULE$.actorRef2Scala(sender()).$bang(new Status.Failure(th), self());
            throw th;
        }
    }

    public void postStop() {
        if (adminClient() != null) {
            adminClient().close();
        }
        adminClient_$eq(null);
        logger().debug(() -> {
            return "zookeeper client stopped";
        });
    }

    public <T extends KafkaAdminMessage> void it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$call(T t, Function1<T, Object> function1) {
        Object apply = function1.apply(t);
        logger().info(() -> {
            return new StringBuilder(0).append(Predef$any2stringadd$.MODULE$.$plus$extension(Predef$.MODULE$.any2stringadd(t), ": ")).append(apply).toString();
        });
        package$.MODULE$.actorRef2Scala(sender()).$bang(apply, self());
    }

    public boolean it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$checkOrCreateTopic(CheckOrCreateTopic checkOrCreateTopic) {
        logger().info(() -> {
            return new StringBuilder(13).append("checkTopic , ").append(checkOrCreateTopic).toString();
        });
        BooleanRef create = BooleanRef.create(it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$checkTopic(new CheckTopic(checkOrCreateTopic.topic())));
        logger().info(() -> {
            return new StringBuilder(22).append("checkOrCreateTopic ").append(create.elem).append(" , ").append(checkOrCreateTopic).toString();
        });
        if (!create.elem) {
            create.elem = it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$addTopic(new AddTopic(checkOrCreateTopic.topic(), checkOrCreateTopic.partitions(), checkOrCreateTopic.replicas()));
        }
        return create.elem;
    }

    public boolean it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$addTopic(AddTopic addTopic) {
        try {
            adminClient().createTopics(Collections.singleton(new NewTopic(addTopic.topic(), addTopic.partitions(), (short) addTopic.replicas()))).all().get();
            logger().info(() -> {
                return new StringBuilder(14).append("Created topic ").append(addTopic.topic()).toString();
            });
            return true;
        } catch (Throwable th) {
            String sb = new StringBuilder(26).append("Error in topic '").append(addTopic.topic()).append("' creation").toString();
            logger().error(() -> {
                return sb;
            }, th);
            return false;
        }
    }

    public boolean it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$checkTopic(CheckTopic checkTopic) {
        try {
            return ((Set) adminClient().listTopics().names().get()).contains(checkTopic.topic());
        } catch (Throwable th) {
            logger().error(() -> {
                return new StringBuilder(19).append("List topic '").append(checkTopic.topic()).append("' error").toString();
            }, th);
            return false;
        }
    }

    public boolean it$agilelab$bigdata$wasp$core$kafka$NewKafkaAdminActor$$removeTopic(RemoveTopic removeTopic) {
        try {
            adminClient().deleteTopics(Collections.singleton(removeTopic.topic())).all().get();
            logger().info(() -> {
                return new StringBuilder(14).append("Removed topic ").append(removeTopic.topic()).toString();
            });
            return true;
        } catch (Throwable th) {
            logger().error(() -> {
                return new StringBuilder(26).append("Error in topic '").append(removeTopic.topic()).append("' creation").toString();
            }, th);
            return false;
        }
    }

    private Properties createConfig(scala.collection.immutable.Set<String> set, Seq<KafkaEntryConfig> seq) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", set.mkString(","));
        seq.foreach(kafkaEntryConfig -> {
            return properties.put(kafkaEntryConfig.key(), kafkaEntryConfig.value());
        });
        return properties;
    }

    public NewKafkaAdminActor() {
        Actor.$init$(this);
        it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger$.MODULE$.apply(getClass()));
    }
}
