package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.ReduceFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.functional.UnaryFunctor;
import cz.seznam.euphoria.core.client.operator.ReduceByKey;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.util.SingleValueContext;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
import cz.seznam.euphoria.shaded.guava.com.google.common.base.Preconditions;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Operator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

/* loaded from: input_file:cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator.class */
public class ReduceByKeyTranslator implements BatchOperatorTranslator<ReduceByKey> {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator$RBKKeySelector.class */
    /**
     * Key selector used to group windowed key/value pairs before reduction.
     *
     * <p>The grouping key is the hash code of a {@link Tuple2} holding the
     * element's window (first field) and the key part of its pair (second
     * field). Because only the hash is used, distinct (window, key)
     * combinations may produce the same grouping key.
     */
    public static class RBKKeySelector implements KeySelector<BatchElement<Window, Pair>, Integer> {
        RBKKeySelector() {
        }

        /**
         * Computes the grouping key of the given element.
         *
         * @param batchElement windowed element carrying a key/value pair
         * @return hash code of the (window, key) tuple
         */
        public Integer getKey(BatchElement<Window, Pair> batchElement) {
            Tuple2<Window, Object> windowAndKey =
                    new Tuple2<>(batchElement.getWindow(), batchElement.getElement().getFirst());
            return windowAndKey.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator$RBKReducer.class */
    /**
     * Group reducer (also usable as a combiner) that folds the values of each
     * (key, window) pair with the user supplied reduce function.
     *
     * <p>A single invocation may see elements of several (key, window) pairs,
     * so partial results are tracked per pair in a map and one output element
     * is emitted per pair once the whole group has been consumed.
     */
    public static class RBKReducer implements GroupReduceFunction<BatchElement<Window, Pair>, BatchElement<Window, Pair>>, GroupCombineFunction<BatchElement<Window, Pair>, BatchElement<Window, Pair>>, ResultTypeQueryable<BatchElement<Window, Pair>> {
        /** User reduce function; applied to a two-element list (accumulated value, next value). */
        final UnaryFunctor<Iterable, Object> reducer;
        /** Context receiving the reducer's single output; transient, so it is created lazily. */
        transient SingleValueContext<Object> singleValueContext;

        RBKReducer(UnaryFunctor<Iterable, Object> unaryFunctor) {
            this.reducer = unaryFunctor;
        }

        /** Combiner entry point; performs the same folding as {@link #reduce}. */
        public void combine(Iterable<BatchElement<Window, Pair>> iterable, Collector<BatchElement<Window, Pair>> collector) {
            doReduce(iterable, collector);
        }

        /** Reducer entry point; folds the group and emits one element per (key, window). */
        public void reduce(Iterable<BatchElement<Window, Pair>> iterable, Collector<BatchElement<Window, Pair>> collector) {
            doReduce(iterable, collector);
        }

        /**
         * Folds all incoming elements per (key, window) pair and emits the results.
         *
         * @param iterable elements of one group
         * @param collector sink for the reduced elements; each emitted element
         *     carries the largest timestamp seen for its (key, window) pair
         */
        private void doReduce(Iterable<BatchElement<Window, Pair>> iterable, Collector<BatchElement<Window, Pair>> collector) {
            if (this.singleValueContext == null) {
                this.singleValueContext = new SingleValueContext<>();
            }
            // (key, window) => running reduced value together with its timestamp
            Map<Tuple2<Object, Window>, TimestampedElement> partials = new HashMap<>();
            for (BatchElement<Window, Pair> incoming : iterable) {
                Tuple2<Object, Window> slot =
                        new Tuple2<>(incoming.getElement().getFirst(), incoming.getWindow());
                Object value = incoming.getElement().getSecond();
                TimestampedElement running = partials.get(slot);
                if (running == null) {
                    // first value of this (key, window): it becomes the initial accumulator
                    partials.put(slot, new TimestampedElement(incoming.getTimestamp(), value));
                    continue;
                }
                this.reducer.apply(Arrays.asList(running.getElement(), value), this.singleValueContext);
                running.setElement(this.singleValueContext.getAndResetValue());
                running.setTimestamp(Math.max(running.getTimestamp(), incoming.getTimestamp()));
            }
            partials.forEach((slot, reduced) ->
                    collector.collect(new BatchElement(slot.f1, reduced.getTimestamp(), Pair.of(slot.f0, reduced.getElement()))));
        }

        /**
         * Reports the produced element type to Flink.
         *
         * @return type information for {@link BatchElement}
         */
        @SuppressWarnings("unchecked")
        public TypeInformation<BatchElement<Window, Pair>> getProducedType() {
            // A parameterized class literal does not exist, so go through the raw Class;
            // without it the result is TypeInformation<BatchElement>, which is not
            // assignable to the declared return type.
            return TypeInformation.of((Class) BatchElement.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Decides whether this translator can handle the given operator.
     *
     * @param reduceByKey operator to inspect
     * @return {@code true} when the operator is combinable and either has no
     *     windowing, or its windowing is neither a {@link MergingWindowing}
     *     nor uses a stateful trigger
     */
    public static boolean wantTranslate(ReduceByKey reduceByKey) {
        if (!reduceByKey.isCombinable()) {
            return false;
        }
        Windowing windowing = reduceByKey.getWindowing();
        if (windowing == null) {
            return true;
        }
        if (windowing instanceof MergingWindowing) {
            return false;
        }
        return !windowing.getTrigger().isStateful();
    }

    /* renamed from: translate, reason: avoid collision after fix types in other method */
    /**
     * Translates a combinable {@link ReduceByKey} into a Flink batch pipeline.
     *
     * <p>The single input is flat-mapped into one element per assigned window,
     * each carrying a (key, value) pair, then grouped with
     * {@link RBKKeySelector} and reduced with {@link RBKReducer}. When the
     * operator declares a non-default partitioner, the result is additionally
     * repartitioned by key using that partitioner.
     *
     * @param flinkOperator operator wrapper to translate
     * @param batchExecutorContext context providing the operator's single input
     * @return data set of reduced elements
     * @throws IllegalStateException if the operator is not combinable, uses a
     *     {@link MergingWindowing}, or uses a stateful trigger
     */
    public DataSet translate2(FlinkOperator<ReduceByKey> flinkOperator, BatchExecutorContext batchExecutorContext) {
        int inputParallelism = ((FlinkOperator) Iterables.getOnlyElement(batchExecutorContext.getInputOperators(flinkOperator))).getParallelism();
        DataSet input = (DataSet) Iterables.getOnlyElement(batchExecutorContext.getInputStreams(flinkOperator));
        ReduceByKey originalOperator = flinkOperator.getOriginalOperator();
        ReduceFunctor reducer = originalOperator.getReducer();
        // Operators without explicit windowing fall back to AttachedWindowing.
        Windowing windowing = originalOperator.getWindowing() == null ? AttachedWindowing.INSTANCE : originalOperator.getWindowing();
        Preconditions.checkState(originalOperator.isCombinable(), "Non-combinable ReduceByKey not supported!");
        Preconditions.checkState(!(windowing instanceof MergingWindowing), "MergingWindowing not supported!");
        Preconditions.checkState(!windowing.getTrigger().isStateful(), "Stateful triggers not supported!");
        UnaryFunction keyExtractor = originalOperator.getKeyExtractor();
        UnaryFunction valueExtractor = originalOperator.getValueExtractor();
        Operator reduced = dataSetFlatMapPlaceholderGuard(input).flatMap((obj, collector) -> {
            BatchElement batchElement = (BatchElement) obj;
            Object element = batchElement.getElement();
            // Iterate as Window: only time-based windows can be cast to TimedWindow, so the
            // instanceof test below must be able to fail instead of the loop variable cast.
            for (Object assigned : windowing.assignWindowsToElement(batchElement)) {
                Window window = (Window) assigned;
                long stamp = window instanceof TimedWindow
                        ? ((TimedWindow) window).maxTimestamp()
                        : batchElement.getTimestamp();
                collector.collect(new BatchElement(window, stamp, Pair.of(keyExtractor.apply(element), valueExtractor.apply(element))));
            }
        }).name(flinkOperator.getName() + "::map-input").setParallelism(inputParallelism).returns(new TypeHint<BatchElement<Window, Pair>>() { // from class: cz.seznam.euphoria.flink.batch.ReduceByKeyTranslator.1
        }).groupBy(new RBKKeySelector()).reduceGroup(new RBKReducer(reducer)).setParallelism(flinkOperator.getParallelism()).name(flinkOperator.getName() + "::reduce");
        if (!originalOperator.getPartitioning().hasDefaultPartitioner()) {
            // Apply the user supplied partitioner to the key of each reduced pair.
            reduced = reduced.partitionCustom(new PartitionerWrapper(originalOperator.getPartitioning().getPartitioner()), Utils.wrapQueryable(batchElement -> {
                return (Comparable) ((Pair) batchElement.getElement()).getFirst();
            }, Comparable.class)).setParallelism(flinkOperator.getParallelism());
        }
        return reduced;
    }

    /** Returns the given data set unchanged; keeps the translation chain readable. */
    private static DataSet dataSetFlatMapPlaceholderGuard(DataSet input) {
        return input;
    }

    /**
     * Untyped entry point required by the translator interface; narrows the
     * operator to {@code FlinkOperator<ReduceByKey>} and delegates to
     * {@link #translate2}.
     *
     * @param flinkOperator operator wrapper to translate
     * @param batchExecutorContext context passed through to {@code translate2}
     * @return the data set produced by {@code translate2}
     */
    @Override // cz.seznam.euphoria.flink.OperatorTranslator
    public /* bridge */ /* synthetic */ DataSet<?> translate(FlinkOperator flinkOperator, BatchExecutorContext batchExecutorContext) {
        FlinkOperator<ReduceByKey> typedOperator = (FlinkOperator<ReduceByKey>) flinkOperator;
        DataSet<?> translated = translate2(typedOperator, batchExecutorContext);
        return translated;
    }

    /**
     * Recreates one of this class's two serializable lambdas from its serialized form.
     *
     * <p>The lambda is selected by implementation method name and then verified against
     * the recorded functional interface, method names and signatures before an
     * equivalent lambda is returned.
     *
     * <p>NOTE(review): this looks like the decompiled form of the lambda deserialization
     * hook that the compiler normally synthesizes itself; confirm whether it should be
     * present in source at all. The selector {@code z} is declared {@code boolean} but
     * initialised with {@code -1} and switched on, which is not valid Java as written.
     *
     * @param serializedLambda serialized description of the lambda to restore
     * @return the restored flatMap function or key selector lambda
     * @throws IllegalArgumentException if the description matches neither lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First stage: narrow by hash code, then confirm with equals to rule out collisions.
        switch (implMethodName.hashCode()) {
            case -1720827202:
                if (implMethodName.equals("lambda$translate$b0dc05f2$1")) {
                    z = false;
                    break;
                }
                break;
            case 1534386815:
                if (implMethodName.equals("lambda$translate$f64ade78$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // flatMap lambda: expands an element into one (key, value) element per assigned window.
                // Implementation method kind 6 is presumably MethodHandleInfo.REF_invokeStatic — confirm.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/dataset/windowing/Windowing;Lcz/seznam/euphoria/core/client/functional/UnaryFunction;Lcz/seznam/euphoria/core/client/functional/UnaryFunction;Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V")) {
                    // Captured arguments, in order: windowing, then the two unary functions
                    // applied to the element to build the first and second pair component.
                    Windowing windowing = (Windowing) serializedLambda.getCapturedArg(0);
                    UnaryFunction unaryFunction = (UnaryFunction) serializedLambda.getCapturedArg(1);
                    UnaryFunction unaryFunction2 = (UnaryFunction) serializedLambda.getCapturedArg(2);
                    return (obj, collector) -> {
                        BatchElement batchElement = (BatchElement) obj;
                        // NOTE(review): the loop variable is typed TimedWindow, which makes the
                        // instanceof check below always true — verify against the intended Window type.
                        for (TimedWindow timedWindow : windowing.assignWindowsToElement(batchElement)) {
                            Object element = batchElement.getElement();
                            collector.collect(new BatchElement(timedWindow, timedWindow instanceof TimedWindow ? timedWindow.maxTimestamp() : batchElement.getTimestamp(), Pair.of(unaryFunction.apply(element), unaryFunction2.apply(element))));
                        }
                    };
                }
                break;
            case true:
                // Key selector lambda: returns the first component of the element's pair as a Comparable.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/flink/batch/BatchElement;)Ljava/lang/Comparable;")) {
                    return batchElement -> {
                        return (Comparable) ((Pair) batchElement.getElement()).getFirst();
                    };
                }
                break;
        }
        // Reached when the name is unknown or the recorded signatures do not match.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
