package it.agilelab.bigdata.wasp.producers.metrics.kafka.backlog;

import akka.actor.ActorRef;
import it.agilelab.bigdata.wasp.core.messages.TelemetryMessageSourcesSummary;
import it.agilelab.bigdata.wasp.models.TopicModel;
import it.agilelab.bigdata.wasp.producers.ProducerActor;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.PartialFunction;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.collection.Seq$;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Map;
import scala.collection.immutable.Map$;
import scala.collection.immutable.StringOps;
import scala.math.Numeric$LongIsIntegral$;
import scala.package$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Either;
import scala.util.Right;

/* compiled from: BacklogSizeAnalyzerProducerActor.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005eb!\u0002\b\u0010\u0003\u0003\u0001\u0003\"\u0003\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u001b>\u0011!q\u0004A!A!\u0002\u0013)\u0004\"C \u0001\u0005\u0003\u0005\u000b\u0011\u0002!J\u0011!Q\u0005A!A!\u0002\u0013Y\u0005\u0002\u0003,\u0001\u0005\u0003\u0005\u000b\u0011B&\t\u000b]\u0003A\u0011\u0001-\t\u000b\u0001\u0004A\u0011I1\t\u000b\u0015\u0004A\u0011I1\t\u000b\u0019\u0004A\u0011B4\t\u000b9\u0004A\u0011B8\t\u000bi\u0004A\u0011B>\t\u000f\u0005\r\u0002\u0001\"\u0003\u0002&!9\u0011\u0011\u0007\u0001\u0007\u0002\u0005M\"\u0001\t\"bG.dwnZ*ju\u0016\fe.\u00197zu\u0016\u0014\bK]8ek\u000e,'/Q2u_JT!\u0001E\t\u0002\u000f\t\f7m\u001b7pO*\u0011!cE\u0001\u0006W\u000647.\u0019\u0006\u0003)U\tq!\\3ue&\u001c7O\u0003\u0002\u0017/\u0005I\u0001O]8ek\u000e,'o\u001d\u0006\u00031e\tAa^1ta*\u0011!dG\u0001\bE&<G-\u0019;b\u0015\taR$\u0001\u0005bO&dW\r\\1c\u0015\u0005q\u0012AA5u\u0007\u0001)\"!\t\u0015\u0014\u0005\u0001\u0011\u0003cA\u0012%M5\tQ#\u0003\u0002&+\ti\u0001K]8ek\u000e,'/Q2u_J\u0004\"a\n\u0015\r\u0001\u0011)\u0011\u0006\u0001b\u0001U\t\t\u0011)\u0005\u0002,cA\u0011AfL\u0007\u0002[)\ta&A\u0003tG\u0006d\u0017-\u0003\u00021[\t9aj\u001c;iS:<\u0007C\u0001\u00173\u0013\t\u0019TFA\u0002B]f\fAb[1gW\u0006|&o\\;uKJ\u0004\"AN\u001e\u000e\u0003]R!\u0001O\u001d\u0002\u000b\u0005\u001cGo\u001c:\u000b\u0003i\nA!Y6lC&\u0011Ah\u000e\u0002\t\u0003\u000e$xN\u001d*fM&\u0011A\u0007J\u0001\u0013W\u000647.Y(gMN,Go\u00115fG.,'/A\u0003u_BL7\rE\u0002-\u0003\u000eK!AQ\u0017\u0003\r=\u0003H/[8o!\t!u)D\u0001F\u0015\t1u#\u0001\u0004n_\u0012,Gn]\u0005\u0003\u0011\u0016\u0013!\u0002V8qS\u000elu\u000eZ3m\u0013\tyD%\u0001\u0007u_BL7\rV8DQ\u0016\u001c7\u000e\u0005\u0002M':\u0011Q*\u0015\t\u0003\u001d6j\u0011a\u0014\u0006\u0003!~\ta\u0001\u0010:p_Rt\u0014B\u0001*.\u0003\u0019\u0001&/\u001a3fM&\u0011A+\u0016\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005Ik\u0013aB3uY:\u000bW.Z\u0001\u0007y%t\u0017\u000e\u001e \u0015\re[F,\u00180`!\rQ\u0006AJ\u0007\u0002\u001f!)AG\u0002a\u0001k!)aH\u0002a\u0001k!)qH\u0002a\u0001\u0001\")!J\u0002a\u0001\u0017\")aK\u0002a\u0001\u0017\u0006A\u0001O]3Ti\u0006\u0014H\u000fF\u0001c!\ta3-\u0003\u0002e[\t!QK\\5u\u0003!i\u0017-\u001b8UCN\\\u0017!E<bSRLgn\u001a$pe6+7o]1hKR\t\u0001\u000e\u0005\u0002jU6\t\u0001!\u0003\u0002lY\n9!+Z2fSZ,\u0017BA78\u0005\u0015\t5\r^8s\u0003E9\u0018-\u001b;j]\u001e4uN](gMN,Go\u001d\u000b\u0003QBDQ!\u001d\u0006A\u0002I\fA\u0001Z1uCB\u00111\u000f_\u0007\u0002i*\u0011QO^\u0001\t[\u0016\u001c8/Y4fg*\u0011qoF\u0001\u0005G>\u0014X-\u0003\u0002zi\nqB+\u001a7f[\u0016$(/_'fgN\fw-Z*pkJ\u001cWm]*v[6\f'/_\u0001\u0015G\u0006d7-\u001e7bi\u0016\u0014\u0015mY6m_\u001e\u001c\u0016N_3\u0015\u000bq\f\t\"!\t\u0011\ru\f)aSA\u0006\u001d\rq\u0018\u0011\u0001\b\u0003\u001d~L\u0011AL\u0005\u0004\u0003\u0007i\u0013a\u00029bG.\fw-Z\u0005\u0005\u0003\u000f\tIA\u0001\u0004FSRDWM\u001d\u0006\u0004\u0003\u0007i\u0003c\u0001\u0017\u0002\u000e%\u0019\u0011qB\u0017\u0003\t1{gn\u001a\u0005\b\u0003'Y\u0001\u0019AA\u000b\u00039ygMZ:fiN|enS1gW\u0006\u0004r\u0001TA\f\u00037\tY!C\u0002\u0002\u001aU\u00131!T1q!\ra\u0013QD\u0005\u0004\u0003?i#aA%oi\")\u0011o\u0003a\u0001e\u0006!\u0002O]3qCJ,Gk\\*f]\u0012lUm]:bO\u0016$2AYA\u0014\u0011\u001d\tI\u0003\u0004a\u0001\u0003W\tA!\u001b8g_B\u0019!,!\f\n\u0007\u0005=rBA\u0006CC\u000e\\Gn\\4J]\u001a|\u0017A\u0004;p\r&t\u0017\r\\'fgN\fw-\u001a\u000b\u0004M\u0005U\u0002bBA\u001c\u001b\u0001\u0007\u00111F\u0001\u0002S\u0002")
/* loaded from: input_file:it/agilelab/bigdata/wasp/producers/metrics/kafka/backlog/BacklogSizeAnalyzerProducerActor.class */
public abstract class BacklogSizeAnalyzerProducerActor<A> extends ProducerActor<A> {
    public final ActorRef it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$kafkaOffsetChecker;
    public final String it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$topicToCheck;
    public final String it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$etlName;

    @Override // it.agilelab.bigdata.wasp.producers.ProducerActor
    public void preStart() {
        context().become(it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$waitingForMessage());
    }

    @Override // it.agilelab.bigdata.wasp.producers.ProducerActor
    public void mainTask() {
    }

    public PartialFunction<Object, BoxedUnit> it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$waitingForMessage() {
        return new BacklogSizeAnalyzerProducerActor$$anonfun$it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$waitingForMessage$1(this);
    }

    public PartialFunction<Object, BoxedUnit> it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$waitingForOffsets(TelemetryMessageSourcesSummary telemetryMessageSourcesSummary) {
        return new BacklogSizeAnalyzerProducerActor$$anonfun$it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$waitingForOffsets$1(this, telemetryMessageSourcesSummary);
    }

    public Either<String, Object> it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$calculateBacklogSize(Map<Object, Object> map, TelemetryMessageSourcesSummary telemetryMessageSourcesSummary) {
        Right apply;
        Seq streamingQueriesProgress = telemetryMessageSourcesSummary.streamingQueriesProgress();
        logger().debug(() -> {
            return new StringBuilder(29).append("Evaluating streamingSources:\n").append(streamingQueriesProgress.mkString("\n\t")).toString();
        });
        Some headOption = ((TraversableLike) streamingQueriesProgress.flatMap(telemetryMessageSource -> {
            return Option$.MODULE$.option2Iterable(telemetryMessageSource.endOffset().get(this.it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$topicToCheck).map(map2 -> {
                return (Map) map2.map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(BoxesRunTime.boxToInteger(new StringOps(Predef$.MODULE$.augmentString((String) tuple2._1())).toInt())), BoxesRunTime.boxToLong(tuple2._2$mcJ$sp()));
                }, Map$.MODULE$.canBuildFrom());
            }));
        }, Seq$.MODULE$.canBuildFrom())).headOption();
        if (headOption instanceof Some) {
            Map map2 = (Map) headOption.value();
            logger().debug(() -> {
                return new StringBuilder(46).append("Current end offsets of Spark Streaming query:\n").append(((TraversableOnce) map2.map(tuple2 -> {
                    if (tuple2 == null) {
                        throw new MatchError(tuple2);
                    }
                    int _1$mcI$sp = tuple2._1$mcI$sp();
                    return new StringBuilder(2).append(_1$mcI$sp).append("->").append(tuple2._2$mcJ$sp()).toString();
                }, Iterable$.MODULE$.canBuildFrom())).mkString("\n\t")).toString();
            });
            apply = package$.MODULE$.Right().apply(((TraversableOnce) map.map(tuple2 -> {
                return BoxesRunTime.boxToLong($anonfun$calculateBacklogSize$7(map2, tuple2));
            }, Iterable$.MODULE$.canBuildFrom())).sum(Numeric$LongIsIntegral$.MODULE$));
        } else {
            if (!None$.MODULE$.equals(headOption)) {
                throw new MatchError(headOption);
            }
            apply = package$.MODULE$.Left().apply(new StringBuilder(51).append("Streaming sources did not contain info about topic ").append(this.it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$topicToCheck).toString());
        }
        return apply;
    }

    public void it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$prepareToSendMessage(BacklogInfo backlogInfo) {
        sendMessage(toFinalMessage(backlogInfo));
    }

    public abstract A toFinalMessage(BacklogInfo backlogInfo);

    public static final /* synthetic */ long $anonfun$calculateBacklogSize$7(Map map, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        return tuple2._2$mcJ$sp() - BoxesRunTime.unboxToLong(map.apply(BoxesRunTime.boxToInteger(tuple2._1$mcI$sp())));
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public BacklogSizeAnalyzerProducerActor(ActorRef actorRef, ActorRef actorRef2, Option<TopicModel> option, String str, String str2) {
        super(actorRef, option);
        this.it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$kafkaOffsetChecker = actorRef2;
        this.it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$topicToCheck = str;
        this.it$agilelab$bigdata$wasp$producers$metrics$kafka$backlog$BacklogSizeAnalyzerProducerActor$$etlName = str2;
    }
}
