package cz.seznam.euphoria.flink.batch;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.greduce.GroupReducer;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.Utils;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables;
import java.lang.invoke.SerializedLambda;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.util.Collector;

/* loaded from: input_file:cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator.class */
public class ReduceStateByKeyTranslator implements BatchOperatorTranslator<ReduceStateByKey> {
    private StorageProvider stateStorageProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator$RSBKReducer.class */
    public static class RSBKReducer extends RichGroupReduceFunction<BatchElement<?, Pair>, BatchElement<?, Pair>> implements ResultTypeQueryable<BatchElement<?, Pair>> {
        private final StateFactory<?, ?, State<?, ?>> stateFactory;
        private final StateMerger<?, ?, State<?, ?>> stateCombiner;
        private final StorageProvider stateStorageProvider;
        private final Windowing windowing;
        private final Trigger trigger;
        private final FlinkAccumulatorFactory accumulatorFactory;
        private final Settings settings;
        private transient Map<Object, GroupReducer> activeReducers;

        RSBKReducer(ReduceStateByKey reduceStateByKey, StorageProvider storageProvider, Windowing windowing, FlinkAccumulatorFactory flinkAccumulatorFactory, Settings settings) {
            this.stateFactory = reduceStateByKey.getStateFactory();
            this.stateCombiner = reduceStateByKey.getStateMerger();
            this.stateStorageProvider = storageProvider;
            this.windowing = windowing;
            this.trigger = windowing.getTrigger();
            this.accumulatorFactory = flinkAccumulatorFactory;
            this.settings = settings;
        }

        public void reduce(Iterable<BatchElement<?, Pair>> iterable, Collector<BatchElement<?, Pair>> collector) {
            this.activeReducers = new HashMap();
            for (BatchElement<?, Pair> batchElement : iterable) {
                Object first = batchElement.getElement().getFirst();
                GroupReducer groupReducer = this.activeReducers.get(first);
                if (groupReducer == null) {
                    groupReducer = new GroupReducer(this.stateFactory, this.stateCombiner, this.stateStorageProvider, BatchElement::new, this.windowing, this.trigger, obj -> {
                        collector.collect((BatchElement) obj);
                    }, this.accumulatorFactory.create(this.settings, getRuntimeContext()), false);
                    this.activeReducers.put(first, groupReducer);
                }
                groupReducer.process(batchElement);
            }
            flushStates();
        }

        private void flushStates() {
            Iterator<Map.Entry<Object, GroupReducer>> it = this.activeReducers.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().close();
            }
            this.activeReducers.clear();
        }

        public TypeInformation<BatchElement<?, Pair>> getProducedType() {
            return TypeInformation.of(BatchElement.class);
        }
    }

    private void loadConfig(Settings settings, ExecutionEnvironment executionEnvironment) {
        this.stateStorageProvider = new BatchStateStorageProvider(settings.getInt(BatchOperatorTranslator.CFG_LIST_STORAGE_MAX_MEMORY_ELEMS_KEY, BatchOperatorTranslator.CFG_LIST_STORAGE_MAX_MEMORY_ELEMS_DEFAULT), executionEnvironment);
    }

    /* renamed from: translate, reason: avoid collision after fix types in other method */
    public DataSet translate2(FlinkOperator<ReduceStateByKey> flinkOperator, BatchExecutorContext batchExecutorContext) {
        loadConfig(batchExecutorContext.getSettings(), batchExecutorContext.getExecutionEnvironment());
        int parallelism = ((FlinkOperator) Iterables.getOnlyElement(batchExecutorContext.getInputOperators(flinkOperator))).getParallelism();
        DataSet dataSet = (DataSet) Iterables.getOnlyElement(batchExecutorContext.getInputStreams(flinkOperator));
        ReduceStateByKey originalOperator = flinkOperator.getOriginalOperator();
        Windowing windowing = originalOperator.getWindowing() == null ? AttachedWindowing.INSTANCE : originalOperator.getWindowing();
        UnaryFunction keyExtractor = originalOperator.getKeyExtractor();
        UnaryFunction valueExtractor = originalOperator.getValueExtractor();
        DataSet name = dataSet.flatMap((obj, collector) -> {
            BatchElement batchElement = (BatchElement) obj;
            for (Window window : windowing.assignWindowsToElement(batchElement)) {
                Object element = batchElement.getElement();
                collector.collect(new BatchElement(window, batchElement.getTimestamp(), Pair.of(keyExtractor.apply(element), valueExtractor.apply(element))));
            }
        }).returns(BatchElement.class).name(flinkOperator.getName() + "::map-input").setParallelism(parallelism).groupBy(Utils.wrapQueryable(batchElement -> {
            return Integer.valueOf(((Pair) batchElement.getElement()).getFirst().hashCode());
        }, Integer.class)).sortGroup(Utils.wrapQueryable((v0) -> {
            return v0.getTimestamp();
        }, Long.class), Order.ASCENDING).reduceGroup(new RSBKReducer(originalOperator, this.stateStorageProvider, windowing, batchExecutorContext.getAccumulatorFactory(), batchExecutorContext.getSettings())).setParallelism(flinkOperator.getParallelism()).name(flinkOperator.getName() + "::reduce");
        if (!originalOperator.getPartitioning().hasDefaultPartitioner()) {
            name = name.partitionCustom(new PartitionerWrapper(originalOperator.getPartitioning().getPartitioner()), Utils.wrapQueryable(batchElement2 -> {
                return (Comparable) ((Pair) batchElement2.getElement()).getFirst();
            }, Comparable.class)).setParallelism(flinkOperator.getParallelism());
        }
        return name;
    }

    @Override // cz.seznam.euphoria.flink.OperatorTranslator
    public /* bridge */ /* synthetic */ DataSet<?> translate(FlinkOperator flinkOperator, BatchExecutorContext batchExecutorContext) {
        return translate2((FlinkOperator<ReduceStateByKey>) flinkOperator, batchExecutorContext);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 45521504:
                if (implMethodName.equals("getTimestamp")) {
                    z = 2;
                    break;
                }
                break;
            case 1051187877:
                if (implMethodName.equals("lambda$translate$7f502591$1")) {
                    z = false;
                    break;
                }
                break;
            case 1120660565:
                if (implMethodName.equals("lambda$translate$904d7d78$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1719093970:
                if (implMethodName.equals("lambda$translate$6f978639$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/common/functions/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("flatMap") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/dataset/windowing/Windowing;Lcz/seznam/euphoria/core/client/functional/UnaryFunction;Lcz/seznam/euphoria/core/client/functional/UnaryFunction;Ljava/lang/Object;Lorg/apache/flink/util/Collector;)V")) {
                    Windowing windowing = (Windowing) serializedLambda.getCapturedArg(0);
                    UnaryFunction unaryFunction = (UnaryFunction) serializedLambda.getCapturedArg(1);
                    UnaryFunction unaryFunction2 = (UnaryFunction) serializedLambda.getCapturedArg(2);
                    return (obj, collector) -> {
                        BatchElement batchElement = (BatchElement) obj;
                        for (Window window : windowing.assignWindowsToElement(batchElement)) {
                            Object element = batchElement.getElement();
                            collector.collect(new BatchElement(window, batchElement.getTimestamp(), Pair.of(unaryFunction.apply(element), unaryFunction2.apply(element))));
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/flink/batch/BatchElement;)Ljava/lang/Comparable;")) {
                    return batchElement2 -> {
                        return (Comparable) ((Pair) batchElement2.getElement()).getFirst();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/BatchElement") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.getTimestamp();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/api/java/functions/KeySelector") && serializedLambda.getFunctionalInterfaceMethodName().equals("getKey") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/flink/batch/ReduceStateByKeyTranslator") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/flink/batch/BatchElement;)Ljava/lang/Integer;")) {
                    return batchElement -> {
                        return Integer.valueOf(((Pair) batchElement.getElement()).getFirst().hashCode());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
