package pl.touk.nussknacker.engine.javaexample;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import pl.touk.nussknacker.engine.api.CustomStreamTransformer;
import pl.touk.nussknacker.engine.api.LazyParameter;
import pl.touk.nussknacker.engine.api.MethodToInvoke;
import pl.touk.nussknacker.engine.api.ParamName;
import pl.touk.nussknacker.engine.api.ValueWithContext;
import pl.touk.nussknacker.engine.flink.api.process.FlinkCustomStreamTransformation;
import pl.touk.nussknacker.engine.flink.javaapi.process.JavaFlinkCustomStreamTransformation;

/* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/TransactionAmountAggregator.class */
public class TransactionAmountAggregator extends CustomStreamTransformer {

    /* loaded from: input_file:pl/touk/nussknacker/engine/javaexample/TransactionAmountAggregator$AggregatedAmount.class */
    public static class AggregatedAmount {
        public String clientId;
        public int amount;

        public AggregatedAmount(String str, int i) {
            this.clientId = str;
            this.amount = i;
        }
    }

    @MethodToInvoke
    public FlinkCustomStreamTransformation execute(@ParamName("clientId") LazyParameter<String> lazyParameter) {
        return JavaFlinkCustomStreamTransformation.apply((dataStream, flinkCustomNodeContext) -> {
            return dataStream.map(flinkCustomNodeContext.nodeServices().lazyMapFunction(lazyParameter)).keyBy(new KeySelector<ValueWithContext<String>, String>() { // from class: pl.touk.nussknacker.engine.javaexample.TransactionAmountAggregator.1
                public String getKey(ValueWithContext<String> valueWithContext) throws Exception {
                    return (String) valueWithContext.value();
                }
            }).map(amountAggregateFunction());
        });
    }

    private RichMapFunction<ValueWithContext<String>, ValueWithContext<Object>> amountAggregateFunction() {
        return new RichMapFunction<ValueWithContext<String>, ValueWithContext<Object>>() { // from class: pl.touk.nussknacker.engine.javaexample.TransactionAmountAggregator.2
            ValueState<AggregatedAmount> state = null;

            public void open(Configuration configuration) throws Exception {
                super.open(configuration);
                this.state = getRuntimeContext().getState(new ValueStateDescriptor("state", TypeInformation.of(AggregatedAmount.class).createSerializer(getRuntimeContext().getExecutionConfig())));
            }

            public ValueWithContext<Object> map(ValueWithContext<String> valueWithContext) throws Exception {
                Transaction transaction = (Transaction) valueWithContext.context().apply("input");
                this.state.update(new AggregatedAmount(transaction.clientId, transaction.amount + (this.state.value() == null ? 0 : ((AggregatedAmount) this.state.value()).amount)));
                return new ValueWithContext<>(this.state.value(), valueWithContext.context());
            }
        };
    }
}
