package com.opendxl.databus.cli.operation;

import com.opendxl.databus.cli.CliUtils;
import com.opendxl.databus.cli.Options;
import com.opendxl.databus.cli.entity.ConsumerRecordResult;
import com.opendxl.databus.cli.entity.ExecutionResult;
import com.opendxl.databus.consumer.Consumer;
import com.opendxl.databus.consumer.ConsumerConfiguration;
import com.opendxl.databus.consumer.ConsumerRecord;
import com.opendxl.databus.consumer.ConsumerRecords;
import com.opendxl.databus.consumer.DatabusConsumer;
import com.opendxl.databus.serialization.ByteArrayDeserializer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import joptsimple.ArgumentAcceptingOptionSpec;
import joptsimple.OptionSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/opendxl/databus/cli/operation/ConsumeOperation.class */
public class ConsumeOperation implements CommandLineOperation {
    private static final Logger LOG = LoggerFactory.getLogger(ConsumeOperation.class);
    private static final String OPERATION_NAME = OperationArguments.CONSUME.argumentName;
    private Map<Options, ArgumentAcceptingOptionSpec> mandatoryOptions = new HashMap();
    private final OptionSet options;

    public ConsumeOperation(Map<Options, ArgumentAcceptingOptionSpec> map, OptionSet optionSet) {
        this.options = optionSet;
        this.mandatoryOptions.put(Options.BROKER_LIST, map.get(Options.BROKER_LIST));
        this.mandatoryOptions.put(Options.FROM_TOPIC, map.get(Options.FROM_TOPIC));
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public Map<Options, ArgumentAcceptingOptionSpec> getMandatoryOptions() {
        return this.mandatoryOptions;
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public String getOperationName() {
        return OPERATION_NAME;
    }

    @Override // com.opendxl.databus.cli.operation.CommandLineOperation
    public ExecutionResult execute() {
        DatabusConsumer databusConsumer = null;
        try {
            try {
                String obj = this.options.valueOf(this.mandatoryOptions.get(Options.BROKER_LIST)).toString();
                Map<String, Object> config = getConfig();
                String consumerGroupName = getConsumerGroupName();
                String autoCommit = getAutoCommit(config);
                config.put("bootstrap.servers", obj);
                config.put(ConsumerConfiguration.GROUP_ID_CONFIG, consumerGroupName);
                config.put(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
                databusConsumer = new DatabusConsumer(config, new ByteArrayDeserializer());
                subscribeToTopics(databusConsumer);
                ExecutionResult executionResult = new ExecutionResult("OK", getConsumerRecordResults(databusConsumer, ((Integer) this.options.valueOf(Options.CONSUME_TIMEOUT.getOptionName())).intValue(), ((Integer) this.options.valueOf(Options.CONSUME_RECORDS.getOptionName())).intValue(), autoCommit), this.options.asMap());
                if (databusConsumer != null) {
                    databusConsumer.close();
                }
                return executionResult;
            } catch (Exception e) {
                LOG.error("Error consuming records " + e.getMessage());
                ExecutionResult executionResult2 = new ExecutionResult("ERROR", e.getMessage(), this.options.asMap());
                if (databusConsumer != null) {
                    databusConsumer.close();
                }
                return executionResult2;
            }
        } catch (Throwable th) {
            if (databusConsumer != null) {
                databusConsumer.close();
            }
            throw th;
        }
    }

    private void subscribeToTopics(Consumer<byte[]> consumer) {
        List<String> list = (List) Arrays.stream(this.options.valueOf(this.mandatoryOptions.get(Options.FROM_TOPIC)).toString().split(",")).collect(Collectors.toList());
        String obj = this.options.hasArgument(Options.TENANT_GROUP.getOptionName()) ? this.options.valueOf(Options.TENANT_GROUP.getOptionName()).toString() : "";
        if (obj == null || obj.isEmpty()) {
            consumer.subscribe(list);
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put(obj, list);
        consumer.subscribe(hashMap);
    }

    private String getAutoCommit(Map map) {
        String str = (String) map.get(ConsumerConfiguration.ENABLE_AUTO_COMMIT_CONFIG);
        if (str == null || str.isEmpty()) {
            str = "true";
        }
        return str;
    }

    private String getConsumerGroupName() {
        return this.options.hasArgument(Options.CG.name().toLowerCase()) ? this.options.valueOf(Options.CG.getOptionName()).toString().toLowerCase() : UUID.randomUUID().toString();
    }

    private Map<String, Object> getConfig() {
        HashMap hashMap = new HashMap();
        if (this.options.hasArgument(Options.CONFIG.name().toLowerCase())) {
            Properties stringToMap = CliUtils.stringToMap(this.options.valueOf(Options.CONFIG.name().toLowerCase()).toString());
            for (String str : stringToMap.stringPropertyNames()) {
                hashMap.put(str, stringToMap.getProperty(str));
            }
        }
        return hashMap;
    }

    private List<ConsumerRecordResult> getConsumerRecordResults(Consumer<byte[]> consumer, int i, int i2, String str) {
        ArrayList arrayList = new ArrayList();
        long nanoTime = System.nanoTime();
        boolean z = true;
        do {
            ConsumerRecords poll = consumer.poll(100L);
            LOG.warn(String.valueOf(poll.count()));
            if (poll.count() > 0 && Boolean.parseBoolean(str) == Boolean.FALSE.booleanValue()) {
                consumer.commitSync();
            }
            Iterator<ConsumerRecord> it = poll.iterator();
            while (it.hasNext()) {
                ConsumerRecord next = it.next();
                arrayList.add(new ConsumerRecordResult(next.getKey(), new String((byte[]) next.getMessagePayload().getPayload()), next.getComposedTopic(), next.getTopic(), next.getTenantGroup(), next.getHeaders().getAll(), next.getOffset(), next.getPartition(), next.getTimestamp()));
            }
            if (arrayList.size() >= i2 || TimeUnit.MILLISECONDS.convert(System.nanoTime() - nanoTime, TimeUnit.NANOSECONDS) >= i) {
                z = false;
            }
        } while (z);
        return arrayList;
    }
}
