package org.apache.paimon.flink.sink;

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.codegen.RecordComparator;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.memory.HeapMemorySegmentPool;
import org.apache.paimon.mergetree.SortBufferWriteBuffer;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.ChangelogWithKeyTableUtils;
import org.apache.paimon.table.sink.SequenceGenerator;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/sink/LocalMergeOperator.class */
/**
 * Stream operator that collects incoming rows in a local sort buffer keyed by the table's
 * primary key and forwards the buffered rows downstream, via the table's merge function,
 * when the buffer fills up, before a checkpoint barrier, and at end of input.
 *
 * <p>Watermarks are not forwarded as they arrive. The latest one is remembered and emitted
 * right after the next flush, so that it cannot overtake rows still held in the buffer.
 *
 * <p>Only tables with primary keys are supported; the constructor rejects other schemas.
 */
public class LocalMergeOperator extends AbstractStreamOperator<RowData> implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput {
    private static final long serialVersionUID = 1L;

    /** Schema of the target table; the only serialized state, everything else is rebuilt in {@link #open()}. */
    TableSchema schema;

    /** Extracts the primary-key columns from a full row. */
    private transient Projection keyProjection;
    /** Comparator over the (prefixed) primary-key row type, passed to the buffer when flushing. */
    private transient RecordComparator keyComparator;
    /** Number of rows seen so far; used as sequence number when no sequence generator exists. */
    private transient long recordCount;
    /** Sequence number source; may be {@code null} (see the null check in {@code processElement}). */
    private transient SequenceGenerator sequenceGenerator;
    /** Merge function created from the table schema, passed to the buffer when flushing. */
    private transient MergeFunction<KeyValue> mergeFunction;
    /** Local buffer holding rows until the next flush. */
    private transient SortBufferWriteBuffer buffer;
    /** Latest watermark not yet forwarded; {@code Long.MIN_VALUE} means "nothing pending". */
    private transient long currentWatermark;
    /** Reused wrapper exposing a Paimon row as Flink {@link RowData} for emission. */
    private transient FlinkRowData reusedRowData;
    /** Set once {@link #endInput()} has been called. */
    private transient boolean endOfInput;

    /**
     * Creates the operator for the given table schema and sets its chaining strategy to
     * {@link ChainingStrategy#ALWAYS}.
     *
     * @param tableSchema schema of the target table; must declare at least one primary key
     * @throws IllegalArgumentException presumably, via {@code Preconditions.checkArgument},
     *     when the schema has no primary keys
     */
    public LocalMergeOperator(TableSchema tableSchema) {
        Preconditions.checkArgument(!tableSchema.primaryKeys().isEmpty(), "LocalMergeOperator currently only support tables with primary keys");
        this.schema = tableSchema;
        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Initializes all transient state: key projection and comparator, sequence generator,
     * merge function, the sort buffer and the watermark / end-of-input bookkeeping.
     *
     * @throws Exception if the parent operator or any collaborator fails to initialize
     */
    public void open() throws Exception {
        super.open();
        RowType keyType = ChangelogWithKeyTableUtils.addKeyNamePrefix(this.schema.logicalPrimaryKeysType());
        RowType valueType = this.schema.logicalRowType();
        CoreOptions coreOptions = new CoreOptions(this.schema.options());

        this.keyProjection = CodeGenUtils.newProjection(valueType, this.schema.projection(this.schema.primaryKeys()));
        this.keyComparator = new KeyComparatorSupplier(keyType).get();

        this.recordCount = 0L;
        this.sequenceGenerator = SequenceGenerator.create(this.schema, coreOptions);
        this.mergeFunction = ChangelogWithKeyTableUtils.createMergeFunctionFactory(this.schema).create();

        // Buffer memory comes from a heap pool sized by the local-merge options.
        // NOTE(review): the trailing `false` / `null` arguments presumably disable spilling
        // (no I/O manager) - confirm against the SortBufferWriteBuffer constructor.
        HeapMemorySegmentPool memoryPool = new HeapMemorySegmentPool(coreOptions.localMergeBufferSize(), coreOptions.pageSize());
        this.buffer = new SortBufferWriteBuffer(keyType, valueType, memoryPool, false, coreOptions.localSortMaxNumFileHandles(), null);

        this.currentWatermark = Long.MIN_VALUE;
        this.reusedRowData = new FlinkRowData(null);
        this.endOfInput = false;
    }

    /**
     * Adds one row to the local buffer. If the buffer rejects the row it is flushed and the
     * insert is retried once; if the retry is rejected too, the row is forwarded downstream
     * unchanged instead of being buffered.
     *
     * @param streamRecord the incoming record
     * @throws Exception if buffering, flushing or emitting fails
     */
    public void processElement(StreamRecord<RowData> streamRecord) throws Exception {
        this.recordCount++;
        FlinkRowWrapper row = new FlinkRowWrapper(streamRecord.getValue());

        // The original kind is handed to the buffer separately; the row itself is marked
        // INSERT while it is split into key and value.
        RowKind rowKind = row.getRowKind();
        row.setRowKind(RowKind.INSERT);

        BinaryRow key = this.keyProjection.apply(row);
        long sequenceNumber = this.sequenceGenerator == null ? this.recordCount : this.sequenceGenerator.generate(row);

        if (!this.buffer.put(sequenceNumber, rowKind, key, row)) {
            flushBuffer();
            if (!this.buffer.put(sequenceNumber, rowKind, key, row)) {
                // Still rejected after a flush: restore the original kind and pass the
                // record through untouched.
                row.setRowKind(rowKind);
                this.output.collect(streamRecord);
            }
        }
    }

    /**
     * Remembers the watermark instead of forwarding it; it is emitted after the next flush.
     *
     * @param watermark the incoming watermark
     * @throws Exception declared by the operator contract; not thrown here
     */
    public void processWatermark(Watermark watermark) throws Exception {
        this.currentWatermark = watermark.getTimestamp();
    }

    /**
     * Flushes the buffer before the checkpoint barrier is forwarded, unless the input has
     * already ended (in which case {@link #endInput()} has flushed already).
     *
     * @param checkpointId id of the checkpoint about to be taken (unused)
     * @throws Exception if flushing fails
     */
    public void prepareSnapshotPreBarrier(long checkpointId) throws Exception {
        if (!this.endOfInput) {
            flushBuffer();
        }
    }

    /**
     * Marks the input as finished and flushes whatever is still buffered.
     *
     * @throws Exception if flushing fails
     */
    public void endInput() throws Exception {
        this.endOfInput = true;
        flushBuffer();
    }

    /**
     * Clears the buffer (if it was ever created) and closes the parent operator.
     *
     * @throws Exception if clearing the buffer or closing the parent fails
     */
    public void close() throws Exception {
        try {
            if (this.buffer != null) {
                this.buffer.clear();
            }
        } finally {
            // Always release the parent operator, even if clearing the buffer failed.
            super.close();
        }
    }

    /**
     * Emits every buffered row downstream, clears the buffer and then forwards the pending
     * watermark, if any.
     *
     * <p>The watermark is forwarded even when the buffer is empty: with nothing buffered
     * there is no row it could overtake, and holding it back would stall downstream
     * watermark progress (and lose the final watermark when the input ends on an empty
     * buffer).
     */
    private void flushBuffer() throws Exception {
        if (this.buffer.size() > 0) {
            this.buffer.forEach(this.keyComparator, this.mergeFunction, null, keyValue -> {
                InternalRow mergedRow = keyValue.value();
                // Re-attach the row kind carried by the key-value to the emitted row.
                mergedRow.setRowKind(keyValue.valueKind());
                this.output.collect(new StreamRecord<>(this.reusedRowData.replace(mergedRow)));
            });
            this.buffer.clear();
        }
        emitPendingWatermark();
    }

    /** Forwards the remembered watermark downstream exactly once, then resets the marker. */
    private void emitPendingWatermark() throws Exception {
        if (this.currentWatermark == Long.MIN_VALUE) {
            return;
        }
        super.processWatermark(new Watermark(this.currentWatermark));
        this.currentWatermark = Long.MIN_VALUE;
    }
}
