package it.agilelab.bigdata.wasp.core.kafka;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.core.logging.Logging;
import it.agilelab.bigdata.wasp.core.logging.WaspLogger;
import it.agilelab.bigdata.wasp.models.configuration.KafkaConfigModel;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import scala.collection.Seq$;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Set;
import scala.reflect.ScalaSignature;

/* compiled from: WaspKafkaReader.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005Ua\u0001B\u0001\u0003\u0001=\u0011qbV1ta.\u000bgm[1SK\u0006$WM\u001d\u0006\u0003\u0007\u0011\tQa[1gW\u0006T!!\u0002\u0004\u0002\t\r|'/\u001a\u0006\u0003\u000f!\tAa^1ta*\u0011\u0011BC\u0001\bE&<G-\u0019;b\u0015\tYA\"\u0001\u0005bO&dW\r\\1c\u0015\u0005i\u0011AA5u\u0007\u0001)2\u0001\u0005\u00189'\r\u0001\u0011c\u0006\t\u0003%Ui\u0011a\u0005\u0006\u0002)\u0005)1oY1mC&\u0011ac\u0005\u0002\u0007\u0003:L(+\u001a4\u0011\u0005aYR\"A\r\u000b\u0005i!\u0011a\u00027pO\u001eLgnZ\u0005\u00039e\u0011q\u0001T8hO&tw\r\u0003\u0005\u001f\u0001\t\u0005\t\u0015!\u0003 \u00039\u0019wN\\:v[\u0016\u00148i\u001c8gS\u001e\u0004\"\u0001I\u0013\u000e\u0003\u0005R!AI\u0012\u0002\tU$\u0018\u000e\u001c\u0006\u0002I\u0005!!.\u0019<b\u0013\t1\u0013E\u0001\u0006Qe>\u0004XM\u001d;jKNDQ\u0001\u000b\u0001\u0005\u0002%\na\u0001P5oSRtDC\u0001\u0016;!\u0011Y\u0003\u0001L\u001c\u000e\u0003\t\u0001\"!\f\u0018\r\u0001\u0011)q\u0006\u0001b\u0001a\t\t1*\u0005\u00022iA\u0011!CM\u0005\u0003gM\u0011qAT8uQ&tw\r\u0005\u0002\u0013k%\u0011ag\u0005\u0002\u0004\u0003:L\bCA\u00179\t\u0015I\u0004A1\u00011\u0005\u00051\u0006\"\u0002\u0010(\u0001\u0004y\u0002\"\u0002\u0015\u0001\t\u0003aD\u0003\u0002\u0016>\u000fBCQAP\u001eA\u0002}\nAaY8oMB\u0011\u0001)R\u0007\u0002\u0003*\u0011!iQ\u0001\u000eG>tg-[4ve\u0006$\u0018n\u001c8\u000b\u0005\u00113\u0011AB7pI\u0016d7/\u0003\u0002G\u0003\n\u00012*\u00194lC\u000e{gNZ5h\u001b>$W\r\u001c\u0005\u0006\u0011n\u0002\r!S\u0001\u0006OJ|W\u000f\u001d\t\u0003\u00156s!AE&\n\u00051\u001b\u0012A\u0002)sK\u0012,g-\u0003\u0002O\u001f\n11\u000b\u001e:j]\u001eT!\u0001T\n\t\u000bE[\u0004\u0019A%\u0002\u0013i|wn[3fa\u0016\u0014\bbB*\u0001\u0005\u0004%I\u0001V\u0001\tG>t7/^7feV\tQ\u000b\u0005\u0003W?&KU\"A,\u000b\u0005MC&BA-[\u0003\u001d\u0019G.[3oiNT!aA.\u000b\u0005qk\u0016AB1qC\u000eDWMC\u0001_\u0003\ry'oZ\u0005\u0003A^\u0013QbS1gW\u0006\u001cuN\\:v[\u0016\u0014\bB\u00022\u0001A\u0003%Q+A\u0005d_:\u001cX/\\3sA!)A\r\u0001C\u0001K\u0006I1/\u001e2tGJL'-\u001a\u000b\u0004M&\\\u0007C\u0001\nh\u0013\tA7C\u0001\u0003V]&$\b\"\u00026d\u0001\u0004I\u0015!\u0002;pa&\u001c\u0007\"\u00027d\u0001\u0004i\u0017\u0001\u00037jgR,g.\u001a:\u0011\u00059\u001cX\"A8\u000b\u0005A\f\u0018!B1di>\u0014(\"\u0001:\u0002\t\u0005\\7.Y\u0005\u0003i>\u0014\u0001\"Q2u_J\u0014VM\u001a\u0005\u0006m\u0002!\ta^\u0001\u0006G2|7/\u001a\u000b\u0002M\u001e)\u0011P\u0001E\u0001u\u0006yq+Y:q\u0017\u000647.\u0019*fC\u0012,'\u000f\u0005\u0002,w\u001a)\u0011A\u0001E\u0001yN\u001110\u0005\u0005\u0006Qm$\tA \u000b\u0002u\"9\u0011\u0011A>\u0005\u0002\u0005\r\u0011\u0001D2sK\u0006$XmQ8oM&<GcB\u0010\u0002\u0006\u0005=\u0011\u0011\u0003\u0005\b\u0003\u000fy\b\u0019AA\u0005\u0003\u001d\u0011'o\\6feN\u0004BASA\u0006\u0013&\u0019\u0011QB(\u0003\u0007M+G\u000fC\u0003I\u007f\u0002\u0007\u0011\n\u0003\u0004\u0002\u0014}\u0004\r!S\u0001\tu>|7.\u001a9fe\u0002")
/* loaded from: input_file:it/agilelab/bigdata/wasp/core/kafka/WaspKafkaReader.class */
public class WaspKafkaReader<K, V> implements Logging {
    public final Properties it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumerConfig;
    private final KafkaConsumer<String, String> it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer;
    private final WaspLogger logger;

    public static Properties createConfig(Set<String> set, String str, String str2) {
        return WaspKafkaReader$.MODULE$.createConfig(set, str, str2);
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public WaspLogger logger() {
        return this.logger;
    }

    @Override // it.agilelab.bigdata.wasp.core.logging.Logging
    public void it$agilelab$bigdata$wasp$core$logging$Logging$_setter_$logger_$eq(WaspLogger waspLogger) {
        this.logger = waspLogger;
    }

    public KafkaConsumer<String, String> it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer() {
        return this.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer;
    }

    public void subscribe(String str, ActorRef actorRef) {
        it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer().subscribe(Arrays.asList(str));
        new WaspKafkaReader$$anon$1(this, str, actorRef).start();
    }

    public void close() {
        it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer().close();
    }

    public WaspKafkaReader(Properties properties) {
        this.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumerConfig = properties;
        Logging.Cclass.$init$(this);
        logger().info(new WaspKafkaReader$$anonfun$1(this));
        this.it$agilelab$bigdata$wasp$core$kafka$WaspKafkaReader$$consumer = new KafkaConsumer<>(properties);
    }

    public WaspKafkaReader(KafkaConfigModel kafkaConfigModel, String str, String str2) {
        this(WaspKafkaReader$.MODULE$.createConfig(((TraversableOnce) kafkaConfigModel.connections().map(new WaspKafkaReader$$anonfun$$lessinit$greater$1(), Seq$.MODULE$.canBuildFrom())).toSet(), str, str2));
    }
}
