package pl.touk.nussknacker.engine.example.custom;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import pl.touk.nussknacker.engine.api.InterpretationResult;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.api.util.MultiMap;
import pl.touk.nussknacker.engine.flink.api.state.TimestampedEvictableState;
import scala.Predef$;
import scala.math.Numeric$IntIsIntegral$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxesRunTime;

/* compiled from: EventsCounter.scala */
@ScalaSignature(bytes = "\u0006\u0001a3A!\u0001\u0002\u0001\u001f\ty1i\\;oi\u0016\u0014h)\u001e8di&|gN\u0003\u0002\u0004\t\u000511-^:u_6T!!\u0002\u0004\u0002\u000f\u0015D\u0018-\u001c9mK*\u0011q\u0001C\u0001\u0007K:<\u0017N\\3\u000b\u0005%Q\u0011a\u00038vgN\\g.Y2lKJT!a\u0003\u0007\u0002\tQ|Wo\u001b\u0006\u0002\u001b\u0005\u0011\u0001\u000f\\\u0002\u0001'\t\u0001\u0001\u0003E\u0002\u00121ii\u0011A\u0005\u0006\u0003'Q\tQa\u001d;bi\u0016T!!\u0006\f\u0002\u0007\u0005\u0004\u0018N\u0003\u0002\u0018\r\u0005)a\r\\5oW&\u0011\u0011D\u0005\u0002\u001a)&lWm\u001d;b[B,G-\u0012<jGR\f'\r\\3Ti\u0006$X\r\u0005\u0002\u001c=5\tADC\u0001\u001e\u0003\u0015\u00198-\u00197b\u0013\tyBDA\u0002J]RD\u0001\"\t\u0001\u0003\u0002\u0003\u0006IAI\u0001\u000fY\u0016tw\r\u001e5J]6KG\u000e\\5t!\tY2%\u0003\u0002%9\t!Aj\u001c8h\u0011\u00151\u0003\u0001\"\u0001(\u0003\u0019a\u0014N\\5u}Q\u0011\u0001F\u000b\t\u0003S\u0001i\u0011A\u0001\u0005\u0006C\u0015\u0002\rA\t\u0005\u0006Y\u0001!\t%L\u0001\u0010gR\fG/\u001a#fg\u000e\u0014\u0018\u000e\u001d;peV\ta\u0006E\u00020smj\u0011\u0001\r\u0006\u0003'ER!AM\u001a\u0002\r\r|W.\\8o\u0015\t)BG\u0003\u0002\u0018k)\u0011agN\u0001\u0007CB\f7\r[3\u000b\u0003a\n1a\u001c:h\u0013\tQ\u0004G\u0001\u000bWC2,Xm\u0015;bi\u0016$Um]2sSB$xN\u001d\t\u0005y\u0001\u0013#$D\u0001>\u0015\tqt(\u0001\u0003vi&d'BA\u000b\u0007\u0013\t\tUH\u0001\u0005Nk2$\u0018.T1q\u0011\u0015\u0019\u0005\u0001\"\u0011E\u00039\u0001(o\\2fgN,E.Z7f]R$\"!\u0012%\u0011\u0005m1\u0015BA$\u001d\u0005\u0011)f.\u001b;\t\u000b%\u0013\u0005\u0019\u0001&\u0002\u000f\u0015dW-\\3oiB\u00191J\u0015+\u000e\u00031S!!\u0014(\u0002\u0019M$(/Z1ne\u0016\u001cwN\u001d3\u000b\u0005=\u0003\u0016a\u0002:v]RLW.\u001a\u0006\u0003#R\n\u0011b\u001d;sK\u0006l\u0017N\\4\n\u0005Mc%\u0001D*ue\u0016\fWNU3d_J$\u0007CA+W\u001b\u0005y\u0014BA,@\u0005QIe\u000e^3saJ,G/\u0019;j_:\u0014Vm];mi\u0002")
/* loaded from: input_file:pl/touk/nussknacker/engine/example/custom/CounterFunction.class */
/*
 * Custom stream function that, for every incoming record, adds an entry
 * (record timestamp -> 1) to a keyed MultiMap state and emits an EventCount
 * holding the sum of all values currently kept in that state.
 *
 * NOTE(review): this class is decompiler output of a Scala class; state(),
 * stateValue(), filterState(), setEvictionTimeForCurrentKey() and the output
 * field are inherited from TimestampedEvictableState and are not visible here.
 */
public class CounterFunction extends TimestampedEvictableState<Object> {
    private final long lengthInMillis;

    /**
     * @param lengthInMillis value added to a record's timestamp to compute the eviction
     *                       time, and passed to filterState together with that timestamp
     */
    public CounterFunction(long lengthInMillis) {
        this.lengthInMillis = lengthInMillis;
    }

    /**
     * @return descriptor of the value state named "state" holding a {@link MultiMap}
     */
    public ValueStateDescriptor<MultiMap<Object, Object>> stateDescriptor() {
        return new ValueStateDescriptor<>("state", MultiMap.class);
    }

    /**
     * Updates the state with the incoming record and emits the resulting count.
     *
     * @param streamRecord record carrying the interpretation result; its timestamp is used
     *                     as the state key and as the timestamp of the emitted record
     */
    public void processElement(StreamRecord<InterpretationResult> streamRecord) {
        long timestamp = streamRecord.getTimestamp();

        setEvictionTimeForCurrentKey(timestamp + this.lengthInMillis);
        // Presumably drops entries older than (timestamp - lengthInMillis); verify against
        // TimestampedEvictableState.filterState.
        state().update(filterState(timestamp, this.lengthInMillis));

        InterpretationResult interpretationResult = streamRecord.getValue();

        // Keep the updated map in a local: it is both stored as the new state and summed
        // below. The decompiler had lost this variable and referenced an undeclared `r0`.
        MultiMap<Object, Object> newState =
                stateValue().add(BoxesRunTime.boxToLong(timestamp), BoxesRunTime.boxToInteger(1));
        state().update(newState);

        // Scala's `newState.map.values.flatten.sum`: total of all values held in the state.
        int totalCount = BoxesRunTime.unboxToInt(newState.map().values().flatten(Predef$.MODULE$.$conforms()).sum(Numeric$IntIsIntegral$.MODULE$));

        this.output.collect(new StreamRecord(new ValueWithContext(new EventCount(totalCount), interpretationResult.finalContext()), timestamp));
    }
}
