package com.oceanbase.connector.flink.sink;

import com.oceanbase.connector.flink.ConnectorOptions;
import com.oceanbase.connector.flink.sink.OceanBaseWriterEvent;
import com.oceanbase.connector.flink.table.DataChangeRecord;
import com.oceanbase.connector.flink.table.Record;
import com.oceanbase.connector.flink.table.RecordSerializationSchema;
import com.oceanbase.connector.flink.table.SchemaChangeRecord;
import com.oceanbase.connector.flink.table.TransactionRecord;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/oceanbase/connector/flink/sink/OceanBaseWriter.class */
public class OceanBaseWriter<T> implements SinkWriter<T> {
    private static final Logger LOG = LoggerFactory.getLogger(OceanBaseWriter.class);
    private final ConnectorOptions options;
    private final SinkWriterMetricGroup metricGroup;
    private final TypeSerializer<T> typeSerializer;
    private final RecordSerializationSchema<T> recordSerializer;
    private final DataChangeRecord.KeyExtractor keyExtractor;
    private final RecordFlusher recordFlusher;
    private final OceanBaseWriterEvent.Listener writerEventListener;
    private final transient ScheduledExecutorService scheduler;
    private final transient ScheduledFuture<?> scheduledFuture;
    private final AtomicReference<Record> currentRecord = new AtomicReference<>();
    private final Map<String, List<DataChangeRecord>> buffer = new HashMap();
    private final Map<String, Map<Object, DataChangeRecord>> reducedBuffer = new HashMap();
    private transient int bufferCount = 0;
    private volatile transient Exception flushException = null;
    private volatile transient boolean closed = false;

    public OceanBaseWriter(ConnectorOptions connectorOptions, Sink.InitContext initContext, TypeSerializer<T> typeSerializer, RecordSerializationSchema<T> recordSerializationSchema, DataChangeRecord.KeyExtractor keyExtractor, RecordFlusher recordFlusher, OceanBaseWriterEvent.Listener listener) {
        this.options = connectorOptions;
        this.metricGroup = initContext.metricGroup();
        this.typeSerializer = typeSerializer;
        this.recordSerializer = recordSerializationSchema;
        this.keyExtractor = keyExtractor;
        this.recordFlusher = recordFlusher;
        this.writerEventListener = listener;
        this.scheduler = (connectorOptions.getSyncWrite() || connectorOptions.getBufferFlushInterval() == 0) ? null : new ScheduledThreadPoolExecutor(1, (ThreadFactory) new ExecutorThreadFactory("OceanBaseWriter.scheduler"));
        this.scheduledFuture = this.scheduler == null ? null : this.scheduler.scheduleWithFixedDelay(() -> {
            if (this.closed) {
                return;
            }
            try {
                synchronized (this) {
                    flush(false);
                }
            } catch (Exception e) {
                this.flushException = e;
            }
        }, connectorOptions.getBufferFlushInterval(), connectorOptions.getBufferFlushInterval(), TimeUnit.MILLISECONDS);
        if (!connectorOptions.getSyncWrite() && keyExtractor == null) {
            throw new IllegalArgumentException("When 'sync-write' is not enabled, keyExtractor is required to construct the buffer key.");
        }
        if (listener != null) {
            listener.apply(OceanBaseWriterEvent.INITIALIZED);
        }
    }

    public synchronized void write(T t, SinkWriter.Context context) throws IOException, InterruptedException {
        checkFlushException();
        Record serialize = this.recordSerializer.serialize(copyIfNecessary(t));
        if (serialize == null) {
            return;
        }
        if ((this.options.getSyncWrite() && (serialize instanceof DataChangeRecord)) || (serialize instanceof SchemaChangeRecord) || (serialize instanceof TransactionRecord)) {
            while (!this.currentRecord.compareAndSet(null, serialize)) {
                flush(false);
            }
            flush(false);
        } else if (serialize instanceof DataChangeRecord) {
            DataChangeRecord dataChangeRecord = (DataChangeRecord) serialize;
            Object extract = this.keyExtractor.extract(dataChangeRecord);
            if (extract == null) {
                synchronized (this.buffer) {
                    this.buffer.computeIfAbsent(serialize.getTableId().identifier(), str -> {
                        return new ArrayList();
                    }).add(dataChangeRecord);
                }
            } else {
                synchronized (this.reducedBuffer) {
                    this.reducedBuffer.computeIfAbsent(serialize.getTableId().identifier(), str2 -> {
                        return new HashMap();
                    }).put(extract, dataChangeRecord);
                }
            }
            this.bufferCount++;
            if (this.bufferCount >= this.options.getBufferSize()) {
                flush(false);
            }
        } else {
            LOG.info("Discard unsupported record: {}", serialize);
        }
        this.metricGroup.getIOMetricGroup().getNumRecordsInCounter().inc();
    }

    protected void checkFlushException() {
        if (this.flushException != null) {
            throw new RuntimeException("Writing records to OceanBase failed.", this.flushException);
        }
    }

    private T copyIfNecessary(T t) {
        return this.typeSerializer == null ? t : (T) this.typeSerializer.copy(t);
    }

    public synchronized void flush(boolean z) throws IOException, InterruptedException {
        checkFlushException();
        for (int i = 0; i <= this.options.getMaxRetries(); i++) {
            try {
                if (!this.buffer.isEmpty()) {
                    synchronized (this.buffer) {
                        Iterator<Map.Entry<String, List<DataChangeRecord>>> it = this.buffer.entrySet().iterator();
                        while (it.hasNext()) {
                            this.recordFlusher.flush(it.next().getValue());
                            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc(r0.size());
                        }
                        this.buffer.clear();
                    }
                }
                if (!this.reducedBuffer.isEmpty()) {
                    synchronized (this.reducedBuffer) {
                        Iterator<Map.Entry<String, Map<Object, DataChangeRecord>>> it2 = this.reducedBuffer.entrySet().iterator();
                        while (it2.hasNext()) {
                            this.recordFlusher.flush(new ArrayList(it2.next().getValue().values()));
                            this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc(r0.size());
                        }
                        this.reducedBuffer.clear();
                    }
                }
                this.bufferCount = 0;
                Record record = this.currentRecord.get();
                if (record == null) {
                    return;
                }
                if (record instanceof SchemaChangeRecord) {
                    this.recordFlusher.flush((SchemaChangeRecord) record);
                } else if (record instanceof TransactionRecord) {
                    this.recordFlusher.flush((TransactionRecord) record);
                } else if (record instanceof DataChangeRecord) {
                    this.recordFlusher.flush(Collections.singletonList((DataChangeRecord) record));
                } else {
                    LOG.info("Discard unsupported record: {}", record);
                }
                this.metricGroup.getIOMetricGroup().getNumRecordsOutCounter().inc();
                this.currentRecord.compareAndSet(record, null);
                return;
            } catch (Exception e) {
                LOG.error("OceanBaseWriter flush error, retry times = {}", Integer.valueOf(i), e);
                if (i >= this.options.getMaxRetries()) {
                    throw new IOException(e);
                }
                Thread.sleep(1000 * i);
            }
        }
    }

    public synchronized void close() {
        if (!this.closed) {
            this.closed = true;
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduler.shutdown();
            }
            if (this.bufferCount > 0) {
                try {
                    flush(true);
                } catch (Exception e) {
                    LOG.warn("Writing records to OceanBase failed", e);
                    throw new RuntimeException("Writing records to OceanBase failed", e);
                }
            }
            if (this.writerEventListener != null) {
                this.writerEventListener.apply(OceanBaseWriterEvent.CLOSING);
            }
            try {
                if (this.recordFlusher != null) {
                    this.recordFlusher.close();
                }
            } catch (Exception e2) {
                LOG.warn("Close statement executor failed", e2);
            }
        }
        checkFlushException();
    }
}
