package cz.seznam.euphoria.core.client.operator;

import cz.seznam.euphoria.core.annotation.operator.Recommended;
import cz.seznam.euphoria.core.annotation.operator.StateComplexity;
import cz.seznam.euphoria.core.client.dataset.Dataset;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.flow.Flow;
import cz.seznam.euphoria.core.client.functional.BinaryFunction;
import cz.seznam.euphoria.core.client.functional.CombinableReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunction;
import cz.seznam.euphoria.core.client.functional.ReduceFunctor;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.io.Collector;
import cz.seznam.euphoria.core.client.io.ExternalIterable;
import cz.seznam.euphoria.core.client.io.SpillTools;
import cz.seznam.euphoria.core.client.operator.Builders;
import cz.seznam.euphoria.core.client.operator.StateSupport;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateContext;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.executor.graph.DAG;
import cz.seznam.euphoria.core.executor.util.SingleValueContext;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;

@Recommended(reason = "Is very recommended to override because of performance in a specific area of (mostly) batch calculations where combiners can be efficiently used in the executor-specific implementation", state = StateComplexity.CONSTANT_IF_COMBINABLE, repartitions = 1)
/* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey.class */
public class ReduceByKey<IN, KEY, VALUE, OUT, W extends Window> extends StateAwareWindowWiseSingleInputOperator<IN, IN, IN, KEY, Pair<KEY, OUT>, W, ReduceByKey<IN, KEY, VALUE, OUT, W>> implements Builders.OutputValues<KEY, OUT> {
    /** Reduction applied to the extracted values; also decides (via {@code isCombinable()}) which state implementation is used. */
    final ReduceFunctor<VALUE, OUT> reducer;
    /** Extracts the {@code VALUE} from an input element; handed to the underlying {@code ReduceStateByKey}. */
    final UnaryFunction<IN, VALUE> valueExtractor;

    /** Optional ordering of values; only forwarded to the non-combining state, where a non-null comparator causes values to be sorted before reduction. */
    @Nullable
    final BinaryFunction<VALUE, VALUE, Integer> valueComparator;

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$CombiningReduceState.class */
    /**
     * State for combinable reducers: keeps a single running value per key/window and folds
     * every newly added element into it with the reducer.
     *
     * <p>A {@code null} stored value is treated as "nothing accumulated yet".
     */
    static class CombiningReduceState<E> implements State<E, E>, StateSupport.MergeFrom<CombiningReduceState<E>> {
        @SuppressWarnings("rawtypes")
        private static final ValueStorageDescriptor STORAGE_DESC = ValueStorageDescriptor.of("rbsk-value", Object.class, null);

        private final ReduceFunctor<E, E> reducer;
        private final ValueStorage<E> storage;
        /** Scratch collector capturing the single output of one reducer invocation. */
        private final SingleValueContext<E> context = new SingleValueContext<>();

        /** Creates {@link CombiningReduceState} instances sharing one reducer. */
        static final class Factory<E> implements StateFactory<E, E, State<E, E>> {
            private final ReduceFunctor<E, E> r;

            /**
             * @param reduceFunctor combinable reducer; must not be {@code null}
             * @throws NullPointerException if {@code reduceFunctor} is {@code null}
             */
            Factory(ReduceFunctor<E, E> reduceFunctor) {
                this.r = Objects.requireNonNull(reduceFunctor);
            }

            @Override
            public State<E, E> createState(StateContext stateContext, Collector<E> collector) {
                return new CombiningReduceState<>(stateContext.getStorageProvider(), r);
            }
        }

        /**
         * @param storageProvider provider of the value storage backing this state
         * @param reduceFunctor combinable reducer; must not be {@code null}
         * @throws NullPointerException if {@code reduceFunctor} is {@code null}
         */
        @SuppressWarnings("unchecked") // STORAGE_DESC is a shared raw descriptor reused for every E
        CombiningReduceState(StorageProvider storageProvider, ReduceFunctor<E, E> reduceFunctor) {
            this.reducer = Objects.requireNonNull(reduceFunctor);
            this.storage = storageProvider.getValueStorage(STORAGE_DESC);
        }

        /** Stores the first element as-is; every further element is combined with the stored value. */
        @Override
        public void add(E e) {
            E current = storage.get();
            if (current == null) {
                storage.set(e);
                return;
            }
            reducer.apply(Stream.of(current, e), context);
            storage.set(context.getAndResetValue());
        }

        /** Emits the currently stored value to {@code collector}. */
        @Override
        public void flush(Collector<E> collector) {
            collector.collect(storage.get());
        }

        /** Clears the backing storage. */
        @Override
        public void close() {
            storage.clear();
        }

        /**
         * Folds the value held by {@code combiningReduceState} into this state.
         *
         * <p>An other state that holds no value is skipped, so the reducer is never handed a
         * {@code null} element.
         */
        @Override
        public void mergeFrom(CombiningReduceState<E> combiningReduceState) {
            E otherValue = combiningReduceState.storage.get();
            if (otherValue != null) {
                add(otherValue);
            }
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$DatasetBuilder2.class */
    /**
     * Builder step reached after {@code keyBy}: holds the operator name, the input dataset and
     * the key extractor, and lets the caller either pick a value extractor or reduce the input
     * elements directly.
     */
    public static class DatasetBuilder2<IN, KEY> implements ReduceBy<IN, KEY, IN> {
        private final String name;
        private final Dataset<IN> input;
        private final UnaryFunction<IN, KEY> keyExtractor;

        /**
         * @throws NullPointerException if any argument is {@code null}
         */
        DatasetBuilder2(String name, Dataset<IN> input, UnaryFunction<IN, KEY> keyExtractor) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
        }

        /**
         * Selects the function extracting the value to be reduced from each input element.
         *
         * @param unaryFunction value extractor
         * @return the next builder step
         */
        public <VALUE> DatasetBuilder3<IN, KEY, VALUE> valueBy(UnaryFunction<IN, VALUE> unaryFunction) {
            return new DatasetBuilder3<>(name, input, keyExtractor, unaryFunction);
        }

        /**
         * Reduces the input elements themselves (identity value extraction) with the given functor.
         *
         * @param reduceFunctor reduction to apply per key
         * @return the next builder step, with no value comparator set
         */
        @Override
        public <OUT> SortableDatasetBuilder4<IN, KEY, IN, OUT> reduceBy(ReduceFunctor<IN, OUT> reduceFunctor) {
            // The lambda's $deserializeLambda$ support method is generated by the compiler;
            // it must not be written by hand.
            return new SortableDatasetBuilder4<>(name, input, keyExtractor, e -> e, reduceFunctor, null);
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$DatasetBuilder3.class */
    /**
     * Builder step reached after {@code valueBy}: carries the name, input, key extractor and
     * value extractor, and waits for the reduce function.
     */
    public static class DatasetBuilder3<IN, KEY, VALUE> implements ReduceBy<IN, KEY, VALUE> {
        private final String name;
        private final Dataset<IN> input;
        private final UnaryFunction<IN, KEY> keyExtractor;
        private final UnaryFunction<IN, VALUE> valueExtractor;

        /**
         * @throws NullPointerException if any argument is {@code null}
         */
        DatasetBuilder3(
                String name,
                Dataset<IN> input,
                UnaryFunction<IN, KEY> keyExtractor,
                UnaryFunction<IN, VALUE> valueExtractor) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
            this.valueExtractor = Objects.requireNonNull(valueExtractor);
        }

        /**
         * Sets the reduction applied to the extracted values of each key.
         *
         * @param reduceFunctor reduction to apply per key
         * @return the next builder step, with no value comparator set
         */
        @Override
        public <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunctor<VALUE, OUT> reduceFunctor) {
            SortableDatasetBuilder4<IN, KEY, VALUE, OUT> next =
                    new SortableDatasetBuilder4<>(name, input, keyExtractor, valueExtractor, reduceFunctor, null);
            return next;
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$DatasetBuilder4.class */
    /**
     * Builder step holding everything except the windowing: the operator can be finished
     * right away via {@link #output()} or given a windowing first via {@code windowBy}.
     */
    public static class DatasetBuilder4<IN, KEY, VALUE, OUT> implements Builders.Output<Pair<KEY, OUT>>, Builders.OutputValues<KEY, OUT>, Builders.WindowBy<IN, DatasetBuilder4<IN, KEY, VALUE, OUT>> {
        final String name;
        final Dataset<IN> input;
        final UnaryFunction<IN, KEY> keyExtractor;
        final UnaryFunction<IN, VALUE> valueExtractor;
        final ReduceFunctor<VALUE, OUT> reducer;

        /** Optional ordering of values; {@code null} means no comparator was configured. */
        @Nullable
        final BinaryFunction<VALUE, VALUE, Integer> valuesComparator;

        /**
         * @throws NullPointerException if any argument other than {@code valuesComparator} is {@code null}
         */
        DatasetBuilder4(
                String name,
                Dataset<IN> input,
                UnaryFunction<IN, KEY> keyExtractor,
                UnaryFunction<IN, VALUE> valueExtractor,
                ReduceFunctor<VALUE, OUT> reducer,
                @Nullable BinaryFunction<VALUE, VALUE, Integer> valuesComparator) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
            this.keyExtractor = Objects.requireNonNull(keyExtractor);
            this.valueExtractor = Objects.requireNonNull(valueExtractor);
            this.reducer = Objects.requireNonNull(reducer);
            this.valuesComparator = valuesComparator;
        }

        /**
         * Attaches a windowing to the operator being built.
         *
         * @param windowing windowing to use; must not be {@code null}
         * @return the final builder step
         * @throws NullPointerException if {@code windowing} is {@code null}
         */
        @Override
        public <W extends Window> DatasetBuilder5<IN, KEY, VALUE, OUT, W> windowBy(Windowing<IN, W> windowing) {
            Windowing<IN, W> checkedWindowing = Objects.requireNonNull(windowing);
            return new DatasetBuilder5<>(
                    name, input, keyExtractor, valueExtractor, reducer, checkedWindowing, valuesComparator);
        }

        /**
         * Finishes the operator without an explicit windowing by delegating to a
         * {@link DatasetBuilder5} created with a {@code null} windowing.
         */
        @Override
        public Dataset<Pair<KEY, OUT>> output() {
            DatasetBuilder5<IN, KEY, VALUE, OUT, Window> unwindowed =
                    new DatasetBuilder5<>(name, input, keyExtractor, valueExtractor, reducer, null, valuesComparator);
            return unwindowed.output();
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$DatasetBuilder5.class */
    public static class DatasetBuilder5<IN, KEY, VALUE, OUT, W extends Window> extends DatasetBuilder4<IN, KEY, VALUE, OUT> implements Builders.OutputValues<KEY, OUT> {

        @Nullable
        private final Windowing<IN, W> windowing;

        DatasetBuilder5(String str, Dataset<IN> dataset, UnaryFunction<IN, KEY> unaryFunction, UnaryFunction<IN, VALUE> unaryFunction2, ReduceFunctor<VALUE, OUT> reduceFunctor, @Nullable Windowing<IN, W> windowing, @Nullable BinaryFunction<VALUE, VALUE, Integer> binaryFunction) {
            super(str, dataset, unaryFunction, unaryFunction2, reduceFunctor, binaryFunction);
            this.windowing = windowing;
        }

        @Override // cz.seznam.euphoria.core.client.operator.ReduceByKey.DatasetBuilder4, cz.seznam.euphoria.core.client.operator.Builders.Output
        public Dataset<Pair<KEY, OUT>> output() {
            Flow flow = this.input.getFlow();
            ReduceByKey reduceByKey = new ReduceByKey(this.name, flow, this.input, this.keyExtractor, this.valueExtractor, this.windowing, this.reducer, this.valuesComparator);
            flow.add(reduceByKey);
            return reduceByKey.output();
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$KeyByBuilder.class */
    /** Builder step reached after {@code of}: holds the operator name and input, waits for the key extractor. */
    public static class KeyByBuilder<IN> implements Builders.KeyBy<IN> {
        private final String name;
        private final Dataset<IN> input;

        /**
         * @throws NullPointerException if {@code name} or {@code input} is {@code null}
         */
        KeyByBuilder(String name, Dataset<IN> input) {
            this.name = Objects.requireNonNull(name);
            this.input = Objects.requireNonNull(input);
        }

        /**
         * Selects the function extracting the grouping key from each input element.
         *
         * @param unaryFunction key extractor
         * @return the next builder step
         */
        @Override
        public <KEY> DatasetBuilder2<IN, KEY> keyBy(UnaryFunction<IN, KEY> unaryFunction) {
            DatasetBuilder2<IN, KEY> next = new DatasetBuilder2<>(name, input, unaryFunction);
            return next;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$NonCombiningReduceState.class */
    /**
     * State for non-combinable reducers: buffers every added value in list storage and runs
     * the reducer over all of them on {@link #flush}. When a comparator is present, the values
     * are sorted through {@link SpillTools} before being reduced.
     */
    public static class NonCombiningReduceState<IN, OUT> implements State<IN, OUT>, StateSupport.MergeFrom<NonCombiningReduceState<IN, OUT>> {
        @SuppressWarnings("rawtypes")
        private static final ListStorageDescriptor STORAGE_DESC = ListStorageDescriptor.of("values", Object.class);

        private final ReduceFunctor<IN, OUT> reducer;
        private final ListStorage<IN> reducibleValues;
        private final SpillTools spill;

        /** Ordering applied to the values before reduction; {@code null} means "reduce in storage order". */
        @Nullable
        private final BinaryFunction<IN, IN, Integer> comparator;

        /** Creates {@link NonCombiningReduceState} instances sharing one reducer and comparator. */
        static final class Factory<IN, OUT> implements StateFactory<IN, OUT, NonCombiningReduceState<IN, OUT>> {
            private final ReduceFunctor<IN, OUT> r;

            @Nullable
            private final BinaryFunction<IN, IN, Integer> comparator;

            /**
             * @param reduceFunctor reducer to run on flush; must not be {@code null}
             * @param binaryFunction optional value comparator
             * @throws NullPointerException if {@code reduceFunctor} is {@code null}
             */
            Factory(ReduceFunctor<IN, OUT> reduceFunctor, @Nullable BinaryFunction<IN, IN, Integer> binaryFunction) {
                this.r = Objects.requireNonNull(reduceFunctor);
                this.comparator = binaryFunction;
            }

            @Override
            public NonCombiningReduceState<IN, OUT> createState(StateContext stateContext, Collector<OUT> collector) {
                return new NonCombiningReduceState<>(stateContext, r, comparator);
            }
        }

        /**
         * @param stateContext source of the list storage and the spill tools
         * @param reduceFunctor reducer to run on flush; must not be {@code null}
         * @param binaryFunction optional value comparator
         * @throws NullPointerException if {@code reduceFunctor} is {@code null}
         */
        @SuppressWarnings("unchecked") // STORAGE_DESC is a shared raw descriptor reused for every IN
        NonCombiningReduceState(StateContext stateContext, ReduceFunctor<IN, OUT> reduceFunctor, @Nullable BinaryFunction<IN, IN, Integer> binaryFunction) {
            this.reducer = Objects.requireNonNull(reduceFunctor);
            this.comparator = binaryFunction;
            this.reducibleValues = stateContext.getStorageProvider().getListStorage(STORAGE_DESC);
            this.spill = stateContext.getSpillTools();
        }

        /** Buffers the value for a later reduction. */
        @Override
        public void add(IN in) {
            reducibleValues.add(in);
        }

        /**
         * Runs the reducer over all buffered values, emitting its results to {@code collector}.
         *
         * @throws RuntimeException wrapping an {@link InterruptedException} raised while sorting;
         *     the thread's interrupt flag is restored before the exception is thrown
         */
        @Override
        public void flush(Collector<OUT> collector) {
            if (comparator == null) {
                reducer.apply(StreamSupport.stream(reducibleValues.get().spliterator(), false), collector);
                return;
            }
            // try-with-resources guarantees the sorted view is closed even when the reducer throws
            try (ExternalIterable<IN> sorted = spill.sorted(reducibleValues.get(), comparator::apply)) {
                reducer.apply(StreamSupport.stream(sorted.spliterator(), false), collector);
            } catch (InterruptedException e) {
                // keep the interruption visible to callers further up the stack
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /** Drops all buffered values. */
        @Override
        public void close() {
            reducibleValues.clear();
        }

        /** Appends all values buffered by {@code nonCombiningReduceState} to this state's buffer. */
        @Override
        public void mergeFrom(NonCombiningReduceState<IN, OUT> nonCombiningReduceState) {
            reducibleValues.addAll(nonCombiningReduceState.reducibleValues.get());
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$OfBuilder.class */
    /** First builder step of a named operator: remembers the name and waits for the input dataset. */
    public static class OfBuilder implements Builders.Of {
        private final String name;

        /**
         * @param name operator name; stored as given and passed on to {@link KeyByBuilder}
         */
        OfBuilder(String name) {
            this.name = name;
        }

        /**
         * Sets the input dataset of the operator being built.
         *
         * @param dataset input dataset
         * @return the next builder step
         */
        @Override
        public <IN> KeyByBuilder<IN> of(Dataset<IN> dataset) {
            KeyByBuilder<IN> next = new KeyByBuilder<>(name, dataset);
            return next;
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$ReduceBy.class */
    public interface ReduceBy<IN, KEY, VALUE> {
        default <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunction<VALUE, OUT> reduceFunction) {
            return reduceBy((stream, collector) -> {
                collector.collect(reduceFunction.apply(stream));
            });
        }

        <OUT> SortableDatasetBuilder4<IN, KEY, VALUE, OUT> reduceBy(ReduceFunctor<VALUE, OUT> reduceFunctor);

        default DatasetBuilder4<IN, KEY, VALUE, VALUE> combineBy(CombinableReduceFunction<VALUE> combinableReduceFunction) {
            return reduceBy(ReduceByKey.toReduceFunctor(combinableReduceFunction));
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case -116735943:
                    if (implMethodName.equals("lambda$reduceBy$2c630e87$1")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("cz/seznam/euphoria/core/client/functional/ReduceFunctor") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lcz/seznam/euphoria/core/client/io/Collector;)V") && serializedLambda.getImplClass().equals("cz/seznam/euphoria/core/client/operator/ReduceByKey$ReduceBy") && serializedLambda.getImplMethodSignature().equals("(Lcz/seznam/euphoria/core/client/functional/ReduceFunction;Ljava/util/stream/Stream;Lcz/seznam/euphoria/core/client/io/Collector;)V")) {
                        ReduceFunction reduceFunction = (ReduceFunction) serializedLambda.getCapturedArg(0);
                        return (stream, collector) -> {
                            collector.collect(reduceFunction.apply(stream));
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:cz/seznam/euphoria/core/client/operator/ReduceByKey$SortableDatasetBuilder4.class */
    /** A {@link DatasetBuilder4} that additionally lets the caller request sorted values. */
    public static class SortableDatasetBuilder4<IN, KEY, VALUE, OUT> extends DatasetBuilder4<IN, KEY, VALUE, OUT> {
        /**
         * @throws NullPointerException if any argument other than {@code valuesComparator} is {@code null}
         */
        SortableDatasetBuilder4(
                String name,
                Dataset<IN> input,
                UnaryFunction<IN, KEY> keyExtractor,
                UnaryFunction<IN, VALUE> valueExtractor,
                ReduceFunctor<VALUE, OUT> reducer,
                @Nullable BinaryFunction<VALUE, VALUE, Integer> valuesComparator) {
            super(name, input, keyExtractor, valueExtractor, reducer, valuesComparator);
        }

        /**
         * Returns a builder identical to this one except that it carries the given comparator.
         *
         * @param binaryFunction comparator for the values
         * @return a new builder step with the comparator set
         */
        public DatasetBuilder4<IN, KEY, VALUE, OUT> withSortedValues(BinaryFunction<VALUE, VALUE, Integer> binaryFunction) {
            SortableDatasetBuilder4<IN, KEY, VALUE, OUT> sortedBuilder =
                    new SortableDatasetBuilder4<>(name, input, keyExtractor, valueExtractor, reducer, binaryFunction);
            return sortedBuilder;
        }
    }

    /**
     * Starts building a {@code ReduceByKey} operator with the default name {@code "ReduceByKey"}.
     *
     * @param dataset input dataset
     * @return the builder step expecting the key extractor
     */
    public static <IN> KeyByBuilder<IN> of(Dataset<IN> dataset) {
        return named("ReduceByKey").of(dataset);
    }

    /**
     * Starts building a {@code ReduceByKey} operator with the given name.
     *
     * @param str name of the operator
     * @return the builder step expecting the input dataset
     */
    public static OfBuilder named(String str) {
        OfBuilder builder = new OfBuilder(str);
        return builder;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator from a combinable reduce function, with no value comparator.
     *
     * <p>The function is wrapped by {@link #toReduceFunctor}, which yields a
     * {@code ReduceFunctor<OUT, OUT>}. The cast to {@code ReduceFunctor<VALUE, OUT>} is
     * unchecked, so this constructor is only type-safe when {@code VALUE} and {@code OUT}
     * are the same type.
     */
    @SuppressWarnings("unchecked")
    public ReduceByKey(String str, Flow flow, Dataset<IN> dataset, UnaryFunction<IN, KEY> unaryFunction, UnaryFunction<IN, VALUE> unaryFunction2, @Nullable Windowing<IN, W> windowing, CombinableReduceFunction<OUT> combinableReduceFunction) {
        this(str, flow, dataset, unaryFunction, unaryFunction2, windowing, (ReduceFunctor<VALUE, OUT>) toReduceFunctor(combinableReduceFunction), null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the operator.
     *
     * @param str operator name
     * @param flow flow the operator belongs to
     * @param dataset input dataset
     * @param unaryFunction key extractor
     * @param unaryFunction2 value extractor
     * @param windowing optional windowing
     * @param reduceFunctor reduction applied to the extracted values
     * @param binaryFunction optional comparator for the values
     */
    public ReduceByKey(
            String str,
            Flow flow,
            Dataset<IN> dataset,
            UnaryFunction<IN, KEY> unaryFunction,
            UnaryFunction<IN, VALUE> unaryFunction2,
            @Nullable Windowing<IN, W> windowing,
            ReduceFunctor<VALUE, OUT> reduceFunctor,
            @Nullable BinaryFunction<VALUE, VALUE, Integer> binaryFunction) {
        super(str, flow, dataset, unaryFunction, windowing);
        reducer = reduceFunctor;
        valueExtractor = unaryFunction2;
        valueComparator = binaryFunction;
    }

    /** Returns the reduce functor this operator was created with. */
    public ReduceFunctor<VALUE, OUT> getReducer() {
        return reducer;
    }

    /** Returns whether the reducer reports itself as combinable. */
    public boolean isCombinable() {
        final boolean combinable = reducer.isCombinable();
        return combinable;
    }

    /** Returns the value extractor this operator was created with. */
    public UnaryFunction<IN, VALUE> getValueExtractor() {
        return valueExtractor;
    }

    /**
     * Expresses this operator as a single {@code ReduceStateByKey}: a combinable reducer is
     * backed by {@link CombiningReduceState}, any other reducer by
     * {@link NonCombiningReduceState} (which also receives the value comparator).
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public DAG<Operator<?, ?>> getBasicOps() {
        String operatorName = getName();
        Flow operatorFlow = getFlow();

        StateFactory stateFactory;
        if (reducer.isCombinable()) {
            stateFactory = new CombiningReduceState.Factory(reducer);
        } else {
            stateFactory = new NonCombiningReduceState.Factory(reducer, valueComparator);
        }

        ReduceStateByKey reduceState =
                new ReduceStateByKey(
                        operatorName,
                        operatorFlow,
                        input,
                        keyExtractor,
                        valueExtractor,
                        windowing,
                        stateFactory,
                        new StateSupport.MergeFromStateMerger());
        return DAG.of(reduceState);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Wraps a combinable reduce function into a {@link ReduceFunctor} that reports itself as
     * combinable and collects exactly the function's result for the given stream.
     *
     * @param combinableReduceFunction function to wrap
     * @return a combinable functor delegating to {@code combinableReduceFunction}
     */
    public static <VALUE> ReduceFunctor<VALUE, VALUE> toReduceFunctor(final CombinableReduceFunction<VALUE> combinableReduceFunction) {
        return new ReduceFunctor<VALUE, VALUE>() {
            @Override
            public boolean isCombinable() {
                return true;
            }

            @Override
            public void apply(Stream<VALUE> stream, Collector<VALUE> collector) {
                // delegate to the captured function; there is no enclosing instance to qualify with
                collector.collect(combinableReduceFunction.apply(stream));
            }
        };
    }
}
