package cz.seznam.euphoria.flink.streaming.windowing;

import cz.seznam.euphoria.core.client.dataset.windowing.Window;
import cz.seznam.euphoria.core.client.operator.state.ListStorage;
import cz.seznam.euphoria.core.client.operator.state.ListStorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.StorageDescriptor;
import cz.seznam.euphoria.core.client.operator.state.StorageProvider;
import cz.seznam.euphoria.core.client.operator.state.ValueStorage;
import cz.seznam.euphoria.core.client.operator.state.ValueStorageDescriptor;
import cz.seznam.euphoria.flink.storage.Descriptors;
import cz.seznam.euphoria.flink.storage.FlinkListStorage;
import cz.seznam.euphoria.flink.storage.FlinkReducingValueStorage;
import cz.seznam.euphoria.flink.storage.FlinkValueStorage;
import java.util.IdentityHashMap;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.state.KeyedStateBackend;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:cz/seznam/euphoria/flink/streaming/windowing/WindowedStorageProvider.class */
public class WindowedStorageProvider<WID extends Window> implements StorageProvider {
    private final KeyedStateBackend keyedStateBackend;
    private final TypeSerializer<WID> windowSerializer;
    private Window window;
    private final IdentityHashMap<StorageDescriptor, StateDescriptor> storageToStateDescriptors = new IdentityHashMap<>();
    private final int descriptorsCacheMaxSize;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WindowedStorageProvider(KeyedStateBackend keyedStateBackend, TypeSerializer<WID> typeSerializer, int i) {
        this.keyedStateBackend = keyedStateBackend;
        this.windowSerializer = typeSerializer;
        this.descriptorsCacheMaxSize = i;
    }

    public void setWindow(Window window) {
        this.window = window;
    }

    public <T> ValueStorage<T> getValueStorage(ValueStorageDescriptor<T> valueStorageDescriptor) {
        try {
            if (valueStorageDescriptor instanceof ValueStorageDescriptor.MergingValueStorageDescriptor) {
                ReducingStateDescriptor computeIfAbsent = this.storageToStateDescriptors.computeIfAbsent(valueStorageDescriptor, storageDescriptor -> {
                    return Descriptors.from((ValueStorageDescriptor.MergingValueStorageDescriptor) storageDescriptor);
                });
                validateStateDescriptorSize();
                return new FlinkReducingValueStorage(this.keyedStateBackend.getPartitionedState(this.window, this.windowSerializer, computeIfAbsent), valueStorageDescriptor.getDefaultValue(), this.window);
            }
            ValueStateDescriptor computeIfAbsent2 = this.storageToStateDescriptors.computeIfAbsent(valueStorageDescriptor, storageDescriptor2 -> {
                return Descriptors.from((ValueStorageDescriptor) storageDescriptor2);
            });
            validateStateDescriptorSize();
            return new FlinkValueStorage(this.keyedStateBackend.getPartitionedState(this.window, this.windowSerializer, computeIfAbsent2), this.window);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public <T> ListStorage<T> getListStorage(ListStorageDescriptor<T> listStorageDescriptor) {
        try {
            ListStateDescriptor computeIfAbsent = this.storageToStateDescriptors.computeIfAbsent(listStorageDescriptor, storageDescriptor -> {
                return Descriptors.from((ListStorageDescriptor) storageDescriptor);
            });
            validateStateDescriptorSize();
            return new FlinkListStorage(this.keyedStateBackend.getPartitionedState(this.window, this.windowSerializer, computeIfAbsent), this.window);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void validateStateDescriptorSize() {
        if (this.storageToStateDescriptors.size() > this.descriptorsCacheMaxSize) {
            throw new IllegalStateException("Too many state descriptors! Likely some of the storage descriptors are not declared as 'static final' and are generated for each element!");
        }
    }
}
