package org.kitesdk.cli.commands;

import au.com.bytecode.opencsv.CSVWriter;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.jboss.netty.handler.codec.http.multipart.HttpPostBodyUtil;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.spi.DatasetRepositories;
import org.mortbay.io.Portable;
import org.mortbay.util.URIUtil;
import org.slf4j.Logger;

@Parameters(commandDescription = "Build a Flume config to log events to a dataset")
/* loaded from: input_file:org/kitesdk/cli/commands/FlumeConfigCommand.class */
public class FlumeConfigCommand extends BaseDatasetCommand {

    @Parameter(description = "Dataset name or URI", required = true)
    List<String> datasetName;

    @Parameter(names = {"--use-dataset-uri"}, description = "Configure Flume with a dataset URI. Requires Flume 1.6+")
    boolean newFlume;

    @Parameter(names = {"--agent"}, description = "Flume agent name")
    String agent;

    @Parameter(names = {"--source"}, description = "Flume source name")
    String sourceName;

    @Parameter(names = {"--bind"}, description = "Avro source bind address")
    String bindAddress;

    @Parameter(names = {"--port"}, description = "Avro source port")
    int port;

    @Parameter(names = {"--channel"}, description = "Flume channel name")
    String channelName;

    @Parameter(names = {"--channel-type"}, description = "Flume channel type")
    String channelType;

    @Parameter(names = {"--channel-capacity"}, description = "Flume channel capacity")
    Integer capacity;
    int defaultMemoryChannelCapacity;

    @Parameter(names = {"--channel-transaction-capacity"}, description = "Flume channel transaction capacity")
    Integer transactionCapacity;
    int defaultMemoryChannelTransactionCapacity;

    @Parameter(names = {"--checkpoint-dir"}, description = "File channel checkpoint directory")
    String checkpointDir;

    @Parameter(names = {"--data-dir"}, description = "File channel data directory, use the option multiple times for multiple data directories")
    List<String> dataDirs;

    @Parameter(names = {"--sink"}, description = "Avro sink name")
    String sinkName;

    @Parameter(names = {"--batch-size"}, description = "Records to write per batch")
    int batchSize;

    @Parameter(names = {"--roll-interval"}, description = "Time in seconds before starting the next file")
    int rollInterval;

    @Parameter(names = {"--proxy-user"}, description = "User to write to HDFS as")
    String proxyUser;

    @SuppressWarnings(value = {"UWF_NULL_FIELD"}, justification = "Field set by JCommander")
    @Parameter(names = {"-o", "--output"}, description = "Save logging config to path")
    String outputPath;

    public FlumeConfigCommand(Logger logger) {
        super(logger);
        this.agent = "tier1";
        this.sourceName = "avro-event-source";
        this.bindAddress = Portable.ALL_INTERFACES;
        this.port = 41415;
        this.channelName = "avro-event-channel";
        this.channelType = HttpPostBodyUtil.FILE;
        this.capacity = null;
        this.defaultMemoryChannelCapacity = 10000000;
        this.transactionCapacity = null;
        this.defaultMemoryChannelTransactionCapacity = 1000;
        this.checkpointDir = null;
        this.dataDirs = null;
        this.sinkName = "kite-dataset";
        this.batchSize = 1000;
        this.rollInterval = 30;
        this.proxyUser = null;
        this.outputPath = null;
    }

    @Override // org.kitesdk.cli.Command
    @SuppressWarnings(value = {"NP_NULL_ON_SOME_PATH"}, justification = "Null case checked by precondition")
    public int run() throws IOException {
        Preconditions.checkArgument((this.datasetName == null || this.datasetName.isEmpty()) ? false : true, "Missing dataset uri");
        Preconditions.checkArgument((HttpPostBodyUtil.FILE.equals(this.channelType) && (this.checkpointDir == null || this.dataDirs == null || this.dataDirs.isEmpty())) ? false : true, "--checkpoint-dir and --data-dir are required options when the channel type is 'file'");
        Dataset<GenericRecord> dataset = load(this.datasetName.get(0), GenericRecord.class).getDataset();
        String buildDatasetUri = buildDatasetUri(this.datasetName.get(0));
        URI legacyRepoUri = getLegacyRepoUri(dataset);
        String name = dataset.getName();
        StringBuilder sb = new StringBuilder();
        sb.append(this.agent).append(".sources = ").append(this.sourceName).append('\n');
        sb.append(this.agent).append(".channels = ").append(this.channelName).append('\n');
        sb.append(this.agent).append(".sinks = ").append(this.sinkName).append('\n');
        sb.append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".type = avro").append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".channels = ").append(this.channelName).append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".bind = ").append(this.bindAddress).append('\n');
        sb.append(this.agent).append(".sources.").append(this.sourceName).append(".port = ").append(this.port).append('\n');
        sb.append('\n');
        sb.append(this.agent).append(".channels.").append(this.channelName).append(".type = ").append(this.channelType).append('\n');
        if ("memory".equals(this.channelType) && this.capacity == null) {
            this.capacity = Integer.valueOf(this.defaultMemoryChannelCapacity);
        }
        if (this.capacity != null) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".capacity = ").append(this.capacity).append('\n');
        }
        if ("memory".equals(this.channelType) && this.transactionCapacity == null) {
            this.transactionCapacity = Integer.valueOf(this.defaultMemoryChannelTransactionCapacity);
        }
        if (this.transactionCapacity != null) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".transactionCapacity = ").append(this.transactionCapacity).append('\n');
        }
        if (HttpPostBodyUtil.FILE.equals(this.channelType)) {
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".checkpointDir = ").append(this.checkpointDir).append('\n');
        }
        if (HttpPostBodyUtil.FILE.equals(this.channelType)) {
            sb.append(CSVWriter.DEFAULT_LINE_END);
            sb.append("# A list of directories where Flume will persist records that are waiting to be\n");
            sb.append("# processed by the sink. You can use multiple directories on different physical\n");
            sb.append("# disks to increase throughput.\n");
            sb.append(this.agent).append(".channels.").append(this.channelName).append(".dataDirs = ");
            boolean z = true;
            for (String str : this.dataDirs) {
                if (!z) {
                    sb.append(", ");
                }
                sb.append(str);
                z = false;
            }
            sb.append('\n');
        }
        sb.append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".type = org.apache.flume.sink.kite.DatasetSink").append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".channel = ").append(this.channelName).append('\n');
        if (this.newFlume) {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.dataset.uri = ").append(buildDatasetUri).append('\n');
        } else {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.repo.uri = ").append(legacyRepoUri).append('\n');
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.dataset.name = ").append(name).append('\n');
        }
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.batchSize = ").append(this.batchSize).append('\n');
        sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".kite.rollInterval = ").append(this.rollInterval).append('\n');
        if (this.proxyUser != null) {
            sb.append(this.agent).append(".sinks.").append(this.sinkName).append(".auth.proxyUser = ").append(this.proxyUser).append('\n');
        }
        output(sb.toString(), this.console, this.outputPath);
        return 0;
    }

    private URI getLegacyRepoUri(Dataset<GenericRecord> dataset) {
        return getLegacyRepoUri(dataset.getUri(), dataset.getNamespace());
    }

    @VisibleForTesting
    URI getLegacyRepoUri(URI uri, String str) {
        URI uri2 = DatasetRepositories.repositoryFor(uri).getUri();
        URI create = URI.create(uri2.getSchemeSpecificPart());
        if (Sets.newHashSet(new String[]{"hdfs", HttpPostBodyUtil.FILE, "hive"}).contains(create.getScheme())) {
            try {
                uri2 = URI.create("repo:" + new URI(create.getScheme(), create.getUserInfo(), create.getHost(), create.getPort(), create.getPath() + URIUtil.SLASH + str, create.getQuery(), create.getFragment()).toString());
            } catch (URISyntaxException e) {
                throw new DatasetException("Error generating legacy URI", e);
            }
        }
        return uri2;
    }

    @Override // org.kitesdk.cli.Command
    public List<String> getExamples() {
        return Lists.newArrayList(new String[]{"# Print Flume configuration to log to dataset \"users\":", "--checkpoint-dir /data/0/flume/checkpoint --data-dir /data/1/flume/data users", "# Print Flume configuration to log to dataset \"dataset:hdfs:/datasets/default/users\":", "--channel-type memory dataset:hdfs:/datasets/default/users", "# Save Flume configuration to the file \"flume.properties\":", "--channel-type memory -o flume.properties users"});
    }

    @Override // org.kitesdk.cli.commands.BaseDatasetCommand
    public /* bridge */ /* synthetic */ String buildRepoURI() {
        return super.buildRepoURI();
    }
}
