package com.opendxl.databus.producer;

import com.opendxl.databus.common.MetricName;
import com.opendxl.databus.common.PartitionInfo;
import com.opendxl.databus.common.TopicPartition;
import com.opendxl.databus.common.internal.adapter.DatabusProducerRecordAdapter;
import com.opendxl.databus.common.internal.adapter.MetricNameMapAdapter;
import com.opendxl.databus.common.internal.adapter.PartitionInfoListAdapter;
import com.opendxl.databus.common.internal.util.TimeUnitUtil;
import com.opendxl.databus.consumer.OffsetAndMetadata;
import com.opendxl.databus.entities.internal.DatabusMessage;
import com.opendxl.databus.exception.DatabusClientRuntimeException;
import com.opendxl.databus.producer.metric.ProducerMetric;
import com.opendxl.databus.producer.metric.ProducerMetricBuilder;
import com.opendxl.databus.producer.metric.ProducerMetricEnum;
import com.opendxl.databus.serialization.internal.DatabusKeySerializer;
import java.time.Duration;
import java.time.temporal.TemporalUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang.StringUtils;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.serialization.Serializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/opendxl/databus/producer/Producer.class */
public abstract class Producer<P> {
    private DatabusKeySerializer keySerializer;
    private Serializer<DatabusMessage> valueSerializer;
    private Map<String, Object> configuration;
    private org.apache.kafka.clients.producer.Producer<String, DatabusMessage> producer;
    private DatabusProducerRecordAdapter<P> databusProducerRecordAdapter;
    private String clientId;
    private static final Logger LOG = LoggerFactory.getLogger(Producer.class);
    protected boolean produceKafkaHeaders;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/opendxl/databus/producer/Producer$CallbackAdapter.class */
    public static class CallbackAdapter implements org.apache.kafka.clients.producer.Callback {
        private final Callback callback;

        CallbackAdapter(Callback callback) {
            this.callback = callback;
        }

        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            com.opendxl.databus.common.RecordMetadata recordMetadata2 = null;
            if (recordMetadata != null) {
                recordMetadata2 = new com.opendxl.databus.common.RecordMetadata(recordMetadata);
            }
            this.callback.onCompletion(recordMetadata2, exc);
        }
    }

    public Map<String, Object> getConfiguration() {
        return this.configuration;
    }

    public void setProduceKafkaHeader(boolean z) {
        this.produceKafkaHeaders = z;
    }

    public void send(ProducerRecord producerRecord) {
        send(producerRecord, null);
    }

    public void send(ProducerRecord<P> producerRecord, Callback callback) {
        if (producerRecord == null) {
            throw new IllegalArgumentException("record cannot be null");
        }
        try {
            this.producer.send(this.databusProducerRecordAdapter.adapt(producerRecord, this.produceKafkaHeaders), callback != null ? new CallbackAdapter(callback) : null);
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("send cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public void flush() {
        try {
            this.producer.flush();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("flush cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    public List<PartitionInfo> partitionsFor(String str) {
        try {
            return new PartitionInfoListAdapter().adapt(this.producer.partitionsFor(str));
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("partitionsFor cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    public Map<MetricName, ? extends Metric> metrics() {
        try {
            return new MetricNameMapAdapter().adapt(this.producer.metrics());
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("metrics cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    public void close() {
        try {
            this.producer.close();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    public void close(long j, TimeUnit timeUnit) {
        try {
            this.producer.close(Duration.of(j, TimeUnitUtil.convert(timeUnit)));
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    public void close(long j, TemporalUnit temporalUnit) {
        try {
            this.producer.close(Duration.of(j, temporalUnit));
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("close cannot be performed :" + e.getMessage(), e, Producer.class);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setKeySerializer(DatabusKeySerializer databusKeySerializer) {
        this.keySerializer = databusKeySerializer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setValueSerializer(Serializer<DatabusMessage> serializer) {
        this.valueSerializer = serializer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public DatabusKeySerializer getKeySerializer() {
        return this.keySerializer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Serializer<DatabusMessage> getValueSerializer() {
        return this.valueSerializer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setProducer(org.apache.kafka.clients.producer.Producer<String, DatabusMessage> producer) {
        this.producer = producer;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setDatabusProducerRecordAdapter(DatabusProducerRecordAdapter<P> databusProducerRecordAdapter) {
        this.databusProducerRecordAdapter = databusProducerRecordAdapter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setClientId(String str) {
        this.clientId = str;
    }

    public void setConfiguration(Map<String, Object> map) {
        this.configuration = map;
    }

    public void initTransactions() {
        try {
            this.producer.initTransactions();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("initTransactions cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public void beginTransaction() {
        try {
            this.producer.beginTransaction();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("beginTransaction cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) {
        try {
            HashMap hashMap = new HashMap();
            map.forEach((topicPartition, offsetAndMetadata) -> {
                hashMap.put(new org.apache.kafka.common.TopicPartition(topicPartition.topic(), topicPartition.partition()), new org.apache.kafka.clients.consumer.OffsetAndMetadata(offsetAndMetadata.offset(), offsetAndMetadata.metadata()));
            });
            this.producer.sendOffsetsToTransaction(hashMap, str);
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("sendOffsetsToTransaction cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public void commitTransaction() {
        try {
            this.producer.commitTransaction();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("commitTransaction cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public void abortTransaction() {
        try {
            this.producer.abortTransaction();
        } catch (Exception e) {
            throw new DatabusClientRuntimeException("abortTransaction cannot be performed: " + e.getMessage(), e, Producer.class);
        }
    }

    public ProducerMetric recordSendTotalMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_SEND_TOTAL);
    }

    public ProducerMetric recordSendRateMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_SEND_RATE);
    }

    public ProducerMetric recordSizeAvgMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_SIZE_AVG);
    }

    public ProducerMetric recordSizeMaxMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_SIZE_MAX);
    }

    public ProducerMetric recordErrorTotalMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_ERROR_TOTAL);
    }

    public ProducerMetric recordErrorRateMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_ERROR_RATE);
    }

    public ProducerMetric recordBatchSizeMaxMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_BATCH_SIZE_MAX);
    }

    public ProducerMetric recordBatchSizeAvgMetric() {
        return getMetricPerClientId(ProducerMetricEnum.RECORD_BATCH_SIZE_AVG);
    }

    public ProducerMetric recordSendTotalPerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_SEND_TOTAL_PER_TOPIC);
    }

    public ProducerMetric recordSendRatePerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_SEND_RATE_PER_TOPIC);
    }

    public ProducerMetric recordErrorTotalPerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_ERROR_TOTAL_PER_TOPIC);
    }

    public ProducerMetric recordErrorRatePerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_ERROR_RATE_PER_TOPIC);
    }

    public ProducerMetric recordByteTotalPerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_BYTE_TOTAL_PER_TOPIC);
    }

    public ProducerMetric recordByteRatePerTopicMetric(String str) {
        return getMetricPerClientIdAndTopic(str, ProducerMetricEnum.RECORD_BYTE_RATE_PER_TOPIC);
    }

    private ProducerMetric getMetricPerClientId(ProducerMetricEnum producerMetricEnum) {
        try {
            return ProducerMetricBuilder.buildClientIdMetric(metrics(), producerMetricEnum.getName(), this.clientId);
        } catch (DatabusClientRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            String str = producerMetricEnum.getError() + e2.getMessage();
            LOG.error(str, e2);
            throw new DatabusClientRuntimeException(str, e2, Producer.class);
        }
    }

    private ProducerMetric getMetricPerClientIdAndTopic(String str, ProducerMetricEnum producerMetricEnum) {
        if (StringUtils.isBlank(str)) {
            throw new DatabusClientRuntimeException("topic cannot be empty or null", Producer.class);
        }
        try {
            return ProducerMetricBuilder.buildClientIdTopicMetric(metrics(), producerMetricEnum.getName(), this.clientId, new TopicPartition(str));
        } catch (DatabusClientRuntimeException e) {
            throw e;
        } catch (Exception e2) {
            String str2 = producerMetricEnum.getError() + e2.getMessage();
            LOG.error(str2, e2);
            throw new DatabusClientRuntimeException(str2, e2, Producer.class);
        }
    }
}
