package org.apache.shardingsphere.scaling.opengauss.component;

import java.nio.ByteBuffer;
import java.sql.Connection;
import java.sql.SQLException;
import lombok.Generated;
import org.apache.shardingsphere.scaling.core.common.channel.Channel;
import org.apache.shardingsphere.scaling.core.common.exception.ScalingTaskExecuteException;
import org.apache.shardingsphere.scaling.core.common.record.Column;
import org.apache.shardingsphere.scaling.core.common.record.DataRecord;
import org.apache.shardingsphere.scaling.core.common.record.Record;
import org.apache.shardingsphere.scaling.core.config.DumperConfiguration;
import org.apache.shardingsphere.scaling.core.config.datasource.StandardJDBCDataSourceConfiguration;
import org.apache.shardingsphere.scaling.core.executor.AbstractScalingExecutor;
import org.apache.shardingsphere.scaling.core.executor.dumper.IncrementalDumper;
import org.apache.shardingsphere.scaling.core.job.position.ScalingPosition;
import org.apache.shardingsphere.scaling.core.util.ThreadUtil;
import org.apache.shardingsphere.scaling.opengauss.wal.OpenGaussLogicalReplication;
import org.apache.shardingsphere.scaling.opengauss.wal.decode.MppdbDecodingPlugin;
import org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussLogSequenceNumber;
import org.apache.shardingsphere.scaling.opengauss.wal.decode.OpenGaussTimestampUtils;
import org.apache.shardingsphere.scaling.postgresql.wal.WalEventConverter;
import org.apache.shardingsphere.scaling.postgresql.wal.WalPosition;
import org.apache.shardingsphere.scaling.postgresql.wal.event.AbstractWalEvent;
import org.apache.shardingsphere.scaling.postgresql.wal.event.PlaceholderEvent;
import org.opengauss.jdbc.PgConnection;
import org.opengauss.replication.PGReplicationStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/shardingsphere/scaling/opengauss/component/OpenGaussWalDumper.class */
public final class OpenGaussWalDumper extends AbstractScalingExecutor implements IncrementalDumper {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(OpenGaussWalDumper.class);
    private final WalPosition walPosition;
    private final DumperConfiguration dumperConfig;
    private final WalEventConverter walEventConverter;
    private Channel channel;
    private final OpenGaussLogicalReplication logicalReplication = new OpenGaussLogicalReplication();
    private String slotName = OpenGaussLogicalReplication.SLOT_NAME_PREFIX;

    public OpenGaussWalDumper(DumperConfiguration dumperConfiguration, ScalingPosition<WalPosition> scalingPosition) {
        this.walPosition = (WalPosition) scalingPosition;
        if (!StandardJDBCDataSourceConfiguration.class.equals(dumperConfiguration.getDataSourceConfig().getClass())) {
            throw new UnsupportedOperationException("PostgreSQLWalDumper only support JDBCDataSourceConfiguration");
        }
        this.dumperConfig = dumperConfiguration;
        this.walEventConverter = new WalEventConverter(dumperConfiguration);
    }

    public void start() {
        super.start();
        dump();
    }

    private PgConnection getReplicationConn() throws SQLException {
        return (PgConnection) this.logicalReplication.createPgConnection((StandardJDBCDataSourceConfiguration) this.dumperConfig.getDataSourceConfig()).unwrap(PgConnection.class);
    }

    private MppdbDecodingPlugin initReplication() {
        MppdbDecodingPlugin mppdbDecodingPlugin = null;
        try {
            Connection connection = this.dumperConfig.getDataSourceConfig().toDataSource().getConnection();
            Throwable th = null;
            try {
                try {
                    this.slotName = OpenGaussLogicalReplication.getUniqueSlotName(connection);
                    OpenGaussLogicalReplication.createIfNotExists(connection);
                    mppdbDecodingPlugin = new MppdbDecodingPlugin(new OpenGaussTimestampUtils(((PgConnection) connection.unwrap(PgConnection.class)).getTimestampUtils()));
                    if (connection != null) {
                        if (0 != 0) {
                            try {
                                connection.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            connection.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (SQLException e) {
            log.warn("create replication slot failed!");
        }
        return mppdbDecodingPlugin;
    }

    private void dump() {
        MppdbDecodingPlugin initReplication = initReplication();
        try {
            PgConnection replicationConn = getReplicationConn();
            Throwable th = null;
            try {
                try {
                    PGReplicationStream createReplicationStream = this.logicalReplication.createReplicationStream(replicationConn, this.walPosition.getLogSequenceNumber(), this.slotName);
                    while (isRunning()) {
                        ByteBuffer readPending = createReplicationStream.readPending();
                        if (null == readPending) {
                            ThreadUtil.sleep(10L);
                        } else {
                            AbstractWalEvent decode = initReplication.decode(readPending, new OpenGaussLogSequenceNumber(createReplicationStream.getLastReceiveLSN()));
                            Record convert = this.walEventConverter.convert(decode);
                            if (!(decode instanceof PlaceholderEvent) && log.isDebugEnabled()) {
                                log.debug("dump, event={}, record={}", decode, convert);
                            }
                            updateRecordOldValue(convert);
                            pushRecord(convert);
                        }
                    }
                    if (replicationConn != null) {
                        if (0 != 0) {
                            try {
                                replicationConn.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            replicationConn.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (SQLException e) {
            if (!e.getMessage().contains("is already active")) {
                throw new ScalingTaskExecuteException(e);
            }
        }
    }

    private void updateRecordOldValue(Record record) {
        if (record instanceof DataRecord) {
            DataRecord dataRecord = (DataRecord) record;
            if ("UPDATE".equals(dataRecord.getType())) {
                for (Column column : dataRecord.getColumns()) {
                    if (column.isPrimaryKey() && column.isUpdated()) {
                        column.setOldValue(column.getValue());
                    }
                }
            }
        }
    }

    private void pushRecord(Record record) {
        try {
            this.channel.pushRecord(record);
        } catch (InterruptedException e) {
        }
    }

    @Generated
    public void setChannel(Channel channel) {
        this.channel = channel;
    }
}
