package org.apache.seatunnel.translation.spark.sink.write;

import java.io.IOException;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.translation.serialization.RowConverter;
import org.apache.seatunnel.translation.spark.serialization.InternalRowConverter;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;

/* loaded from: input_file:org/apache/seatunnel/translation/spark/sink/write/SeaTunnelSparkDataWriter.class */
public class SeaTunnelSparkDataWriter<CommitInfoT, StateT> implements DataWriter<InternalRow> {
    private final SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter;

    @Nullable
    private final SinkCommitter<CommitInfoT> sinkCommitter;
    private final RowConverter<InternalRow> rowConverter;
    private CommitInfoT latestCommitInfoT;
    private long epochId;

    public SeaTunnelSparkDataWriter(SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter, @Nullable SinkCommitter<CommitInfoT> sinkCommitter, SeaTunnelDataType<?> seaTunnelDataType, long j) {
        this.sinkWriter = sinkWriter;
        this.sinkCommitter = sinkCommitter;
        this.rowConverter = new InternalRowConverter(seaTunnelDataType);
        this.epochId = j == 0 ? 1L : j;
    }

    public void write(InternalRow internalRow) throws IOException {
        this.sinkWriter.write(this.rowConverter.reconvert(internalRow));
    }

    public WriterCommitMessage commit() throws IOException {
        this.sinkWriter.prepareCommit().ifPresent(obj -> {
            this.latestCommitInfoT = obj;
        });
        SinkWriter<SeaTunnelRow, CommitInfoT, StateT> sinkWriter = this.sinkWriter;
        long j = this.epochId;
        this.epochId = j + 1;
        sinkWriter.snapshotState(j);
        if (this.sinkCommitter != null) {
            if (this.latestCommitInfoT == null) {
                this.sinkCommitter.commit(Collections.emptyList());
            } else {
                this.sinkCommitter.commit(Collections.singletonList(this.latestCommitInfoT));
            }
        }
        SeaTunnelSparkWriterCommitMessage seaTunnelSparkWriterCommitMessage = new SeaTunnelSparkWriterCommitMessage(this.latestCommitInfoT);
        cleanCommitInfo();
        this.sinkWriter.close();
        return seaTunnelSparkWriterCommitMessage;
    }

    public void abort() throws IOException {
        this.sinkWriter.abortPrepare();
        if (this.sinkCommitter != null) {
            if (this.latestCommitInfoT == null) {
                this.sinkCommitter.abort(Collections.emptyList());
            } else {
                this.sinkCommitter.abort(Collections.singletonList(this.latestCommitInfoT));
            }
        }
        cleanCommitInfo();
    }

    private void cleanCommitInfo() {
        this.latestCommitInfoT = null;
    }

    public void close() throws IOException {
    }
}
