package org.apache.seatunnel.connectors.seatunnel.pulsar.sink;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.pulsar.client.api.MessageRoutingMode;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.SerializationSchema;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarClientConfig;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarConfigUtil;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.PulsarSemantics;
import org.apache.seatunnel.connectors.seatunnel.pulsar.config.SinkProperties;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.pulsar.exception.PulsarConnectorException;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.pulsar.state.PulsarSinkState;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException;
import org.apache.seatunnel.format.text.TextSerializationSchema;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/pulsar/sink/PulsarSinkWriter.class */
/**
 * Sink writer that serializes {@link SeaTunnelRow}s and publishes them to a Pulsar topic.
 *
 * <p>Behaviour depends on the configured {@link PulsarSemantics}:
 *
 * <ul>
 *   <li>{@code NON}: messages are sent asynchronously and never tracked.
 *   <li>any other value: in-flight messages are counted and {@link #snapshotState(long)} flushes
 *       the producer until none are pending; asynchronous send failures are surfaced on the next
 *       {@link #write(SeaTunnelRow)} or {@link #snapshotState(long)} call.
 *   <li>{@code EXACTLY_ONCE}: messages are additionally sent inside a Pulsar transaction whose id
 *       is exposed through {@link #prepareCommit()} and {@link #snapshotState(long)}.
 * </ul>
 */
public class PulsarSinkWriter implements SinkWriter<SeaTunnelRow, PulsarCommitInfo, PulsarSinkState> {
    private SinkWriter.Context context;
    private Producer<byte[]> producer;
    private PulsarClient pulsarClient;
    private SerializationSchema serializationSchema;
    /** Serializer for the message key; {@code null} when no partition key fields are configured. */
    private SerializationSchema keySerializationSchema;
    /** Currently open transaction; only set when the semantics are {@code EXACTLY_ONCE}. */
    private TransactionImpl transaction;
    private int transactionTimeout = ((Integer) SinkProperties.TRANSACTION_TIMEOUT.defaultValue()).intValue();
    private PulsarSemantics pulsarSemantics = (PulsarSemantics) SinkProperties.SEMANTICS.defaultValue();
    /** Number of tracked asynchronous sends that have not completed yet. */
    private final AtomicLong pendingMessages = new AtomicLong(0L);
    /** First failure reported by an asynchronous send; written from the send callback. */
    private volatile Throwable sendFailure;

    /**
     * Creates the Pulsar client, the producer and (for {@code EXACTLY_ONCE}) the first transaction.
     *
     * @param context writer context supplied by the engine
     * @param pulsarClientConfig connection settings used to build the Pulsar client
     * @param seaTunnelRowType type of the rows this writer receives
     * @param readonlyConfig sink options (topic, format, delimiter, semantics, ...)
     * @param list previously snapshotted states; not used by this writer
     * @throws PulsarConnectorException if the transaction or the producer cannot be created
     */
    public PulsarSinkWriter(
            SinkWriter.Context context,
            PulsarClientConfig pulsarClientConfig,
            SeaTunnelRowType seaTunnelRowType,
            ReadonlyConfig readonlyConfig,
            List<PulsarSinkState> list) {
        this.context = context;
        String topic = (String) readonlyConfig.get(SinkProperties.TOPIC);
        String format = (String) readonlyConfig.get(SinkProperties.FORMAT);
        String delimiter = (String) readonlyConfig.get(SinkProperties.FIELD_DELIMITER);
        Integer configuredTimeout = (Integer) readonlyConfig.get(SinkProperties.TRANSACTION_TIMEOUT);
        PulsarSemantics configuredSemantics = (PulsarSemantics) readonlyConfig.get(SinkProperties.SEMANTICS);
        MessageRoutingMode routingMode =
                (MessageRoutingMode) readonlyConfig.get(SinkProperties.MESSAGE_ROUTING_MODE);

        // Store the configured values in the fields: every other method reads the fields, so
        // leaving them at their defaults would make the configured semantics/timeout ineffective.
        if (configuredTimeout != null) {
            this.transactionTimeout = configuredTimeout.intValue();
        }
        if (configuredSemantics != null) {
            this.pulsarSemantics = configuredSemantics;
        }

        this.serializationSchema = createSerializationSchema(seaTunnelRowType, format, delimiter);
        this.keySerializationSchema =
                createKeySerializationSchema(
                        getPartitionKeyFields(readonlyConfig, seaTunnelRowType), seaTunnelRowType);
        this.pulsarClient = PulsarConfigUtil.createClient(pulsarClientConfig, this.pulsarSemantics);

        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics) {
            try {
                this.transaction =
                        (TransactionImpl)
                                PulsarConfigUtil.getTransaction(this.pulsarClient, this.transactionTimeout);
            } catch (Exception e) {
                throw closeClientAndWrap(
                        PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                        "Pulsar transaction create fail.",
                        e);
            }
        }
        try {
            this.producer =
                    PulsarConfigUtil.createProducer(
                            this.pulsarClient, topic, this.pulsarSemantics, readonlyConfig, routingMode);
        } catch (PulsarClientException e) {
            throw closeClientAndWrap(
                    PulsarConnectorErrorCode.CREATE_PRODUCER_FAILED, "Pulsar Producer create fail.", e);
        }
    }

    /**
     * Serializes the row (and its key, if key fields are configured) and sends it asynchronously.
     *
     * @param seaTunnelRow row to publish
     * @throws IOException declared by the writer contract
     * @throws PulsarConnectorException if an earlier asynchronous send has failed
     */
    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        checkSendFailure();

        byte[] value = this.serializationSchema.serialize(seaTunnelRow);
        byte[] key =
                this.keySerializationSchema == null
                        ? null
                        : this.keySerializationSchema.serialize(seaTunnelRow);

        TypedMessageBuilder<byte[]> message =
                PulsarConfigUtil.createTypedMessageBuilder(this.producer, this.transaction);
        if (key != null) {
            message.keyBytes(key);
        }
        message.value(value);

        if (PulsarSemantics.NON == this.pulsarSemantics) {
            // Fire and forget: no delivery tracking for NON semantics.
            message.sendAsync();
            return;
        }

        this.pendingMessages.incrementAndGet();
        message.sendAsync()
                .whenComplete(
                        (messageId, error) -> {
                            // Throwing here would only fail the discarded future, so remember the
                            // error instead. Record it before decrementing so that a reader which
                            // observes zero pending messages also observes the failure.
                            if (error != null && this.sendFailure == null) {
                                this.sendFailure = error;
                            }
                            this.pendingMessages.decrementAndGet();
                        });
    }

    /**
     * Returns the id of the open transaction for {@code EXACTLY_ONCE}, otherwise an empty optional.
     *
     * @return commit info wrapping the current transaction id, or empty
     * @throws IOException declared by the writer contract
     */
    public Optional<PulsarCommitInfo> prepareCommit() throws IOException {
        if (PulsarSemantics.EXACTLY_ONCE != this.pulsarSemantics) {
            return Optional.empty();
        }
        return Optional.of(new PulsarCommitInfo(this.transaction.getTxnID()));
    }

    /**
     * Flushes tracked messages and, for {@code EXACTLY_ONCE}, returns the current transaction id as
     * state and opens a new transaction for subsequent writes.
     *
     * @param j checkpoint id (not used by this writer)
     * @return a single state holding the flushed transaction id for {@code EXACTLY_ONCE}, otherwise
     *     an empty list
     * @throws IOException if flushing the producer fails
     * @throws PulsarConnectorException if an asynchronous send failed or the next transaction
     *     cannot be created
     */
    public List<PulsarSinkState> snapshotState(long j) throws IOException {
        if (PulsarSemantics.NON != this.pulsarSemantics) {
            do {
                this.producer.flush();
            } while (this.pendingMessages.get() > 0);
            // All callbacks have run at this point, so any failure is visible.
            checkSendFailure();
        }
        if (PulsarSemantics.EXACTLY_ONCE != this.pulsarSemantics) {
            return Collections.emptyList();
        }
        List<PulsarSinkState> states =
                Lists.newArrayList(new PulsarSinkState(this.transaction.getTxnID()));
        try {
            this.transaction =
                    (TransactionImpl)
                            PulsarConfigUtil.getTransaction(this.pulsarClient, this.transactionTimeout);
            return states;
        } catch (Exception e) {
            throw wrap(
                    PulsarConnectorErrorCode.CREATE_TRANSACTION_FAILED,
                    "Pulsar transaction create fail.",
                    e);
        }
    }

    /** Aborts the open transaction when running with {@code EXACTLY_ONCE} semantics. */
    public void abortPrepare() {
        if (PulsarSemantics.EXACTLY_ONCE == this.pulsarSemantics && this.transaction != null) {
            this.transaction.abort();
        }
    }

    /**
     * Closes the producer and then the client; the client is closed even if closing the producer
     * throws.
     *
     * @throws IOException if closing the producer or the client fails
     */
    public void close() throws IOException {
        try {
            if (this.producer != null) {
                this.producer.close();
            }
        } finally {
            if (this.pulsarClient != null) {
                this.pulsarClient.close();
            }
        }
    }

    /**
     * Builds the value serializer for the configured format.
     *
     * @param seaTunnelRowType type of the rows to serialize
     * @param str format name; {@link SinkProperties#DEFAULT_FORMAT} selects JSON, {@code "text"}
     *     selects delimited text
     * @param str2 field delimiter used by the text format
     * @return serializer for the requested format
     * @throws SeaTunnelJsonFormatException if the format is neither of the supported values
     */
    private SerializationSchema createSerializationSchema(
            SeaTunnelRowType seaTunnelRowType, String str, String str2) {
        if (SinkProperties.DEFAULT_FORMAT.equals(str)) {
            return new JsonSerializationSchema(seaTunnelRowType);
        }
        if ("text".equals(str)) {
            return TextSerializationSchema.builder()
                    .seaTunnelRowType(seaTunnelRowType)
                    .delimiter(str2)
                    .build();
        }
        throw new SeaTunnelJsonFormatException(
                (SeaTunnelErrorCode) CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported format: " + str);
    }

    /**
     * Builds a serializer that projects a row onto the given key fields and encodes the projection
     * as JSON.
     *
     * @param list names of the key fields, in the order they should appear in the key
     * @param seaTunnelRowType type of the incoming rows, used to resolve field positions and types
     * @return the key serializer, or {@code null} if {@code list} is {@code null} or empty
     */
    public static SerializationSchema createKeySerializationSchema(
            List<String> list, SeaTunnelRowType seaTunnelRowType) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        int keyCount = list.size();
        int[] keyFieldIndexes = new int[keyCount];
        SeaTunnelDataType<?>[] keyFieldTypes = new SeaTunnelDataType<?>[keyCount];
        for (int i = 0; i < keyCount; i++) {
            int fieldIndex = seaTunnelRowType.indexOf(list.get(i));
            keyFieldIndexes[i] = fieldIndex;
            keyFieldTypes[i] = seaTunnelRowType.getFieldType(fieldIndex);
        }
        JsonSerializationSchema keySchema =
                new JsonSerializationSchema(
                        new SeaTunnelRowType(list.toArray(new String[0]), keyFieldTypes));
        // The projection is done inside the returned lambda so that it captures only the index
        // array and the JSON schema, not an intermediate java.util.function.Function lambda
        // (which is not serializable and would break serialization of the returned schema).
        return row -> {
            Object[] keyValues = new Object[keyFieldIndexes.length];
            for (int i = 0; i < keyFieldIndexes.length; i++) {
                keyValues[i] = row.getField(keyFieldIndexes[i]);
            }
            return keySchema.serialize(new SeaTunnelRow(keyValues));
        };
    }

    /**
     * Reads the configured partition key fields and verifies each one exists in the row type.
     *
     * @param readonlyConfig sink options
     * @param seaTunnelRowType type of the incoming rows
     * @return the configured key field names, or an empty list when the option is not set
     * @throws PulsarConnectorException if a configured field is not part of the row type
     */
    private List<String> getPartitionKeyFields(
            ReadonlyConfig readonlyConfig, SeaTunnelRowType seaTunnelRowType) {
        @SuppressWarnings("unchecked")
        List<String> keyFields = (List<String>) readonlyConfig.get(SinkProperties.PARTITION_KEY_FIELDS);
        if (keyFields == null) {
            return Collections.emptyList();
        }
        List<String> rowFieldNames = Arrays.asList(seaTunnelRowType.getFieldNames());
        for (String keyField : keyFields) {
            if (!rowFieldNames.contains(keyField)) {
                throw new PulsarConnectorException(
                        (SeaTunnelErrorCode) CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                        String.format(
                                "Partition key field not found: %s, rowType: %s",
                                keyField, rowFieldNames));
            }
        }
        return keyFields;
    }

    /** Throws if an asynchronous send has reported a failure. */
    private void checkSendFailure() {
        Throwable failure = this.sendFailure;
        if (failure != null) {
            throw wrap(PulsarConnectorErrorCode.SEND_MESSAGE_FAILED, "send message failed", failure);
        }
    }

    /** Closes the client after a failed construction step and returns the wrapped failure. */
    private PulsarConnectorException closeClientAndWrap(
            SeaTunnelErrorCode errorCode, String message, Throwable cause) {
        PulsarConnectorException failure = wrap(errorCode, message, cause);
        try {
            this.pulsarClient.close();
        } catch (Exception closeError) {
            failure.addSuppressed(closeError);
        }
        return failure;
    }

    /** Creates a connector exception that keeps {@code cause} attached for diagnostics. */
    private static PulsarConnectorException wrap(
            SeaTunnelErrorCode errorCode, String message, Throwable cause) {
        PulsarConnectorException failure = new PulsarConnectorException(errorCode, message);
        try {
            failure.initCause(cause);
        } catch (IllegalStateException causeAlreadySet) {
            // The exception type fixed its cause at construction; keep the original error visible.
            failure.addSuppressed(cause);
        }
        return failure;
    }
}
