package com.oceanbase.connector.flink.sink;

import com.oceanbase.connector.flink.OBKVHBaseConnectorOptions;
import com.oceanbase.connector.flink.connection.OBKVHBaseConnectionProvider;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.client.Delete;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.client.HTableInterface;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.client.Put;
import com.oceanbase.connector.flink.shaded.org.apache.hadoop.hbase.util.Bytes;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.HTableInfo;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.collections.CollectionUtils;

/* loaded from: input_file:com/oceanbase/connector/flink/sink/OBKVHBaseRecordFlusher.class */
/**
 * {@link RecordFlusher} that turns batches of {@link DataChangeRecord}s into HBase-style
 * {@link Put} and {@link Delete} mutations and sends them through the table client supplied by
 * an {@link OBKVHBaseConnectionProvider}.
 *
 * <p>For every record, one mutation is built per column family that carries at least one
 * non-null column value. Mutations are grouped by column family name, and each group is handed
 * to the client as a single list.
 */
public class OBKVHBaseRecordFlusher implements RecordFlusher {
    private static final long serialVersionUID = 1L;

    private final OBKVHBaseConnectorOptions options;
    private final OBKVHBaseConnectionProvider connectionProvider;

    /**
     * Creates a flusher that builds its own connection provider from the given options.
     *
     * @param oBKVHBaseConnectorOptions connector options, also used to create the provider
     */
    public OBKVHBaseRecordFlusher(OBKVHBaseConnectorOptions oBKVHBaseConnectorOptions) {
        this(oBKVHBaseConnectorOptions, new OBKVHBaseConnectionProvider(oBKVHBaseConnectorOptions));
    }

    /**
     * Creates a flusher that uses an externally supplied connection provider.
     *
     * @param oBKVHBaseConnectorOptions connector options
     * @param oBKVHBaseConnectionProvider provider of table clients; closed by {@link #close()}
     */
    public OBKVHBaseRecordFlusher(OBKVHBaseConnectorOptions oBKVHBaseConnectorOptions, OBKVHBaseConnectionProvider oBKVHBaseConnectionProvider) {
        this.options = oBKVHBaseConnectorOptions;
        this.connectionProvider = oBKVHBaseConnectionProvider;
    }

    /**
     * Converts the batch into puts and deletes and writes them to the table.
     *
     * <p>The table metadata is taken from the first record and applied to the whole batch.
     * A {@code null} or empty batch is a no-op.
     *
     * @param list records to write; may be {@code null} or empty
     * @throws Exception if obtaining the table client or sending the mutations fails
     */
    @Override
    public void flush(List<DataChangeRecord> list) throws Exception {
        if (list == null || list.isEmpty()) {
            return;
        }
        HTableInfo tableInfo = (HTableInfo) list.get(0).getTable();

        // Group by family NAME, not by the encoded byte[]: arrays hash and compare by identity,
        // so a freshly encoded byte[] key would never match an earlier one. That would put every
        // mutation into its own one-element group (no batching) and lose the batch order.
        Map<String, List<Put>> putsByFamily = new HashMap<>();
        Map<String, List<Delete>> deletesByFamily = new HashMap<>();

        for (DataChangeRecord record : list) {
            byte[] rowKey = (byte[]) record.getFieldValue(tableInfo.getRowKeyName());
            for (String familyName : tableInfo.getFamilyNames()) {
                Object familyValue = record.getFieldValue(familyName);
                if (familyValue == null) {
                    continue;
                }
                // The family value is consumed as qualifier -> value; a non-String qualifier or
                // a non-byte[] value fails with ClassCastException when it is used below.
                @SuppressWarnings("unchecked")
                Map<String, Object> columns = (Map<String, Object>) familyValue;
                List<Map.Entry<String, Object>> nonNullColumns = columns.entrySet().stream()
                        .filter(column -> column.getValue() != null)
                        .collect(Collectors.toList());
                // NOTE(review): this guard also skips DELETE records whose values are all null
                // for this family - confirm that is intended.
                if (nonNullColumns.isEmpty()) {
                    continue;
                }
                byte[] family = Bytes.toBytes(familyName);
                if (record.isUpsert()) {
                    putsByFamily.computeIfAbsent(familyName, name -> new ArrayList<>())
                            .add(buildPut(rowKey, family, nonNullColumns));
                } else {
                    deletesByFamily.computeIfAbsent(familyName, name -> new ArrayList<>())
                            .add(buildDelete(rowKey, family, columns, nonNullColumns));
                }
            }
        }
        flush(this.connectionProvider.getHTableClient(tableInfo.getTableId()), putsByFamily, deletesByFamily);
    }

    /** Builds a put for one row and family containing every non-null column value. */
    private static Put buildPut(byte[] rowKey, byte[] family, List<Map.Entry<String, Object>> nonNullColumns) {
        Put put = new Put(rowKey);
        for (Map.Entry<String, Object> column : nonNullColumns) {
            put.add(family, Bytes.toBytes(column.getKey()), (byte[]) column.getValue());
        }
        return put;
    }

    /** Builds a delete for one row and family covering every qualifier present in the record. */
    private static Delete buildDelete(
            byte[] rowKey,
            byte[] family,
            Map<String, Object> columns,
            List<Map.Entry<String, Object>> nonNullColumns) {
        Delete delete = new Delete(rowKey);
        for (Map.Entry<String, Object> column : columns.entrySet()) {
            delete.deleteColumn(family, Bytes.toBytes(column.getKey()));
        }
        // NOTE(review): the non-null qualifiers were already added by the loop above, so this
        // second pass adds them again. Kept so the request content stays exactly as before;
        // confirm whether the duplication is intentional before removing it.
        for (Map.Entry<String, Object> column : nonNullColumns) {
            delete.deleteColumn(family, Bytes.toBytes(column.getKey()));
        }
        return delete;
    }

    /**
     * Sends the grouped mutations: first every put group, then every delete group.
     *
     * <p>NOTE(review): because all puts precede all deletes, a batch holding a delete followed
     * by an upsert of the same row is applied as put-then-delete - confirm that callers never
     * produce such batches.
     *
     * @param hTableInterface client of the target table
     * @param map puts grouped by column family name
     * @param map2 deletes grouped by column family name
     * @throws Exception if the client rejects or fails to send a group
     */
    private void flush(HTableInterface hTableInterface, Map<String, List<Put>> map, Map<String, List<Delete>> map2) throws Exception {
        for (List<Put> puts : map.values()) {
            if (CollectionUtils.isNotEmpty(puts)) {
                hTableInterface.put(puts);
            }
        }
        for (List<Delete> deletes : map2.values()) {
            if (CollectionUtils.isNotEmpty(deletes)) {
                hTableInterface.delete(deletes);
            }
        }
    }

    /**
     * Closes the underlying connection provider.
     *
     * @throws Exception if the provider fails to close
     */
    @Override
    public void close() throws Exception {
        this.connectionProvider.close();
    }
}
