package cz.seznam.euphoria.flink.streaming.io;

import cz.seznam.euphoria.core.client.io.DataSink;
import cz.seznam.euphoria.core.client.io.Writer;
import cz.seznam.euphoria.flink.streaming.StreamingElement;
import java.io.Serializable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/* loaded from: input_file:cz/seznam/euphoria/flink/streaming/io/DataSinkWrapper.class */
public class DataSinkWrapper<T> extends RichSinkFunction<StreamingElement<?, T>> implements Checkpointed {
    private DataSink<T> dataSink;
    private Writer<T> writer;

    public DataSinkWrapper(DataSink<T> dataSink) {
        this.dataSink = dataSink;
    }

    public void open(Configuration configuration) throws Exception {
        this.dataSink.initialize();
        this.writer = this.dataSink.openWriter(getRuntimeContext().getIndexOfThisSubtask());
    }

    public void close() throws Exception {
        if (this.writer != null) {
            this.writer.flush();
            this.writer.commit();
            this.writer.close();
        }
    }

    public void invoke(StreamingElement<?, T> streamingElement) throws Exception {
        this.writer.write(streamingElement.getElement());
    }

    public Serializable snapshotState(long j, long j2) throws Exception {
        this.writer.flush();
        return this.dataSink;
    }

    public void restoreState(Serializable serializable) throws Exception {
        this.dataSink = (DataSink) serializable;
    }
}
