package it.agilelab.bigdata.wasp.core.kafka;

import akka.actor.ActorRef;
import akka.actor.ScalaActorRef;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger$;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2;
import scala.collection.IterableLike;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.convert.package$;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: WaspKafkaReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005ua\u0001\u0002\u0007\u000e\u0001iA\u0001\u0002\u000b\u0001\u0003\u0002\u0003\u0006I!\u000b\u0005\u0006c\u0001!\tA\r\u0005\u0006c\u0001!\t\u0001\u0012\u0005\b=\u0002\u0011\r\u0011\"\u0003`\u0011\u0019a\u0007\u0001)A\u0005A\")Q\u000e\u0001C\u0001]\")a\u0010\u0001C\u0001\u007f\u001e9\u0011\u0011A\u0007\t\u0002\u0005\raA\u0002\u0007\u000e\u0011\u0003\t)\u0001\u0003\u00042\u0013\u0011\u0005\u0011q\u0001\u0005\b\u0003\u0013IA\u0011AA\u0006\u0005=9\u0016m\u001d9LC\u001a\\\u0017MU3bI\u0016\u0014(B\u0001\b\u0010\u0003\u0015Y\u0017MZ6b\u0015\t\u0001\u0012#\u0001\u0003d_J,'B\u0001\n\u0014\u0003\u00119\u0018m\u001d9\u000b\u0005Q)\u0012a\u00022jO\u0012\fG/\u0019\u0006\u0003-]\t\u0001\"Y4jY\u0016d\u0017M\u0019\u0006\u00021\u0005\u0011\u0011\u000e^\u0002\u0001+\rYr'Q\n\u0004\u0001q\u0011\u0003CA\u000f!\u001b\u0005q\"\"A\u0010\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0005r\"AB!osJ+g\r\u0005\u0002$M5\tAE\u0003\u0002&\u001f\u00059An\\4hS:<\u0017BA\u0014%\u0005\u001daunZ4j]\u001e\fabY8ogVlWM]\"p]\u001aLw\r\u0005\u0002+_5\t1F\u0003\u0002-[\u0005!Q\u000f^5m\u0015\u0005q\u0013\u0001\u00026bm\u0006L!\u0001M\u0016\u0003\u0015A\u0013x\u000e]3si&,7/\u0001\u0004=S:LGO\u0010\u000b\u0003g\r\u0003B\u0001\u000e\u00016\u00016\tQ\u0002\u0005\u00027o1\u0001A!\u0002\u001d\u0001\u0005\u0004I$!A&\u0012\u0005ij\u0004CA\u000f<\u0013\tadDA\u0004O_RD\u0017N\\4\u0011\u0005uq\u0014BA \u001f\u0005\r\te.\u001f\t\u0003m\u0005#QA\u0011\u0001C\u0002e\u0012\u0011A\u0016\u0005\u0006Q\t\u0001\r!\u000b\u000b\u0005g\u0015{E\fC\u0003G\u0007\u0001\u0007q)\u0001\u0003d_:4\u0007C\u0001%N\u001b\u0005I%B\u0001&L\u00035\u0019wN\u001c4jOV\u0014\u0018\r^5p]*\u0011A*E\u0001\u0007[>$W\r\\:\n\u00059K%\u0001E&bM.\f7i\u001c8gS\u001elu\u000eZ3m\u0011\u0015\u00016\u00011\u0001R\u0003\u00159'o\\;q!\t\u0011\u0016L\u0004\u0002T/B\u0011AKH\u0007\u0002+*\u0011a+G\u0001\u0007yI|w\u000e\u001e \n\u0005as\u0012A\u0002)sK\u0012,g-\u0003\u0002[7\n11\u000b\u001e:j]\u001eT!\u0001\u0017\u0010\t\u000bu\u001b\u0001\u0019A)\u0002\u0013i|wn[3fa\u0016\u0014\u0018\u0001C2p]N,X.\u001a:\u0016\u0003\u0001\u0004B!\u00196R#6\t!M\u0003\u0002_G*\u0011A-Z\u0001\bG2LWM\u001c;t\u0015\tqaM\u0003\u0002hQ\u00061\u0011\r]1dQ\u0016T\u0011![\u0001\u0004_J<\u0017BA6c\u00055Y\u0015MZ6b\u0007>t7/^7fe\u0006I1m\u001c8tk6,'\u000fI\u0001\ngV\u00147o\u0019:jE\u0016$2a\u001c:u!\ti\u0002/\u0003\u0002r=\t!QK\\5u\u0011\u0015\u0019h\u00011\u0001R\u0003\u0015!x\u000e]5d\u0011\u0015)h\u00011\u0001w\u0003!a\u0017n\u001d;f]\u0016\u0014\bCA<}\u001b\u0005A(BA={\u0003\u0015\t7\r^8s\u0015\u0005Y\u0018\u0001B1lW\u0006L!! =\u0003\u0011\u0005\u001bGo\u001c:SK\u001a\fQa\u00197pg\u0016$\u0012a\\\u0001\u0010/\u0006\u001c\boS1gW\u0006\u0014V-\u00193feB\u0011A'C\n\u0003\u0013q!\"!a\u0001\u0002\u0019\r\u0014X-\u0019;f\u0007>tg-[4\u0015\u000f%\ni!a\u0006\u0002\u001a!9\u0011qB\u0006A\u0002\u0005E\u0011a\u00022s_.,'o\u001d\t\u0005%\u0006M\u0011+C\u0002\u0002\u0016m\u00131aU3u\u0011\u0015\u00016\u00021\u0001R\u0011\u0019\tYb\u0003a\u0001#\u0006A!p\\8lKB,'\u000f")
/* loaded from: input_file:it/agilelab/bigdata/wasp/core/kafka/WaspKafkaReader.class */
public class WaspKafkaReader<K, V> implements Logging {
    private final Properties consumerConfig;
    private final KafkaConsumer<String, String> it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer;
    private final WaspLogger logger;

    public static Properties createConfig(Set<String> set, String str, String str2) {
        return WaspKafkaReader$.MODULE$.createConfig(set, str, str2);
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public WaspLogger logger() {
        return this.logger;
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public KafkaConsumer<String, String> it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer() {
        return this.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer;
    }

    public void subscribe(final String str, final ActorRef actorRef) {
        it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer().subscribe(Arrays.asList(str));
        new Thread(this, actorRef, str) { // from class: it.agilelab.bigdata.wasp.core.kafka.WaspKafkaReader$$anon$1
            private final /* synthetic */ WaspKafkaReader $outer;
            private final ActorRef listener$1;
            private final String topic$1;

            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                while (true) {
                    ((IterableLike) package$.MODULE$.decorateAsScala().iterableAsScalaIterableConverter(this.$outer.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer().poll(100L)).asScala()).foreach(consumerRecord -> {
                        $anonfun$run$1(this, consumerRecord);
                        return BoxedUnit.UNIT;
                    });
                }
            }

            public static final /* synthetic */ void $anonfun$run$1(WaspKafkaReader$$anon$1 waspKafkaReader$$anon$1, ConsumerRecord consumerRecord) {
                ScalaActorRef actorRef2Scala = akka.actor.package$.MODULE$.actorRef2Scala(waspKafkaReader$$anon$1.listener$1);
                Tuple2 $minus$greater$extension = Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(waspKafkaReader$$anon$1.topic$1), consumerRecord.value());
                actorRef2Scala.$bang($minus$greater$extension, actorRef2Scala.$bang$default$2($minus$greater$extension));
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                this.listener$1 = actorRef;
                this.topic$1 = str;
            }
        }.start();
    }

    public void close() {
        it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer().close();
    }

    public WaspKafkaReader(Properties properties) {
        this.consumerConfig = properties;
        it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger$.MODULE$.apply(getClass()));
        logger().info(() -> {
            return new StringBuilder(15).append("consumerConfig ").append(this.consumerConfig).toString();
        });
        this.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer = new KafkaConsumer<>(properties);
    }

    public WaspKafkaReader(KafkaConfigModel kafkaConfigModel, String str, String str2) {
        this(WaspKafkaReader$.MODULE$.createConfig(((TraversableOnce) kafkaConfigModel.connections().map(new WaspKafkaReader$$anonfun$$lessinit$greater$1(), Seq$.MODULE$.canBuildFrom())).toSet(), str, str2));
    }
}
