package pl.touk.nussknacker.engine.javaexample;

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.MethodToInvoke;
import pl.touk.nussknacker.engine.api.ParamName;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.api.util.MultiMap;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomStreamTransformation;
import pl.touk.nussknacker.engine.flink.api.state.TimestampedEvictableStateFunction;
import pl.touk.nussknacker.engine.flink.javaapi.process.JavaFlinkCustomStreamTransformation;
import scala.collection.JavaConversions;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/EventsCounter.class */
public class EventsCounter extends CustomStreamTransformer {

    /* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/EventsCounter$CounterFunction.class */
    public static class CounterFunction extends TimestampedEvictableStateFunction<ValueWithContext<String>, ValueWithContext<Object>, Integer> {
        long lengthInMillis;

        public CounterFunction(long j) {
            this.lengthInMillis = j;
        }

        public ValueStateDescriptor<MultiMap<Object, Integer>> stateDescriptor() {
            return new ValueStateDescriptor<>("state", MultiMap.class);
        }

        public void processElement(ValueWithContext<String> valueWithContext, KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>>.Context context, Collector<ValueWithContext<Object>> collector) throws Exception {
            long longValue = context.timestamp().longValue();
            moveEvictionTime(this.lengthInMillis, context);
            state().update(stateValue().add(Long.valueOf(longValue), 1));
            collector.collect(new ValueWithContext(new EventCount(JavaConversions.asJavaCollection(r0.map().values()).stream().map(list -> {
                return Integer.valueOf(JavaConversions.asJavaCollection(list).stream().mapToInt((v0) -> {
                    return v0.intValue();
                }).sum());
            }).mapToInt((v0) -> {
                return v0.intValue();
            }).sum()), valueWithContext.context()));
        }

        public /* bridge */ /* synthetic */ void processElement(Object obj, KeyedProcessFunction.Context context, Collector collector) throws Exception {
            processElement((ValueWithContext<String>) obj, (KeyedProcessFunction<String, ValueWithContext<String>, ValueWithContext<Object>>.Context) context, (Collector<ValueWithContext<Object>>) collector);
        }
    }

    /* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/EventsCounter$EventCount.class */
    /**
     * Simple holder for a number of counted events.
     */
    public static class EventCount {
        /** Number of events this instance was created with. */
        long count;

        /**
         * @param count number of events to carry
         */
        public EventCount(long count) {
            this.count = count;
        }

        /**
         * @return the stored number of events
         */
        public long count() {
            return count;
        }
    }

    /**
     * Builds the stream transformation for this node: every record is mapped
     * through the lazily evaluated {@code key} parameter, the stream is keyed
     * by the resulting string, and each keyed record is handed to a
     * {@link CounterFunction}.
     *
     * @param lazyParameter expression producing the partitioning key
     * @param str           textual duration parsed with {@link Duration#apply(String)};
     *                      its length in milliseconds configures the {@link CounterFunction}
     * @return transformation emitting {@link EventCount} values
     */
    @MethodToInvoke(returnType = EventCount.class)
    public FlinkCustomStreamTransformation execute(@ParamName("key") LazyParameter<String> lazyParameter, @ParamName("length") String str) {
        return JavaFlinkCustomStreamTransformation.apply((stream, nodeContext) -> {
            // Partition records by the already evaluated key value.
            KeySelector<ValueWithContext<String>, String> byEvaluatedKey = new KeySelector<ValueWithContext<String>, String>() {
                @Override
                public String getKey(ValueWithContext<String> evaluated) throws Exception {
                    return evaluated.value();
                }
            };

            // The duration is parsed only after map/keyBy have been applied,
            // i.e. when the transformation is actually being wired.
            return stream
                    .map(nodeContext.nodeServices().lazyMapFunction(lazyParameter))
                    .keyBy(byEvaluatedKey)
                    .process(new CounterFunction(Duration.apply(str).toMillis()));
        });
    }
}
