package com.opendxl.databus.cli.operation;

import com.opendxl.databus.cli.CliUtils;
import com.opendxl.databus.cli.CommandLineInterface;
import com.opendxl.databus.cli.Options;
import com.opendxl.databus.cli.entity.ExecutionResult;
import com.opendxl.databus.common.RecordMetadata;
import com.opendxl.databus.entities.Headers;
import com.opendxl.databus.entities.MessagePayload;
import com.opendxl.databus.entities.RoutingData;
import com.opendxl.databus.producer.Callback;
import com.opendxl.databus.producer.DatabusProducer;
import com.opendxl.databus.producer.ProducerRecord;
import com.opendxl.databus.serialization.ByteArraySerializer;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/opendxl/databus/cli/operation/ProduceOperation.class */
public class ProduceOperation implements CommandLineOperation {
    private static final long PRODUCER_RESULT_TIMEOUT_MS = 8000;
    private static final Logger LOG = LoggerFactory.getLogger(ProduceOperation.class);
    private static final String OPERATION_NAME = OperationArguments.PRODUCE.argumentName;
    private final CountDownLatch countDownLatch;
    private Map<Options, ArgumentAcceptingOptionSpec> mandatoryOptions = new HashMap();
    private final OptionSet options;
    private ExecutionResult executionResult;

    /* loaded from: input_file:com/opendxl/databus/cli/operation/ProduceOperation$ProducerResultCallback.class */
    private class ProducerResultCallback implements Callback {
        private String shardingKey;

        ProducerResultCallback(String str) {
            this.shardingKey = str;
        }

        @Override // com.opendxl.databus.producer.Callback
        public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
            if (exc != null) {
                ProduceOperation.LOG.error("Error sending a record " + exc.getMessage());
                ProduceOperation.this.setProducerResult(new ExecutionResult("ERROR", exc.getMessage(), ProduceOperation.this.options.asMap()));
                ProduceOperation.this.countDownLatch.countDown();
            } else {
                String str = "SHARDING-KEY: " + this.shardingKey + " TOPICS:" + recordMetadata.topic() + " PARTITION:" + recordMetadata.partition() + " OFFSET:" + recordMetadata.offset();
                ProduceOperation.LOG.info("[PRODUCER <- KAFKA][OK MSG SENT] " + str);
                ProduceOperation.this.setProducerResult(new ExecutionResult("OK", str, ProduceOperation.this.options.asMap()));
                ProduceOperation.this.countDownLatch.countDown();
            }
        }
    }

    public ProduceOperation(Map<Options, ArgumentAcceptingOptionSpec> map, OptionSet optionSet) {
        this.options = optionSet;
        this.mandatoryOptions.put(Options.BROKER_LIST, map.get(Options.BROKER_LIST));
        this.mandatoryOptions.put(Options.MESSAGE, map.get(Options.MESSAGE));
        this.mandatoryOptions.put(Options.TO_TOPIC, map.get(Options.TO_TOPIC));
        this.countDownLatch = new CountDownLatch(1);
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public Map<Options, ArgumentAcceptingOptionSpec> getMandatoryOptions() {
        return this.mandatoryOptions;
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public String getOperationName() {
        return OPERATION_NAME;
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public ExecutionResult execute() {
        try {
            String obj = this.options.valueOf(this.mandatoryOptions.get(Options.BROKER_LIST)).toString();
            String obj2 = this.options.valueOf(this.mandatoryOptions.get(Options.TO_TOPIC)).toString();
            String obj3 = this.options.valueOf(this.mandatoryOptions.get(Options.MESSAGE)).toString();
            String obj4 = this.options.hasArgument(Options.TENANT_GROUP.getOptionName()) ? this.options.valueOf(Options.TENANT_GROUP.getOptionName()).toString() : "";
            String obj5 = this.options.hasArgument(Options.SHARDING_KEY.getOptionName()) ? this.options.valueOf(Options.SHARDING_KEY.getOptionName()).toString() : "";
            String obj6 = this.options.hasArgument(Options.PARTITION.getOptionName()) ? this.options.valueOf(Options.PARTITION.getOptionName()).toString() : "";
            HashMap hashMap = new HashMap();
            if (this.options.hasArgument(Options.HEADERS.name().toLowerCase())) {
                Properties stringToMap = CliUtils.stringToMap(this.options.valueOf(Options.HEADERS.name().toLowerCase()).toString());
                for (String str : stringToMap.stringPropertyNames()) {
                    hashMap.put(str, stringToMap.getProperty(str));
                }
            }
            HashMap hashMap2 = new HashMap();
            if (this.options.hasArgument(Options.CONFIG.name().toLowerCase())) {
                Properties stringToMap2 = CliUtils.stringToMap(this.options.valueOf(Options.CONFIG.name().toLowerCase()).toString());
                for (String str2 : stringToMap2.stringPropertyNames()) {
                    hashMap2.put(str2, stringToMap2.getProperty(str2));
                }
            }
            hashMap2.put("bootstrap.servers", obj);
            DatabusProducer databusProducer = new DatabusProducer(hashMap2, new ByteArraySerializer());
            ProducerRecord producerRecord = new ProducerRecord(getRoutingData(obj2, obj5, obj4, obj6), new Headers(hashMap), new MessagePayload(obj3.getBytes(Charset.defaultCharset())));
            databusProducer.send(producerRecord, new ProducerResultCallback(producerRecord.getRoutingData().getShardingKey()));
            if (!this.countDownLatch.await(PRODUCER_RESULT_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                LOG.error("Error sending a record . TIMEOUT");
                return new ExecutionResult("ERROR", "TIMEOUT", this.options.asMap());
            }
            databusProducer.flush();
            databusProducer.close();
            return this.executionResult;
        } catch (Exception e) {
            CliUtils.printUsageAndFinish(CommandLineInterface.parser, e.getMessage(), e);
            return null;
        }
    }

    private RoutingData getRoutingData(String str, String str2, String str3, String str4) {
        return !str4.isEmpty() ? new RoutingData(str, str2, str3, Integer.valueOf(Integer.parseInt(str4))) : new RoutingData(str, str2, str3);
    }

    private synchronized void setProducerResult(ExecutionResult executionResult) {
        this.executionResult = executionResult;
    }
}
