package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import javax.annotation.Nullable;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.StoreSinkWriteState;
import org.apache.paimon.memory.MemorySegmentPool;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.SinkRecord;
import org.apache.paimon.table.source.snapshot.FullCompactedStartingScanner;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/sink/GlobalFullCompactionSinkWrite.class */
/**
 * A {@link StoreSinkWriteImpl} that remembers which (partition, bucket) pairs were touched and
 * submits a compaction for all of them at selected checkpoints.
 *
 * <p>Buckets touched by {@link #write(InternalRow)} and {@link #compact(BinaryRow, int, boolean)}
 * are collected in {@code currentWrittenBuckets}. On {@link #prepareCommit(boolean, long)} they are
 * moved into {@code writtenBuckets}, keyed by the identifier passed to that call. When the caller
 * requests it, or when {@link FullCompactedStartingScanner#isFullCompactedIdentifier(long, int)}
 * accepts the identifier, every remembered bucket is compacted and the identifier is recorded in
 * {@code commitIdentifiersToCheck}. Remembered buckets are dropped once a matching {@code COMPACT}
 * snapshot of this writer's commit user is observed.
 *
 * <p>The remembered buckets are stored in and restored from {@link StoreSinkWriteState} under the
 * name {@value #WRITTEN_BUCKETS_STATE_NAME}.
 */
public class GlobalFullCompactionSinkWrite extends StoreSinkWriteImpl {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalFullCompactionSinkWrite.class);

    /** State name under which the written buckets are kept in {@link StoreSinkWriteState}. */
    private static final String WRITTEN_BUCKETS_STATE_NAME = "paimon_written_buckets";

    /** Second argument handed to {@code FullCompactedStartingScanner.isFullCompactedIdentifier}. */
    private final int deltaCommits;

    /** Name of the table, used as the key when reading and writing sink state. */
    private final String tableName;

    private final SnapshotManager snapshotManager;

    /** Buckets touched since the last {@link #prepareCommit(boolean, long)} call. */
    private final Set<Tuple2<BinaryRow, Integer>> currentWrittenBuckets;

    /** Touched buckets grouped by the identifier of the {@code prepareCommit} that collected them. */
    private final NavigableMap<Long, Set<Tuple2<BinaryRow, Integer>>> writtenBuckets;

    /** Identifiers for which a compaction of all remembered buckets has been submitted. */
    private final TreeSet<Long> commitIdentifiersToCheck;

    /**
     * Creates the writer and restores previously remembered buckets from {@code
     * storeSinkWriteState}, if any are present.
     *
     * @param fileStoreTable table to write to; supplies the table name and snapshot manager
     * @param str forwarded to the super constructor (presumably the commit user - confirm against
     *     {@code StoreSinkWriteImpl})
     * @param storeSinkWriteState state the remembered buckets are restored from
     * @param iOManager forwarded to the super constructor
     * @param z forwarded to the super constructor
     * @param z2 forwarded to the super constructor
     * @param i stored as {@code deltaCommits}
     * @param z3 forwarded to the super constructor
     * @param memorySegmentPool forwarded to the super constructor; may be {@code null}
     */
    public GlobalFullCompactionSinkWrite(FileStoreTable fileStoreTable, String str, StoreSinkWriteState storeSinkWriteState, IOManager iOManager, boolean z, boolean z2, int i, boolean z3, @Nullable MemorySegmentPool memorySegmentPool) {
        super(fileStoreTable, str, storeSinkWriteState, iOManager, z, z2, z3, memorySegmentPool);
        this.deltaCommits = i;
        this.tableName = fileStoreTable.name();
        this.snapshotManager = fileStoreTable.snapshotManager();
        this.currentWrittenBuckets = new HashSet<>();
        this.writtenBuckets = new TreeMap<>();
        this.commitIdentifiersToCheck = new TreeSet<>();

        List<StoreSinkWriteState.StateValue> restoredValues =
                storeSinkWriteState.get(this.tableName, WRITTEN_BUCKETS_STATE_NAME);
        if (restoredValues != null) {
            for (StoreSinkWriteState.StateValue stateValue : restoredValues) {
                // The state value carries the identifier encoded by longToBytes in snapshotState.
                this.writtenBuckets
                        .computeIfAbsent(bytesToLong(stateValue.value()), id -> new HashSet<>())
                        .add(Tuple2.of(stateValue.partition(), stateValue.bucket()));
            }
        }
    }

    /**
     * Delegates to the parent writer and remembers the bucket the record went to.
     *
     * @param internalRow row to write
     * @return the record returned by the parent writer
     * @throws Exception if the parent writer fails
     */
    @Override // org.apache.paimon.flink.sink.StoreSinkWriteImpl, org.apache.paimon.flink.sink.StoreSinkWrite
    public SinkRecord write(InternalRow internalRow) throws Exception {
        SinkRecord written = super.write(internalRow);
        touchBucket(written.partition(), written.bucket());
        return written;
    }

    /**
     * Delegates to the parent writer and remembers the compacted bucket.
     *
     * @param binaryRow partition of the bucket
     * @param i bucket number
     * @param z forwarded unchanged to the parent writer
     * @throws Exception if the parent writer fails
     */
    @Override // org.apache.paimon.flink.sink.StoreSinkWriteImpl, org.apache.paimon.flink.sink.StoreSinkWrite
    public void compact(BinaryRow binaryRow, int i, boolean z) throws Exception {
        super.compact(binaryRow, i, z);
        touchBucket(binaryRow, i);
    }

    /** Records the (partition, bucket) pair as touched since the last {@code prepareCommit}. */
    private void touchBucket(BinaryRow partition, int bucket) {
        // Guarded so that the bucket number is not boxed on every record when debug is off.
        if (LOG.isDebugEnabled()) {
            LOG.debug("touch partition {}, bucket {}", partition, bucket);
        }
        // Look up with the caller's row first so that a copy is only made for new entries.
        // NOTE(review): the copy suggests callers may reuse the row instance - confirm.
        if (!this.currentWrittenBuckets.contains(Tuple2.of(partition, bucket))) {
            this.currentWrittenBuckets.add(Tuple2.of(partition.copy(), bucket));
        }
    }

    /**
     * Moves the buckets touched since the previous call into {@code writtenBuckets}, submits a
     * compaction of all remembered buckets when required, and delegates to the parent writer.
     *
     * <p>A compaction is submitted when {@code z} is {@code true}, or when at least one bucket is
     * remembered and {@code FullCompactedStartingScanner.isFullCompactedIdentifier(j,
     * deltaCommits)} returns {@code true}. In both cases {@code true} is passed to the parent's
     * {@code prepareCommit}.
     *
     * @param z caller-supplied flag; {@code true} forces the compaction to be submitted
     * @param j identifier of this commit (logged as the checkpoint number)
     * @return the committables produced by the parent writer
     * @throws IOException if the parent writer fails
     */
    @Override // org.apache.paimon.flink.sink.StoreSinkWriteImpl, org.apache.paimon.flink.sink.StoreSinkWrite
    public List<Committable> prepareCommit(boolean z, long j) throws IOException {
        // Drop buckets that an already visible compaction snapshot has covered.
        checkSuccessfulFullCompaction();

        if (!this.currentWrittenBuckets.isEmpty()) {
            this.writtenBuckets
                    .computeIfAbsent(j, id -> new HashSet<>())
                    .addAll(this.currentWrittenBuckets);
            this.currentWrittenBuckets.clear();
        }

        if (LOG.isDebugEnabled()) {
            for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry : this.writtenBuckets.entrySet()) {
                LOG.debug("Written buckets for checkpoint #{} are:", entry.getKey());
                for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
                    LOG.debug("  * partition {}, bucket {}", bucket.f0, bucket.f1);
                }
            }
        }

        boolean identifierRequiresCompaction =
                !this.writtenBuckets.isEmpty()
                        && FullCompactedStartingScanner.isFullCompactedIdentifier(j, this.deltaCommits);
        boolean triggerFullCompaction = z || identifierRequiresCompaction;

        if (triggerFullCompaction) {
            // submitFullCompaction emits the "Submit full compaction" debug message itself.
            submitFullCompaction(j);
            this.commitIdentifiersToCheck.add(j);
        }
        return super.prepareCommit(triggerFullCompaction, j);
    }

    /** Scans snapshots, newest first, for a submitted compaction that has been committed. */
    private void checkSuccessfulFullCompaction() {
        if (this.commitIdentifiersToCheck.isEmpty()) {
            return;
        }
        this.snapshotManager.traversalSnapshotsFromLatestSafely(this::checkSuccessfulFullCompaction);
    }

    /**
     * Checks whether {@code snapshot} is a {@code COMPACT} snapshot of this writer's commit user
     * whose commit identifier is one of {@code commitIdentifiersToCheck}. If so, all buckets
     * remembered for identifiers up to and including that one are forgotten.
     *
     * @param snapshot snapshot to inspect
     * @return {@code true} if the snapshot matched, {@code false} otherwise (presumably a
     *     {@code true} result ends the snapshot traversal - confirm against
     *     {@code SnapshotManager})
     */
    private boolean checkSuccessfulFullCompaction(Snapshot snapshot) {
        boolean ownCompactSnapshot =
                snapshot.commitUser().equals(this.commitUser)
                        && snapshot.commitKind() == Snapshot.CommitKind.COMPACT;
        if (!ownCompactSnapshot) {
            return false;
        }

        long commitIdentifier = snapshot.commitIdentifier();
        if (!this.commitIdentifiersToCheck.contains(commitIdentifier)) {
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Found full compaction snapshot #{} with identifier {}", snapshot.id(), commitIdentifier);
        }
        // submitFullCompaction compacts every bucket in writtenBuckets, so everything remembered
        // up to and including this identifier is covered by the snapshot.
        this.writtenBuckets.headMap(commitIdentifier, true).clear();
        // headSet is exclusive: older identifiers are dropped while the matched one is retained,
        // so the same snapshot still matches on later scans.
        this.commitIdentifiersToCheck.headSet(commitIdentifier).clear();
        return true;
    }

    /**
     * Submits a compaction for every remembered bucket, each distinct (partition, bucket) pair
     * once, iterating {@code writtenBuckets} in ascending identifier order.
     *
     * @param checkpointId identifier of the commit that triggers the compaction; only logged
     * @throws RuntimeException wrapping any exception thrown while submitting a compaction
     */
    private void submitFullCompaction(long checkpointId) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Submit full compaction for checkpoint #{}", checkpointId);
        }
        Set<Tuple2<BinaryRow, Integer>> submittedBuckets = new HashSet<>();
        for (Set<Tuple2<BinaryRow, Integer>> buckets : this.writtenBuckets.values()) {
            for (Tuple2<BinaryRow, Integer> bucket : buckets) {
                // Set.add returns false when the same bucket was already seen under another identifier.
                if (!submittedBuckets.add(bucket)) {
                    continue;
                }
                try {
                    this.write.compact(bucket.f0, bucket.f1, true);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    /**
     * Lets the parent snapshot its state, then stores every remembered bucket together with its
     * encoded identifier under {@value #WRITTEN_BUCKETS_STATE_NAME}.
     *
     * @throws Exception if the parent fails to snapshot its state
     */
    @Override // org.apache.paimon.flink.sink.StoreSinkWriteImpl, org.apache.paimon.flink.sink.StoreSinkWrite
    public void snapshotState() throws Exception {
        super.snapshotState();
        List<StoreSinkWriteState.StateValue> stateValues = new ArrayList<>();
        for (Map.Entry<Long, Set<Tuple2<BinaryRow, Integer>>> entry : this.writtenBuckets.entrySet()) {
            long identifier = entry.getKey();
            for (Tuple2<BinaryRow, Integer> bucket : entry.getValue()) {
                // A fresh array per value: the encoded bytes are not shared between state values.
                stateValues.add(new StoreSinkWriteState.StateValue(bucket.f0, bucket.f1, longToBytes(identifier)));
            }
        }
        this.state.put(this.tableName, WRITTEN_BUCKETS_STATE_NAME, stateValues);
    }

    /** Encodes {@code value} as 8 bytes, most significant byte first. */
    private static byte[] longToBytes(long value) {
        byte[] bytes = new byte[Long.BYTES];
        for (int index = Long.BYTES - 1; index >= 0; index--) {
            bytes[index] = (byte) (value & 0xFF);
            value >>= Byte.SIZE;
        }
        return bytes;
    }

    /**
     * Decodes the first 8 bytes of {@code bytes}, most significant byte first; the inverse of
     * {@link #longToBytes(long)}. Bytes beyond the eighth are ignored.
     *
     * @throws IllegalArgumentException if {@code bytes} is {@code null} or shorter than 8 bytes
     */
    private static long bytesToLong(byte[] bytes) {
        if (bytes == null || bytes.length < Long.BYTES) {
            throw new IllegalArgumentException(
                    "Expected at least " + Long.BYTES + " bytes to decode a long, but got "
                            + (bytes == null ? "null" : String.valueOf(bytes.length)));
        }
        long value = 0;
        for (int index = 0; index < Long.BYTES; index++) {
            value = (value << Byte.SIZE) | (bytes[index] & 0xFF);
        }
        return value;
    }
}
