package cz.seznam.euphoria.flink.streaming.windowing;

import cz.seznam.euphoria.core.client.dataset.windowing.MergingWindowing;
import cz.seznam.euphoria.core.client.dataset.windowing.TimedWindow;
import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.dataset.windowing.Windowing;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.MergingStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.State;
import cz.seznam.euphoria.core.client.operator.state.StateFactory;
import cz.seznam.euphoria.core.client.operator.state.StateMerger;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.core.client.triggers.Trigger;
import cz.seznam.euphoria.core.client.triggers.TriggerContext;
import cz.seznam.euphoria.core.client.util.Pair;
import cz.seznam.euphoria.core.util.Settings;
import cz.seznam.euphoria.flink.accumulators.AbstractCollector;
import cz.seznam.euphoria.flink.accumulators.FlinkAccumulatorFactory;
import cz.seznam.euphoria.flink.storage.Descriptors;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import cz.seznam.euphoria.shaded.guava.com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:cz/seznam/euphoria/flink/streaming/windowing/AbstractWindowOperator.class */
public abstract class AbstractWindowOperator<I, KEY, WID extends Window> extends AbstractStreamOperator<StreamingElement<WID, Pair<?, ?>>> implements OneInputStreamOperator<I, StreamingElement<WID, Pair<?, ?>>>, Triggerable<KEY, WID> {
    // --- Configuration captured by the constructor ---

    /** Windowing strategy; checked against {@link MergingWindowing} to select the merging code path. */
    private final Windowing<?, WID> windowing;
    /** Trigger obtained from {@code windowing.getTrigger()} in the constructor. */
    private final Trigger<WID> trigger;
    /** Creates the per-window state on top of the windowed storage provider. */
    private final StateFactory<?, ?, State<?, ?>> stateFactory;
    /** Combines the states of merged windows into the state of the resulting window. */
    private final StateMerger<?, ?, State<?, ?>> stateCombiner;
    /** When set, every touched window additionally gets a timer at {@code Long.MAX_VALUE} on the end-of-stream timer service. */
    private final boolean localMode;
    /** When set, window states are created with the output collector as their context; otherwise with {@code null}. */
    private final boolean allowEarlyEmitting;
    /** Size limit handed to the {@link WindowedStorageProvider} created in {@code open()}. */
    private final int descriptorsCacheMaxSize;
    /** Passed to the {@link OutputCollector} created in {@code open()}. */
    private final FlinkAccumulatorFactory accumulatorFactory;
    /** Passed to the {@link OutputCollector} created in {@code open()}. */
    private final Settings settings;

    // --- Runtime objects, (re)created in open() ---

    /** Timer service registered under the name "window-timers"; also the source of the current watermark. */
    private transient InternalTimerService<WID> timerService;
    /** Timer service registered under the name "end-of-stream-timers"; only used in local mode. */
    private transient InternalTimerService<WID> endOfStreamTimerService;
    /** Context handed to the trigger callbacks. */
    private transient AbstractWindowOperator<I, KEY, WID>.TriggerContextAdapter triggerContext;
    /** Collector through which window states emit their output. */
    private transient AbstractWindowOperator<I, KEY, WID>.OutputCollector outputContext;
    /** Storage provider scoped to the window set via {@code setWindow}. */
    private transient WindowedStorageProvider storageProvider;
    /** Descriptor of the persisted merging window set; only created when the windowing is a {@link MergingWindowing}. */
    private transient ListStateDescriptor<Tuple2<WID, WID>> mergingWindowsDescriptor;
    /** Serializer used as the namespace serializer for timers and windowed storage. */
    private transient TypeSerializer<WID> windowSerializer;

    /**
     * Collector through which window states emit their results.
     *
     * <p>Every collected value is paired with the currently configured key, wrapped into a
     * {@link StreamingElement} carrying the currently configured window and forwarded to the
     * operator's output. A single {@link StreamRecord} instance is reused for all emissions.
     */
    public class OutputCollector extends AbstractCollector {
        /** Key that emitted values are paired with; set via {@link #setKey(Object)}. */
        private Object key;
        /** Window attached to emitted elements; set via {@link #setWindow(Window)}. */
        private Window window;
        /** Record instance reused for every emission. */
        private final StreamRecord reuse;

        public OutputCollector(FlinkAccumulatorFactory flinkAccumulatorFactory, Settings settings, RuntimeContext runtimeContext) {
            super(flinkAccumulatorFactory, settings, runtimeContext);
            this.reuse = new StreamRecord((Object) null);
        }

        /**
         * Emits {@code obj} for the current key and window.
         *
         * <p>The record is stamped with the window's maximum timestamp when the window is a
         * {@link TimedWindow}, otherwise with the current watermark of the window timer service.
         *
         * @param obj the value to emit
         */
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void collect(Object obj) {
            StreamingElement element = new StreamingElement(this.window, Pair.of(this.key, obj));
            // The field is typed as Window, so it has to be narrowed before asking for maxTimestamp().
            long stamp = this.window instanceof TimedWindow
                    ? ((TimedWindow) this.window).maxTimestamp()
                    : AbstractWindowOperator.this.timerService.currentWatermark();
            AbstractWindowOperator.this.output.collect(this.reuse.replace(element, stamp));
        }

        /**
         * @return the window currently attached to emitted elements
         */
        public Object getWindow() {
            return this.window;
        }

        /**
         * @param window the window to attach to subsequently emitted elements
         */
        public void setWindow(Window window) {
            this.window = window;
        }

        /**
         * @param obj the key to pair subsequently emitted values with
         */
        public void setKey(Object obj) {
            this.key = obj;
        }
    }

    /**
     * Adapts the operator's timer service and windowed storage to the {@link TriggerContext}
     * (and merge context) expected by triggers.
     */
    public class TriggerContextAdapter implements TriggerContext, TriggerContext.TriggerMergeContext {
        /** Window that is the target of a merge; set via {@link #setWindow(Window)}. */
        private WID window;
        /** Windows merged into {@link #window}; set by {@link #onMerge(Iterable)}. */
        private Collection<WID> mergedWindows;

        private TriggerContextAdapter() {
        }

        /**
         * Registers an event-time timer for the given window.
         *
         * @param j timestamp at which the timer should fire
         * @param window window the timer belongs to
         * @return {@code false} without registering anything when {@code j} is not after the
         *     current watermark, {@code true} otherwise
         */
        @SuppressWarnings("unchecked")
        public boolean registerTimer(long j, Window window) {
            if (j <= getCurrentTimestamp()) {
                return false;
            }
            // The timer service is namespaced by WID; the context interface only hands us a Window.
            AbstractWindowOperator.this.timerService.registerEventTimeTimer((WID) window, j);
            return true;
        }

        /**
         * Deletes the event-time timer registered for the given window and timestamp.
         *
         * @param j timestamp of the timer
         * @param window window the timer belongs to
         */
        @SuppressWarnings("unchecked")
        public void deleteTimer(long j, Window window) {
            AbstractWindowOperator.this.timerService.deleteEventTimeTimer((WID) window, j);
        }

        /**
         * @return the current watermark of the window timer service
         */
        public long getCurrentTimestamp() {
            return AbstractWindowOperator.this.timerService.currentWatermark();
        }

        public <T> ValueStorage<T> getValueStorage(ValueStorageDescriptor<T> valueStorageDescriptor) {
            return AbstractWindowOperator.this.storageProvider.getValueStorage(valueStorageDescriptor);
        }

        public <T> ListStorage<T> getListStorage(ListStorageDescriptor<T> listStorageDescriptor) {
            return AbstractWindowOperator.this.storageProvider.getListStorage(listStorageDescriptor);
        }

        /**
         * Merges the stored state described by {@code storageDescriptor} from the merged windows
         * into the target window.
         *
         * <p>Only merging value storages are supported, and only when the set of merged windows
         * is non-empty; any other combination is rejected.
         *
         * @param storageDescriptor descriptor of the storage to merge
         * @throws RuntimeException wrapping every failure raised inside, including the
         *     {@link NullPointerException} for a missing merged-window set, the
         *     {@link IllegalStateException} for a non-merging descriptor and the
         *     {@link UnsupportedOperationException} for unsupported descriptors
         */
        public void mergeStoredState(StorageDescriptor storageDescriptor) {
            try {
                Objects.requireNonNull(this.mergedWindows);
                if (!(storageDescriptor instanceof MergingStorageDescriptor)) {
                    throw new IllegalStateException("Storage descriptor '" + storageDescriptor.getName() + "' must support merging!");
                }
                // NOTE(review): an empty merged-window set also ends up here and is reported as
                // "not supported"; confirm whether that case can occur and is intended.
                if (this.mergedWindows.isEmpty() || !(storageDescriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor)) {
                    throw new UnsupportedOperationException(storageDescriptor + " is not supported for merging yet!");
                }
                AbstractWindowOperator.this.getKeyedStateBackend().mergePartitionedStates(this.window, this.mergedWindows, AbstractWindowOperator.this.windowSerializer, Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) storageDescriptor));
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Remembers the merged windows and notifies the trigger about the merge into the
         * currently set target window.
         *
         * @param iterable the windows that were merged
         */
        public void onMerge(Iterable<WID> iterable) {
            this.mergedWindows = Lists.newArrayList(iterable);
            AbstractWindowOperator.this.trigger.onMerge(this.window, this);
        }

        /**
         * @param wid the window acting as merge target
         */
        public void setWindow(WID wid) {
            this.window = wid;
        }
    }

    /**
     * Creates the operator and captures its configuration.
     *
     * @param windowing windowing strategy; its trigger is obtained here
     * @param stateFactory factory of per-window states
     * @param stateMerger merger used to combine states of merged windows
     * @param z local mode flag
     * @param i maximum size of the storage descriptor cache
     * @param z2 whether window states may emit early through the output collector
     * @param flinkAccumulatorFactory accumulator factory handed to the output collector
     * @param settings settings handed to the output collector
     * @throws NullPointerException if any of the reference arguments is {@code null}
     */
    public AbstractWindowOperator(Windowing<?, WID> windowing, StateFactory<?, ?, State<?, ?>> stateFactory, StateMerger<?, ?, State<?, ?>> stateMerger, boolean z, int i, boolean z2, FlinkAccumulatorFactory flinkAccumulatorFactory, Settings settings) {
        // Plain flags first; they cannot fail.
        this.localMode = z;
        this.allowEarlyEmitting = z2;
        this.descriptorsCacheMaxSize = i;

        // Reference arguments are validated in declaration order.
        this.windowing = Objects.requireNonNull(windowing);
        this.trigger = windowing.getTrigger();
        this.stateFactory = Objects.requireNonNull(stateFactory);
        this.stateCombiner = Objects.requireNonNull(stateMerger);
        this.accumulatorFactory = Objects.requireNonNull(flinkAccumulatorFactory);
        this.settings = Objects.requireNonNull(settings);
    }

    /**
     * Initializes the runtime parts of the operator: the window serializer, the two timer
     * services ("window-timers" and "end-of-stream-timers", both firing into this operator),
     * the trigger context, the output collector and the windowed storage provider. For a
     * {@link MergingWindowing} the descriptor of the persisted merging window set is created
     * as well.
     *
     * @throws Exception if the superclass initialization fails
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void open() throws Exception {
        super.open();
        // The type info is created for Window.class, so the serializer is a TypeSerializer<Window>;
        // it has to be narrowed explicitly (via the raw type) to serve as TypeSerializer<WID>.
        this.windowSerializer = (TypeSerializer<WID>) (TypeSerializer) TypeExtractor.createTypeInfo(Window.class).createSerializer(getRuntimeContext().getExecutionConfig());
        this.timerService = getInternalTimerService("window-timers", this.windowSerializer, this);
        this.endOfStreamTimerService = getInternalTimerService("end-of-stream-timers", this.windowSerializer, this);
        this.triggerContext = new TriggerContextAdapter();
        this.outputContext = new OutputCollector(this.accumulatorFactory, this.settings, getRuntimeContext());
        this.storageProvider = new WindowedStorageProvider(getKeyedStateBackend(), this.windowSerializer, this.descriptorsCacheMaxSize);
        if (this.windowing instanceof MergingWindowing) {
            // Both tuple fields are windows, hence the same serializer twice.
            this.mergingWindowsDescriptor = new ListStateDescriptor<>("merging-window-set", new TupleSerializer(Tuple2.class, new TypeSerializer[]{this.windowSerializer, this.windowSerializer}));
        }
    }

    /**
     * Points the output collector at the given key and window and scopes the storage
     * provider to the same window.
     *
     * @param obj key of the element or timer being processed
     * @param wid window being processed
     */
    private void setupEnvironment(Object obj, WID wid) {
        final OutputCollector collector = this.outputContext;
        collector.setWindow(wid);
        collector.setKey(obj);
        this.storageProvider.setWindow(wid);
    }

    /**
     * Extracts the windowed element from an incoming stream record.
     *
     * <p>Called by {@code processElement} for every record that is not dropped as late; the
     * windows and the value of the returned element are what gets added to the window states.
     *
     * @param streamRecord the incoming record
     * @return the element together with the windows it belongs to
     * @throws Exception if the record cannot be converted
     */
    protected abstract KeyedMultiWindowedElement<WID, KEY, ?> recordValue(StreamRecord<I> streamRecord) throws Exception;

    /**
     * Adds an incoming record to the state of every window it belongs to and evaluates the
     * trigger for each of those windows.
     *
     * <p>Records stamped before the current watermark of the window timer service are dropped
     * without any processing. For a {@link MergingWindowing} the record's windows are first
     * added to the merging window set, which may merge windows, triggers and states.
     *
     * @param streamRecord the incoming record
     * @throws Exception if extracting the element or updating windows or state fails
     */
    public void processElement(StreamRecord<I> streamRecord) throws Exception {
        // Late element: its timestamp is already behind the watermark.
        if (streamRecord.getTimestamp() < this.timerService.currentWatermark()) {
            return;
        }
        KeyedMultiWindowedElement<WID, KEY, ?> element = recordValue(streamRecord);
        if (this.windowing instanceof MergingWindowing) {
            processInMergingWindows(streamRecord, element);
        } else {
            processInNonMergingWindows(streamRecord, element);
        }
    }

    /** Handles an element of a non-merging windowing: each window is its own state window. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void processInNonMergingWindows(StreamRecord<I> streamRecord, KeyedMultiWindowedElement<WID, KEY, ?> element) throws Exception {
        for (WID window : element.getWindows()) {
            setupEnvironment(getCurrentKey(), window);
            if (this.localMode) {
                // Guarantees the window is flushed when the final watermark arrives.
                this.endOfStreamTimerService.registerEventTimeTimer(window, Long.MAX_VALUE);
            }
            State windowState = getWindowState(window);
            windowState.add(element.getValue());
            Trigger.TriggerResult result = this.trigger.onElement(streamRecord.getTimestamp(), window, this.triggerContext);
            processTriggerResult(window, windowState, result, null);
        }
    }

    /** Handles an element of a merging windowing, merging windows, triggers and states as needed. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void processInMergingWindows(StreamRecord<I> streamRecord, KeyedMultiWindowedElement<WID, KEY, ?> element) throws Exception {
        MergingWindowSet<WID> windowSet = getMergingWindowSet();
        for (WID candidate : element.getWindows()) {
            // Adding a window may merge it with existing ones; the callback performs the merge.
            WID actualWindow = windowSet.addWindow(candidate, (mergeResult, mergedWindows, stateResultWindow, mergedStateWindows) -> {
                setupEnvironment(getCurrentKey(), mergeResult);
                this.triggerContext.setWindow(mergeResult);
                this.triggerContext.onMerge(mergedWindows);

                // Clear trigger state and end-of-stream timers of the windows merged away.
                for (WID merged : mergedWindows) {
                    this.storageProvider.setWindow(merged);
                    this.trigger.onClear(merged, this.triggerContext);
                    removeWindow(merged, null);
                }

                // Fold the states of all merged state windows into the resulting state window,
                // then close the merged states.
                ArrayList<State<?, ?>> mergedStates = new ArrayList<>();
                mergedStateWindows.forEach(mergedStateWindow -> mergedStates.add(getWindowState(mergedStateWindow)));
                this.stateCombiner.merge(getWindowState(stateResultWindow), mergedStates);
                mergedStateWindows.forEach(mergedStateWindow -> getWindowState(mergedStateWindow).close());
            });

            setupEnvironment(getCurrentKey(), actualWindow);
            if (this.localMode) {
                this.endOfStreamTimerService.registerEventTimeTimer(actualWindow, Long.MAX_VALUE);
            }
            Trigger.TriggerResult result = this.trigger.onElement(streamRecord.getTimestamp(), actualWindow, this.triggerContext);

            // The state lives under the state window, which may differ from the actual window.
            WID stateWindow = windowSet.getStateWindow(actualWindow);
            setupEnvironment(getCurrentKey(), stateWindow);
            State windowState = getWindowState(stateWindow);
            windowState.add(element.getValue());
            processTriggerResult(actualWindow, windowState, result, windowSet);
        }
        windowSet.persist();
    }

    /**
     * Handles a fired event-time timer for a window.
     *
     * <p>A timer at {@code Long.MAX_VALUE} is treated as end of stream and always results in
     * {@code FLUSH_AND_PURGE} without consulting the trigger; any other timer is passed to the
     * trigger's {@code onTimer}. For a {@link MergingWindowing} the merging window set is
     * loaded before the result is applied and persisted afterwards.
     *
     * @param internalTimer the fired timer carrying key, window and timestamp
     * @throws Exception if processing the trigger result or persisting the window set fails
     */
    public void onEventTime(InternalTimer<KEY, WID> internalTimer) throws Exception {
        // Keep the namespace typed as WID: the helpers below require WID, not plain Window.
        WID window = internalTimer.getNamespace();
        setupEnvironment(internalTimer.getKey(), window);

        Trigger.TriggerResult result;
        if (internalTimer.getTimestamp() == Long.MAX_VALUE) {
            result = Trigger.TriggerResult.FLUSH_AND_PURGE;
        } else {
            result = this.trigger.onTimer(internalTimer.getTimestamp(), window, this.triggerContext);
        }

        MergingWindowSet<WID> mergingWindowSet = null;
        if (this.windowing instanceof MergingWindowing) {
            mergingWindowSet = getMergingWindowSet();
        }
        processTriggerResult(window, null, result, mergingWindowSet);
        if (mergingWindowSet != null) {
            mergingWindowSet.persist();
        }
    }

    /**
     * Processing-time timers are never registered by this operator, so this callback always fails.
     *
     * @param internalTimer the fired timer (unused)
     * @throws UnsupportedOperationException always
     */
    public void onProcessingTime(InternalTimer<KEY, WID> internalTimer) throws Exception {
        throw new UnsupportedOperationException("We are not using processing time at all");
    }

    /**
     * Advances the window timer service to the watermark's timestamp, does the same for the
     * end-of-stream timer service when running in local mode, and then forwards the watermark
     * downstream.
     *
     * @param watermark the received watermark
     * @throws Exception if advancing a timer service fails
     */
    public void processWatermark(Watermark watermark) throws Exception {
        final long stamp = watermark.getTimestamp();
        this.timerService.advanceWatermark(stamp);
        if (this.localMode) {
            this.endOfStreamTimerService.advanceWatermark(stamp);
        }
        this.output.emitWatermark(watermark);
    }

    /**
     * Applies a trigger result to a window: flushes the window state to the output collector
     * and/or purges the window.
     *
     * @param wid the window the result applies to
     * @param state the window state if the caller already has it; when {@code null} it is
     *     looked up (through the merging window set's state window for a merging windowing)
     * @param triggerResult the result to apply; nothing happens unless it flushes or purges
     * @param mergingWindowSet the merging window set; must be non-null when the state has to be
     *     looked up for a merging windowing
     */
    private void processTriggerResult(WID wid, @Nullable State state, Trigger.TriggerResult triggerResult, @Nullable MergingWindowSet<WID> mergingWindowSet) {
        if (!triggerResult.isFlush() && !triggerResult.isPurge()) {
            return;
        }

        State target = state;
        if (target == null) {
            final WID stateWindow;
            if (this.windowing instanceof MergingWindowing) {
                Objects.requireNonNull(mergingWindowSet);
                stateWindow = mergingWindowSet.getStateWindow(wid);
            } else {
                stateWindow = wid;
            }
            target = getWindowState(stateWindow);
        }

        if (triggerResult.isFlush()) {
            target.flush(this.outputContext);
        }
        if (!triggerResult.isPurge()) {
            return;
        }
        // Purge: drop the state, then clear the trigger and the window bookkeeping.
        target.close();
        this.storageProvider.setWindow(wid);
        this.trigger.onClear(wid, this.triggerContext);
        removeWindow(wid, mergingWindowSet);
    }

    /**
     * Creates the state object for the given window on top of the storage provider.
     *
     * <p>As a side effect the storage provider is scoped to {@code wid}.
     *
     * @param wid the window whose state is requested
     * @return the state; it gets the output collector as its context only when early emitting
     *     is allowed, otherwise {@code null}
     */
    private State getWindowState(WID wid) {
        this.storageProvider.setWindow(wid);
        OutputCollector earlyEmitter = null;
        if (this.allowEarlyEmitting) {
            earlyEmitter = this.outputContext;
        }
        return this.stateFactory.createState(this.storageProvider, earlyEmitter);
    }

    /**
     * Builds the merging window set on top of the partitioned state described by
     * {@code mergingWindowsDescriptor}, stored under the void namespace.
     *
     * @return a new merging window set
     * @throws RuntimeException wrapping any failure while obtaining the state or creating the set
     */
    private MergingWindowSet<WID> getMergingWindowSet() {
        try {
            // NOTE(review): mergingWindowsDescriptor is only created in open() for a MergingWindowing;
            // callers in this class guard with the same instanceof check.
            return new MergingWindowSet<>(this.windowing, getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, this.mergingWindowsDescriptor));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Removes the bookkeeping kept for a window: its end-of-stream timer (local mode only)
     * and, when a merging window set is given, its entry in that set.
     *
     * @param wid the window to remove
     * @param mergingWindowSet the set to remove the window from, or {@code null} to skip that step
     * @throws RuntimeException wrapping any failure raised while removing
     */
    private void removeWindow(WID wid, @Nullable MergingWindowSet<WID> mergingWindowSet) {
        try {
            if (this.localMode) {
                // Matches the Long.MAX_VALUE timer registered when the window was first touched.
                this.endOfStreamTimerService.deleteEventTimeTimer(wid, Long.MAX_VALUE);
            }
            if (mergingWindowSet != null) {
                mergingWindowSet.removeWindow(wid);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
