package com.marklogic.spark.writer;

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.WriteBatcher;
import com.marklogic.client.io.Format;
import com.marklogic.client.io.StringHandle;
import com.marklogic.spark.Util;
import java.io.IOException;
import java.io.StringWriter;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.json.JacksonGenerator;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/marklogic/spark/writer/MarkLogicDataWriter.class */
class MarkLogicDataWriter implements DataWriter<InternalRow> {
    private static final Logger logger = LoggerFactory.getLogger((Class<?>) MarkLogicDataWriter.class);
    private final WriteContext writeContext;
    private final DatabaseClient databaseClient;
    private final DataMovementManager dataMovementManager;
    private final WriteBatcher writeBatcher;
    private final DocBuilder docBuilder;
    private final int partitionId;
    private final long taskId;
    private final long epochId;
    private final AtomicReference<Throwable> writeFailure = new AtomicReference<>();
    private int docCount;

    /* loaded from: input_file:com/marklogic/spark/writer/MarkLogicDataWriter$MarkLogicCommitMessage.class */
    private static class MarkLogicCommitMessage implements WriterCommitMessage {
        private int docCount;
        private int partitionId;
        private long taskId;
        private long epochId;

        public MarkLogicCommitMessage(int i, int i2, long j, long j2) {
            this.docCount = i;
            this.partitionId = i2;
            this.taskId = j;
            this.epochId = j2;
        }

        public String toString() {
            return this.epochId != 0 ? String.format("[partitionId: %d; taskId: %d; epochId: %d]; docCount: %d", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId), Long.valueOf(this.epochId), Integer.valueOf(this.docCount)) : String.format("[partitionId: %d; taskId: %d]; docCount: %d", Integer.valueOf(this.partitionId), Long.valueOf(this.taskId), Integer.valueOf(this.docCount));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MarkLogicDataWriter(WriteContext writeContext, int i, long j, long j2) {
        this.writeContext = writeContext;
        this.partitionId = i;
        this.taskId = j;
        this.epochId = j2;
        this.docBuilder = this.writeContext.newDocBuilder();
        this.databaseClient = writeContext.connectToMarkLogic();
        this.dataMovementManager = this.databaseClient.newDataMovementManager();
        this.writeBatcher = writeContext.newWriteBatcher(this.dataMovementManager);
        if (writeContext.isAbortOnFailure()) {
            this.writeBatcher.onBatchFailure((writeBatch, th) -> {
                this.writeFailure.compareAndSet(null, th);
            });
        }
        this.dataMovementManager.startJob(this.writeBatcher);
    }

    public void write(InternalRow internalRow) throws IOException {
        throwWriteFailureIfExists();
        this.writeBatcher.add(this.docBuilder.build(new StringHandle(convertRowToJSONString(internalRow)).withFormat(Format.JSON)));
        this.docCount++;
    }

    public WriterCommitMessage commit() throws IOException {
        MarkLogicCommitMessage markLogicCommitMessage = new MarkLogicCommitMessage(this.docCount, this.partitionId, this.taskId, this.epochId);
        if (logger.isDebugEnabled()) {
            logger.debug("Committing {}", markLogicCommitMessage);
        }
        this.writeBatcher.flushAndWait();
        throwWriteFailureIfExists();
        return markLogicCommitMessage;
    }

    public void abort() {
        logger.warn("Abort called; stopping job");
        stopJobAndRelease();
    }

    public void close() {
        logger.info("Close called; stopping job");
        stopJobAndRelease();
    }

    private String convertRowToJSONString(InternalRow internalRow) {
        StringWriter stringWriter = new StringWriter();
        JacksonGenerator jacksonGenerator = new JacksonGenerator(this.writeContext.getSchema(), stringWriter, Util.DEFAULT_JSON_OPTIONS);
        jacksonGenerator.write(internalRow);
        jacksonGenerator.flush();
        return stringWriter.toString();
    }

    private synchronized void throwWriteFailureIfExists() throws IOException {
        if (this.writeFailure.get() != null) {
            throw new IOException(this.writeFailure.get().getMessage());
        }
    }

    private void stopJobAndRelease() {
        if (this.writeBatcher != null && this.dataMovementManager != null) {
            this.dataMovementManager.stopJob(this.writeBatcher);
        }
        if (this.databaseClient != null) {
            this.databaseClient.release();
        }
    }
}
