package cz.seznam.euphoria.flink.streaming;

import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.functional.UnaryFunction;
import cz.seznam.euphoria.core.client.operator.ReduceStateByKey;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.FlinkOperator;
import cz.seznam.euphoria.flink.functions.PartitionerWrapper;
import cz.seznam.euphoria.flink.streaming.windowing.AttachedWindowing;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElement;
import cz.seznam.euphoria.flink.streaming.windowing.KeyedMultiWindowedElementWindowOperator;
import cz.seznam.euphoria.flink.streaming.windowing.StreamingElementWindowOperator;
import cz.seznam.euphoria.flink.streaming.windowing.WindowAssigner;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Iterables;
import java.lang.invoke.SerializedLambda;
import java.util.Objects;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator.class */
class ReduceStateByKeyTranslator implements StreamingOperatorTranslator<ReduceStateByKey> {
    /** Settings key of the boolean flag that {@code loadSettings} stores in {@code valueOfAfterShuffle}. */
    static final String CFG_VALUE_OF_AFTER_SHUFFLE_KEY = "euphoria.flink.streaming.windowing.only.after.shuffle";
    /** Declared default for {@link #CFG_VALUE_OF_AFTER_SHUFFLE_KEY}. */
    static final boolean CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT = false;
    /** Settings key of the integer that {@code loadSettings} stores in {@code descriptorsCacheMaxSize}. */
    static final String CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY = "euphoria.flink.streaming.descriptors.cache.max.size";
    /** Declared default for {@link #CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY}. */
    static final int CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT = 1000;
    /** Settings key of the boolean flag that {@code loadSettings} stores in {@code allowEarlyEmitting}. */
    static final String CFG_ALLOW_EARLY_EMITTING_KEY = "euphoria.flink.streaming.allow.early.emitting";
    /** Declared default for {@link #CFG_ALLOW_EARLY_EMITTING_KEY}. */
    static final boolean CFG_ALLOW_EARLY_EMITTING_DEFAULT = false;
    /**
     * Selects the pipeline shape built by {@code translate2}: when true the input is keyed first and
     * handed to a StreamingElementWindowOperator together with the window assigner; when false a
     * WindowAssignerOperator runs before the keyBy and a KeyedMultiWindowedElementWindowOperator after it.
     */
    private boolean valueOfAfterShuffle;
    /** Forwarded by {@code translate2} to the constructor of whichever window operator it creates. */
    private boolean allowEarlyEmitting;
    /** Forwarded by {@code translate2} to the constructor of whichever window operator it creates. */
    private int descriptorsCacheMaxSize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator$KeyedMultiWindowedElementKeyExtractor.class */
    public static class KeyedMultiWindowedElementKeyExtractor implements KeySelector<KeyedMultiWindowedElement, Object>, ResultTypeQueryable<Object> {
        private KeyedMultiWindowedElementKeyExtractor() {
        }

        public Object getKey(KeyedMultiWindowedElement keyedMultiWindowedElement) throws Exception {
            return keyedMultiWindowedElement.getKey();
        }

        public TypeInformation<Object> getProducedType() {
            return TypeInformation.of(Object.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator$UnaryFunctionKeyExtractor.class */
    public static class UnaryFunctionKeyExtractor implements KeySelector<StreamingElement, Object>, ResultTypeQueryable<Object> {
        private final UnaryFunction keyExtractor;

        public UnaryFunctionKeyExtractor(UnaryFunction unaryFunction) {
            this.keyExtractor = (UnaryFunction) Objects.requireNonNull(unaryFunction);
        }

        public Object getKey(StreamingElement streamingElement) throws Exception {
            return this.keyExtractor.apply(streamingElement.getElement());
        }

        public TypeInformation<Object> getProducedType() {
            return TypeInformation.of(Object.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:cz/seznam/euphoria/flink/streaming/ReduceStateByKeyTranslator$WindowAssignerOperator.class */
    public static class WindowAssignerOperator extends AbstractStreamOperator<KeyedMultiWindowedElement> implements OneInputStreamOperator<StreamingElement, KeyedMultiWindowedElement> {
        private final WindowAssigner windowAssigner;

        private WindowAssignerOperator(WindowAssigner windowAssigner) {
            this.windowAssigner = windowAssigner;
            this.chainingStrategy = ChainingStrategy.ALWAYS;
        }

        public void processElement(StreamRecord<StreamingElement> streamRecord) throws Exception {
            streamRecord.replace(this.windowAssigner.apply((StreamRecord) streamRecord));
            this.output.collect(streamRecord);
        }
    }

    /**
     * Reads this translator's three configuration values from the given settings into the
     * corresponding instance fields, passing the declared {@code CFG_*_DEFAULT} constants as defaults.
     *
     * @param settings settings to read from
     */
    private void loadSettings(Settings settings) {
        this.valueOfAfterShuffle = settings.getBoolean(CFG_VALUE_OF_AFTER_SHUFFLE_KEY, CFG_VALUE_OF_AFTER_SHUFFLE_DEFAULT);
        this.descriptorsCacheMaxSize = settings.getInt(CFG_DESCRIPTORS_CACHE_SIZE_MAX_KEY, CFG_DESCRIPTORS_CACHE_MAX_SIZE_DEFAULT);
        this.allowEarlyEmitting = settings.getBoolean(CFG_ALLOW_EARLY_EMITTING_KEY, CFG_ALLOW_EARLY_EMITTING_DEFAULT);
    }

    /* renamed from: translate, reason: avoid collision after fix types in other method */
    /**
     * Builds the Flink stream transformation for a {@link ReduceStateByKey} operator.
     *
     * <p>Settings are reloaded from the context on every call. The operator's single input stream is
     * then wired in one of two ways, chosen by {@code valueOfAfterShuffle}:
     * <ul>
     *   <li>true: key the input with {@link UnaryFunctionKeyExtractor} and run a
     *       {@link StreamingElementWindowOperator} that receives the window assigner;</li>
     *   <li>false: run a {@link WindowAssignerOperator} at the input's parallelism, key its output
     *       with {@link KeyedMultiWindowedElementKeyExtractor} and run a
     *       {@link KeyedMultiWindowedElementWindowOperator}.</li>
     * </ul>
     * If the operator declares a non-default partitioner, the result is additionally partitioned by
     * the first component of each emitted {@link Pair}.
     *
     * @param flinkOperator wrapper of the operator being translated
     * @param streamingExecutorContext context supplying settings, input streams, local-mode flag and
     *        accumulator factory
     * @return the resulting stream
     */
    public DataStream<?> translate2(FlinkOperator<ReduceStateByKey> flinkOperator, StreamingExecutorContext streamingExecutorContext) {
        loadSettings(streamingExecutorContext.getSettings());
        DataStream input = (DataStream) Iterables.getOnlyElement(streamingExecutorContext.getInputStreams(flinkOperator));
        ReduceStateByKey origOperator = flinkOperator.getOriginalOperator();
        StateFactory stateFactory = origOperator.getStateFactory();
        StateMerger stateMerger = origOperator.getStateMerger();
        Windowing windowing = origOperator.getWindowing();
        if (windowing == null) {
            // No windowing declared on the operator: fall back to AttachedWindowing.
            windowing = new AttachedWindowing();
        }
        UnaryFunction keyExtractor = origOperator.getKeyExtractor();
        WindowAssigner windowAssigner = new WindowAssigner(windowing, keyExtractor, origOperator.getValueExtractor());

        // Declared as DataStream rather than SingleOutputStreamOperator: partitionCustom(...) below
        // returns a DataStream, which could not be assigned back to the narrower type.
        DataStream reduced;
        if (this.valueOfAfterShuffle) {
            reduced = input
                    .keyBy(new UnaryFunctionKeyExtractor(keyExtractor))
                    .transform(
                            flinkOperator.getName(),
                            TypeInformation.of(StreamingElement.class),
                            new StreamingElementWindowOperator(
                                    windowAssigner,
                                    windowing,
                                    stateFactory,
                                    stateMerger,
                                    streamingExecutorContext.isLocalMode(),
                                    this.descriptorsCacheMaxSize,
                                    this.allowEarlyEmitting,
                                    streamingExecutorContext.getAccumulatorFactory(),
                                    streamingExecutorContext.getSettings()))
                    .setParallelism(flinkOperator.getParallelism());
        } else {
            reduced = input
                    .transform(
                            flinkOperator.getName() + "::window-assigner",
                            TypeInformation.of(KeyedMultiWindowedElement.class),
                            new WindowAssignerOperator(windowAssigner))
                    .setParallelism(input.getParallelism())
                    .keyBy(new KeyedMultiWindowedElementKeyExtractor())
                    .transform(
                            flinkOperator.getName(),
                            TypeInformation.of(StreamingElement.class),
                            new KeyedMultiWindowedElementWindowOperator(
                                    windowing,
                                    stateFactory,
                                    stateMerger,
                                    streamingExecutorContext.isLocalMode(),
                                    this.descriptorsCacheMaxSize,
                                    this.allowEarlyEmitting,
                                    streamingExecutorContext.getAccumulatorFactory(),
                                    streamingExecutorContext.getSettings()))
                    .setParallelism(flinkOperator.getParallelism());
        }

        if (!origOperator.getPartitioning().hasDefaultPartitioner()) {
            PartitionerWrapper partitioner = new PartitionerWrapper(origOperator.getPartitioning().getPartitioner());
            // Explicitly typed so the lambda parameter is a StreamingElement even though the
            // surrounding stream is a raw type.
            KeySelector<StreamingElement, Object> firstOfPair =
                    element -> ((Pair) element.getElement()).getFirst();
            reduced = reduced.partitionCustom(partitioner, firstOfPair);
        }
        return reduced;
    }

    /**
     * Untyped entry point declared by the translator interface; narrows the operator wrapper to
     * {@code FlinkOperator<ReduceStateByKey>} and delegates to {@code translate2}.
     *
     * @param flinkOperator wrapper of the operator being translated
     * @param streamingExecutorContext translation context handed through unchanged
     * @return whatever {@code translate2} returns
     */
    @Override // cz.seznam.euphoria.flink.OperatorTranslator
    public /* bridge */ /* synthetic */ DataStream<?> translate(FlinkOperator flinkOperator, StreamingExecutorContext streamingExecutorContext) {
        @SuppressWarnings("unchecked") // the interface is implemented for ReduceStateByKey only
        FlinkOperator<ReduceStateByKey> typedOperator = (FlinkOperator<ReduceStateByKey>) flinkOperator;
        return translate2(typedOperator, streamingExecutorContext);
    }

    // Intentionally no hand-written $deserializeLambda$(SerializedLambda) method.
    // The decompiler emitted a copy of the compiler-synthesized method (it was marked synthetic), but
    // that copy was not valid Java: an int selector was rendered as a boolean (-1 / case false) and a
    // lambda was returned where the target type is Object. The Java compiler generates this method
    // itself in the capturing class for the serializable KeySelector lambda used by translate2, so a
    // source-level definition is both redundant and in conflict with the generated one.
}
