package org.apache.seatunnel.connectors.seatunnel.cassandra.sink;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.connectors.seatunnel.cassandra.client.CassandraClient;
import org.apache.seatunnel.connectors.seatunnel.cassandra.config.CassandraParameters;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.cassandra.exception.CassandraConnectorException;
import org.apache.seatunnel.connectors.seatunnel.cassandra.util.TypeConvertUtil;
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/cassandra/sink/CassandraSinkWriter.class */
public class CassandraSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
    private static final Logger log = LoggerFactory.getLogger(CassandraSinkWriter.class);
    private final CassandraParameters cassandraParameters;
    private final SeaTunnelRowType seaTunnelRowType;
    private final ColumnDefinitions tableSchema;
    private final CqlSession session;
    private BatchStatement batchStatement;
    private final PreparedStatement preparedStatement;
    private final AtomicInteger counter = new AtomicInteger(0);
    private List<BoundStatement> boundStatementList = new ArrayList();
    private List<CompletionStage<AsyncResultSet>> completionStages = new ArrayList();

    public CassandraSinkWriter(CassandraParameters cassandraParameters, SeaTunnelRowType seaTunnelRowType, ColumnDefinitions columnDefinitions) {
        this.cassandraParameters = cassandraParameters;
        this.seaTunnelRowType = seaTunnelRowType;
        this.tableSchema = columnDefinitions;
        this.session = CassandraClient.getCqlSessionBuilder(cassandraParameters.getHost(), cassandraParameters.getKeyspace(), cassandraParameters.getUsername(), cassandraParameters.getPassword(), cassandraParameters.getDatacenter()).build();
        this.batchStatement = BatchStatement.builder(cassandraParameters.getBatchType()).build();
        this.preparedStatement = this.session.prepare(initPrepareCQL());
    }

    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        addIntoBatch(seaTunnelRow, this.preparedStatement.bind(new Object[0]));
        if (this.counter.getAndIncrement() >= this.cassandraParameters.getBatchSize().intValue()) {
            flush();
            this.counter.set(0);
        }
    }

    private void flush() {
        try {
            if (this.cassandraParameters.getAsyncWrite().booleanValue()) {
                this.completionStages.forEach(completionStage -> {
                    completionStage.whenComplete((asyncResultSet, th) -> {
                        if (th != null) {
                            log.error(ExceptionUtils.getMessage(th));
                        }
                    });
                });
                this.completionStages.clear();
                return;
            }
            try {
                this.session.execute(this.batchStatement.addAll(this.boundStatementList));
                this.batchStatement.clear();
                this.boundStatementList.clear();
            } catch (Exception e) {
                log.error("Batch insert error,Try inserting one by one!", e);
                Iterator<BoundStatement> it = this.boundStatementList.iterator();
                while (it.hasNext()) {
                    this.session.execute(it.next());
                }
                this.batchStatement.clear();
                this.boundStatementList.clear();
            }
        } catch (Throwable th) {
            this.batchStatement.clear();
            this.boundStatementList.clear();
            throw th;
        }
    }

    private void addIntoBatch(SeaTunnelRow seaTunnelRow, BoundStatement boundStatement) {
        for (int i = 0; i < this.cassandraParameters.getFields().size(); i++) {
            try {
                boundStatement = TypeConvertUtil.reconvertAndInject(boundStatement, i, this.tableSchema.get(i).getType(), seaTunnelRow.getField(this.seaTunnelRowType.indexOf(this.cassandraParameters.getFields().get(i))));
            } catch (Exception e) {
                throw new CassandraConnectorException(CassandraConnectorErrorCode.ADD_BATCH_DATA_FAILED, e);
            }
        }
        if (this.cassandraParameters.getAsyncWrite().booleanValue()) {
            this.completionStages.add(this.session.executeAsync(boundStatement));
        } else {
            this.boundStatementList.add(boundStatement);
        }
    }

    private String initPrepareCQL() {
        String[] strArr = new String[this.cassandraParameters.getFields().size()];
        Arrays.fill(strArr, "?");
        return String.format("INSERT INTO %s (%s) VALUES (%s)", this.cassandraParameters.getTable(), String.join(",", this.cassandraParameters.getFields()), String.join(",", strArr));
    }

    public void close() throws IOException {
        flush();
        try {
            if (this.session != null) {
                this.session.close();
            }
        } catch (Exception e) {
            throw new CassandraConnectorException(CassandraConnectorErrorCode.CLOSE_CQL_SESSION_FAILED, e);
        }
    }
}
