package com.oceanbase.connector.flink.sink;

import java.io.Serializable;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableFunction;

/* loaded from: input_file:com/oceanbase/connector/flink/sink/AbstractDynamicTableSink.class */
public abstract class AbstractDynamicTableSink implements DynamicTableSink {
    protected final ResolvedSchema physicalSchema;

    /* loaded from: input_file:com/oceanbase/connector/flink/sink/AbstractDynamicTableSink$SinkProvider.class */
    static class SinkProvider implements DataStreamSinkProvider, Serializable {
        private static final long serialVersionUID = 1;
        private final SerializableFunction<TypeSerializer<RowData>, Sink<RowData>> sinkSupplier;

        public SinkProvider(SerializableFunction<TypeSerializer<RowData>, Sink<RowData>> serializableFunction) {
            this.sinkSupplier = serializableFunction;
        }

        public DataStreamSink<?> consumeDataStream(ProviderContext providerContext, DataStream<RowData> dataStream) {
            return dataStream.sinkTo((Sink) this.sinkSupplier.apply(dataStream.getExecutionEnvironment().getConfig().isObjectReuseEnabled() ? dataStream.getType().createSerializer(dataStream.getExecutionConfig()) : null));
        }
    }

    public AbstractDynamicTableSink(ResolvedSchema resolvedSchema) {
        this.physicalSchema = resolvedSchema;
    }

    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        validatePrimaryKey(changelogMode);
        return ChangelogMode.newBuilder().addContainedKind(RowKind.INSERT).addContainedKind(RowKind.DELETE).addContainedKind(RowKind.UPDATE_AFTER).build();
    }

    protected void validatePrimaryKey(ChangelogMode changelogMode) {
        Preconditions.checkState(ChangelogMode.insertOnly().equals(changelogMode) || this.physicalSchema.getPrimaryKey().isPresent(), "please declare primary key for sink table when query contains update/delete record.");
    }
}
