package org.apache.paimon.flink.sink.index;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.function.BiConsumer;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.flink.lookup.RocksDBStateFactory;
import org.apache.paimon.flink.lookup.RocksDBValueState;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.AbstractFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.PartitionKeyExtractor;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.IDMapping;
import org.apache.paimon.utils.PositiveIntInt;
import org.apache.paimon.utils.PositiveIntIntSerializer;
import org.apache.paimon.utils.SerBiFunction;
import org.apache.paimon.utils.SerializableFunction;

/* loaded from: input_file:org/apache/paimon/flink/sink/index/GlobalIndexAssigner.class */
public class GlobalIndexAssigner<T> implements Serializable {
    private static final long serialVersionUID = 1;
    private final AbstractFileStoreTable table;
    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
    private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> keyPartExtractorFunction;
    private final SerBiFunction<T, BinaryRow, T> setPartition;
    private final SerBiFunction<T, RowKind, T> setRowKind;
    private transient int targetBucketRowNumber;
    private transient int assignId;
    private transient BiConsumer<T, Integer> collector;
    private transient int numAssigners;
    private transient PartitionKeyExtractor<T> extractor;
    private transient PartitionKeyExtractor<T> keyPartExtractor;
    private transient File path;
    private transient RocksDBStateFactory stateFactory;
    private transient RocksDBValueState<InternalRow, PositiveIntInt> keyIndex;
    private transient IDMapping<BinaryRow> partMapping;
    private transient BucketAssigner bucketAssigner;
    private transient ExistsAction existsAction;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/sink/index/GlobalIndexAssigner$BucketAssigner.class */
    public static class BucketAssigner {
        private final Map<BinaryRow, TreeMap<Integer, Integer>> stats;

        private BucketAssigner() {
            this.stats = new HashMap();
        }

        public int assignBucket(BinaryRow binaryRow, Filter<Integer> filter, int i) {
            TreeMap<Integer, Integer> bucketMap = bucketMap(binaryRow);
            for (Map.Entry<Integer, Integer> entry : bucketMap.entrySet()) {
                int intValue = entry.getKey().intValue();
                int intValue2 = entry.getValue().intValue();
                if (filter.test(Integer.valueOf(intValue)) && intValue2 < i) {
                    bucketMap.put(Integer.valueOf(intValue), Integer.valueOf(intValue2 + 1));
                    return intValue;
                }
            }
            int i2 = 0;
            while (true) {
                if (filter.test(Integer.valueOf(i2)) && !bucketMap.containsKey(Integer.valueOf(i2))) {
                    bucketMap.put(Integer.valueOf(i2), 1);
                    return i2;
                }
                i2++;
            }
        }

        public void decrement(BinaryRow binaryRow, int i) {
            bucketMap(binaryRow).compute(Integer.valueOf(i), (num, num2) -> {
                return Integer.valueOf(num2 == null ? 0 : num2.intValue() - 1);
            });
        }

        private TreeMap<Integer, Integer> bucketMap(BinaryRow binaryRow) {
            TreeMap<Integer, Integer> treeMap = this.stats.get(binaryRow);
            if (treeMap == null) {
                treeMap = new TreeMap<>();
                this.stats.put(binaryRow.copy(), treeMap);
            }
            return treeMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/flink/sink/index/GlobalIndexAssigner$ExistsAction.class */
    public enum ExistsAction {
        DELETE,
        USE_OLD,
        SKIP_NEW
    }

    public GlobalIndexAssigner(Table table, SerializableFunction<TableSchema, PartitionKeyExtractor<T>> serializableFunction, SerializableFunction<TableSchema, PartitionKeyExtractor<T>> serializableFunction2, SerBiFunction<T, BinaryRow, T> serBiFunction, SerBiFunction<T, RowKind, T> serBiFunction2) {
        this.table = (AbstractFileStoreTable) table;
        this.extractorFunction = serializableFunction;
        this.keyPartExtractorFunction = serializableFunction2;
        this.setPartition = serBiFunction;
        this.setRowKind = serBiFunction2;
    }

    public void open(File file, int i, int i2, BiConsumer<T, Integer> biConsumer) throws Exception {
        this.numAssigners = i;
        this.assignId = i2;
        this.collector = biConsumer;
        CoreOptions coreOptions = this.table.coreOptions();
        this.targetBucketRowNumber = (int) coreOptions.dynamicBucketTargetRowNum();
        this.extractor = this.extractorFunction.apply(this.table.schema());
        this.keyPartExtractor = this.keyPartExtractorFunction.apply(this.table.schema());
        Options configuration = coreOptions.toConfiguration();
        this.path = new File(file, "lookup-" + UUID.randomUUID());
        this.stateFactory = new RocksDBStateFactory(this.path.toString(), configuration);
        this.keyIndex = this.stateFactory.valueState("keyIndex", new RowCompactedSerializer(this.table.schema().logicalTrimmedPrimaryKeysType()), new PositiveIntIntSerializer(), ((MemorySize) configuration.get(CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE)).getBytes());
        this.partMapping = new IDMapping<>((v0) -> {
            return v0.copy();
        });
        this.bucketAssigner = new BucketAssigner();
        this.existsAction = fromMergeEngine(coreOptions.mergeEngine());
    }

    public void process(T t) throws Exception {
        BinaryRow partition = this.extractor.partition(t);
        BinaryRow trimmedPrimaryKey = this.extractor.trimmedPrimaryKey(t);
        int index = this.partMapping.index(partition);
        PositiveIntInt positiveIntInt = this.keyIndex.get((RocksDBValueState<InternalRow, PositiveIntInt>) trimmedPrimaryKey);
        if (positiveIntInt == null) {
            processNewRecord(partition, index, trimmedPrimaryKey, t);
            return;
        }
        int i1 = positiveIntInt.i1();
        int i2 = positiveIntInt.i2();
        if (i1 == index) {
            collect(t, i2);
            return;
        }
        switch (this.existsAction) {
            case DELETE:
                BinaryRow binaryRow = this.partMapping.get(i1);
                collect(this.setRowKind.apply(this.setPartition.apply(t, binaryRow), RowKind.DELETE), i2);
                this.bucketAssigner.decrement(binaryRow, i2);
                processNewRecord(partition, index, trimmedPrimaryKey, t);
                return;
            case USE_OLD:
                collect(this.setPartition.apply(t, this.partMapping.get(i1)), i2);
                return;
            case SKIP_NEW:
            default:
                return;
        }
    }

    public void bootstrap(T t) throws IOException {
        BinaryRow partition = this.keyPartExtractor.partition(t);
        this.keyIndex.put(this.keyPartExtractor.trimmedPrimaryKey(t), new PositiveIntInt(this.partMapping.index(partition), assignBucket(partition)));
    }

    private void processNewRecord(BinaryRow binaryRow, int i, BinaryRow binaryRow2, T t) throws IOException {
        int assignBucket = assignBucket(binaryRow);
        this.keyIndex.put(binaryRow2, new PositiveIntInt(i, assignBucket));
        collect(t, assignBucket);
    }

    private int assignBucket(BinaryRow binaryRow) {
        return this.bucketAssigner.assignBucket(binaryRow, (v1) -> {
            return isAssignBucket(v1);
        }, this.targetBucketRowNumber);
    }

    private boolean isAssignBucket(int i) {
        return computeAssignId(i) == this.assignId;
    }

    private int computeAssignId(int i) {
        return Math.abs(i % this.numAssigners);
    }

    private void collect(T t, int i) {
        this.collector.accept(t, Integer.valueOf(i));
    }

    public void close() throws IOException {
        if (this.stateFactory != null) {
            this.stateFactory.close();
            this.stateFactory = null;
        }
        if (this.path != null) {
            FileIOUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private ExistsAction fromMergeEngine(CoreOptions.MergeEngine mergeEngine) {
        switch (mergeEngine) {
            case DEDUPLICATE:
                return ExistsAction.DELETE;
            case PARTIAL_UPDATE:
            case AGGREGATE:
                return ExistsAction.USE_OLD;
            case FIRST_ROW:
                return ExistsAction.SKIP_NEW;
            default:
                throw new UnsupportedOperationException("Unsupported engine: " + mergeEngine);
        }
    }
}
