package cz.seznam.euphoria.flink.storage;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import org.apache.flink.api.common.state.ReducingState;

/* loaded from: input_file:cz/seznam/euphoria/flink/storage/FlinkReducingValueStorage.class */
/**
 * {@link ValueStorage} backed by a Flink {@link ReducingState}.
 *
 * <p>Before every operation the wrapped state's current namespace is set to
 * the window this storage was created for. A fallback value supplied at
 * construction time is returned by {@link #get()} whenever the state yields
 * {@code null}.
 *
 * <p>NOTE(review): {@code set}, {@code get} and {@code clear} presumably
 * implement the {@code ValueStorage} contract; consider annotating them with
 * {@code @Override} once that is confirmed against the interface.
 *
 * @param <T> type of the stored value
 * @param <W> type of the window used as the state namespace
 */
public class FlinkReducingValueStorage<T, W extends Window> implements ValueStorage<T> {
    /** Underlying Flink state every operation is delegated to. */
    private final ReducingState<T> state;
    /** Value handed out by {@link #get()} when the state yields {@code null}. */
    private final T defaultValue;
    /** Window passed as the namespace to the state before each access. */
    private final W window;

    /**
     * Creates a storage view over the given state.
     *
     * @param state        Flink state to delegate to
     * @param defaultValue value returned by {@link #get()} when the state yields {@code null}
     * @param window       window set as the state's namespace before each access
     */
    public FlinkReducingValueStorage(ReducingState<T> state, T defaultValue, W window) {
        this.state = state;
        this.defaultValue = defaultValue;
        this.window = window;
    }

    /**
     * Replaces the stored content with the given value: the state is cleared
     * first and the value is added afterwards.
     *
     * @param value value to store
     * @throws RuntimeException wrapping any exception raised by the state
     */
    public void set(T value) {
        selectWindowNamespace();
        try {
            // Clear before adding so the state holds only the new value
            // (presumably so it is not combined with earlier content - verify).
            state.clear();
            state.add(value);
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
    }

    /**
     * Reads the value currently held by the state.
     *
     * @return the state's value, or the default value if the state yields {@code null}
     * @throws RuntimeException wrapping any exception raised while reading the state
     */
    public T get() {
        selectWindowNamespace();
        final T stored;
        try {
            stored = state.get();
        } catch (Exception cause) {
            throw new RuntimeException(cause);
        }
        if (stored != null) {
            return stored;
        }
        return defaultValue;
    }

    /** Clears the state after selecting this storage's window namespace. */
    public void clear() {
        selectWindowNamespace();
        state.clear();
    }

    /** Sets the state's current namespace to this storage's window. */
    private void selectWindowNamespace() {
        state.setCurrentNamespace(window);
    }
}
