package org.kitesdk.data.mapreduce;

import edu.umd.cs.findbugs.annotations.SuppressWarnings;
import java.io.IOException;
import java.net.URI;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.compat.Hadoop;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.TypeNotFoundException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.DefaultConfiguration;
import org.kitesdk.data.spi.InputFormatAccessor;
import org.kitesdk.data.spi.PartitionKey;
import org.kitesdk.data.spi.PartitionedDataset;
import org.kitesdk.data.spi.filesystem.FileSystemDataset;
import org.kitesdk.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/kitesdk/data/mapreduce/DatasetKeyInputFormat.class */
/**
 * A MapReduce {@link InputFormat} that reads entities from a Kite dataset or view.
 *
 * <p>The entities are produced as keys; values are always {@code Void}. The view to read is
 * resolved from the job {@link Configuration} in {@link #setConf(Configuration)}, and split
 * calculation and record reading are forwarded to the {@code InputFormat} supplied by that view.
 *
 * <p>Use {@link #configure(Job)} or {@link #configure(Configuration)} to obtain a
 * {@link ConfigBuilder} that writes the required configuration keys.
 *
 * @param <E> the type of entity produced as the input key
 */
public class DatasetKeyInputFormat<E> extends InputFormat<E, Void> implements Configurable {
    private static final Logger LOG = LoggerFactory.getLogger(DatasetKeyInputFormat.class);

    /** Configuration key holding the URI of the dataset or view to read. */
    public static final String KITE_INPUT_URI = "kite.inputUri";

    /** Configuration key holding the directory that is resolved to a partition in {@code setConf}. */
    public static final String KITE_PARTITION_DIR = "kite.inputPartitionDir";

    /** Configuration key holding the entity class to read. */
    public static final String KITE_TYPE = "kite.inputEntityType";

    /** Configuration key holding the string form of the reader schema used for projection. */
    public static final String KITE_READER_SCHEMA = "kite.readerSchema";

    private Configuration conf;

    // Assigned in setConf; every InputFormat call is forwarded to it.
    private InputFormat<E, Void> delegate;

    /**
     * Fluent helper that records which view, entity type and reader schema a job should read
     * by writing the corresponding keys into the wrapped {@link Configuration}.
     */
    public static class ConfigBuilder {
        private final Configuration conf;

        private ConfigBuilder(Configuration configuration) {
            this.conf = configuration;
        }

        /**
         * Loads the dataset or view identified by {@code uri} and configures it as the input.
         *
         * @param uri a dataset or view URI
         * @return this builder, for chaining
         */
        public ConfigBuilder readFrom(URI uri) {
            return readFrom(Datasets.load(uri));
        }

        /**
         * Configures the given view as the job input.
         *
         * <p>Copies every property of the dataset descriptor into the configuration, records the
         * descriptor location as the partition directory when the view is a
         * {@link FileSystemDataset}, and stores either the view's entity type or, for generic
         * types, its schema when that schema differs from the dataset schema.
         *
         * @param view the view to read from
         * @return this builder, for chaining
         */
        public ConfigBuilder readFrom(View<?> view) {
            DatasetDescriptor descriptor = view.getDataset().getDescriptor();
            if (view instanceof FileSystemDataset) {
                this.conf.set(DatasetKeyInputFormat.KITE_PARTITION_DIR, String.valueOf(descriptor.getLocation()));
            }
            for (String property : descriptor.listProperties()) {
                this.conf.set(property, descriptor.getProperty(property));
            }
            if (!DataModelUtil.isGeneric(view.getType())) {
                withType(view.getType());
            } else if (!view.getDataset().getDescriptor().getSchema().equals(view.getSchema())) {
                // Only a view whose schema differs from the dataset's needs an explicit reader schema.
                withSchema(view.getSchema());
            }
            this.conf.set(DatasetKeyInputFormat.KITE_INPUT_URI, view.getUri().toString());
            return this;
        }

        /**
         * Parses {@code str} as a URI and configures the dataset or view it names as the input.
         *
         * @param str a dataset or view URI in string form
         * @return this builder, for chaining
         * @throws IllegalArgumentException if {@code str} is not a valid URI
         */
        public ConfigBuilder readFrom(String str) {
            return readFrom(URI.create(str));
        }

        /**
         * Sets the entity class that records are read as.
         *
         * @param cls the entity class
         * @param <E> the entity type
         * @return this builder, for chaining
         * @throws IllegalArgumentException if {@code cls} is not a generic type and a reader
         *     schema has already been configured
         */
        public <E> ConfigBuilder withType(Class<E> cls) {
            String readerSchema = this.conf.get(DatasetKeyInputFormat.KITE_READER_SCHEMA);
            // Guava templates substitute %s only; "{}" would be emitted literally.
            Preconditions.checkArgument(DataModelUtil.isGeneric(cls) || readerSchema == null,
                "Can't configure a type when a reader schema is already set: %s", readerSchema);
            this.conf.setClass(DatasetKeyInputFormat.KITE_TYPE, cls, cls);
            return this;
        }

        /**
         * Sets the reader schema that records are read with.
         *
         * @param schema the reader schema
         * @return this builder, for chaining
         * @throws IllegalArgumentException if a non-generic entity type has already been configured
         */
        public ConfigBuilder withSchema(Schema schema) {
            Class<?> configuredType = this.conf.getClass(DatasetKeyInputFormat.KITE_TYPE, null);
            // Guava templates substitute %s only; "{}" would be emitted literally.
            Preconditions.checkArgument(configuredType == null || DataModelUtil.isGeneric(configuredType),
                "Can't configure a reader schema when a type is already set: %s", configuredType);
            this.conf.set(DatasetKeyInputFormat.KITE_READER_SCHEMA, schema.toString());
            return this;
        }
    }

    /**
     * Registers this class as the input format of {@code job} and returns a builder that writes
     * into the job's configuration.
     *
     * @param job the job to configure
     * @return a builder bound to the job's configuration
     */
    public static ConfigBuilder configure(Job job) {
        job.setInputFormatClass(DatasetKeyInputFormat.class);
        return new ConfigBuilder((Configuration) Hadoop.JobContext.getConfiguration.invoke(job, new Object[0]));
    }

    /**
     * Registers this class as the input format in {@code configuration} and returns a builder
     * that writes into it.
     *
     * @param configuration the configuration to modify
     * @return a builder bound to {@code configuration}
     */
    public static ConfigBuilder configure(Configuration configuration) {
        setInputFormatClass(configuration);
        return new ConfigBuilder(configuration);
    }

    /**
     * Writes the settings that select this input format into {@code configuration}.
     *
     * <p>On Hadoop 1 the property is set directly. Otherwise a throwaway {@link Job} built on an
     * empty configuration is asked to set the input format, and every entry it produced is copied
     * across.
     */
    private static void setInputFormatClass(Configuration configuration) {
        if (Hadoop.isHadoop1()) {
            configuration.set("mapreduce.inputformat.class", DatasetKeyInputFormat.class.getName());
            return;
        }
        // new Configuration(false) loads no defaults, so only entries written by the Job are copied.
        Job scratchJob = (Job) Hadoop.Job.newInstance.invoke(new Configuration(false));
        scratchJob.setInputFormatClass(DatasetKeyInputFormat.class);
        for (Map.Entry<String, String> entry : scratchJob.getConfiguration()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Returns the configuration most recently passed to {@link #setConf(Configuration)}.
     *
     * @return the stored configuration, or {@code null} if none has been set
     */
    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Stores the configuration, loads the configured view and selects the delegate input format.
     *
     * <p>When the dataset is partitioned and a partition directory is configured, the delegate
     * is taken from that partition; otherwise it is taken from the loaded view itself.
     *
     * @param configuration the job configuration
     * @throws TypeNotFoundException if the configured entity class cannot be found
     * @throws UnsupportedOperationException if the view or partition cannot supply an input format
     * @throws DatasetException if the configured partition directory yields no partition key
     */
    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
        View<E> view = load(configuration);
        String partitionDir = this.conf.get(KITE_PARTITION_DIR);
        if (view.getDataset().getDescriptor().isPartitioned() && partitionDir != null) {
            this.delegate = getDelegateInputFormatForPartition(view.getDataset(), partitionDir, this.conf);
        } else {
            this.delegate = getDelegateInputFormat(view, this.conf);
        }
    }

    /**
     * Obtains the input format from a view that implements {@link InputFormatAccessor}.
     *
     * @throws UnsupportedOperationException if {@code view} is not an {@code InputFormatAccessor}
     */
    // Fully qualified: the simple name SuppressWarnings is bound to the FindBugs annotation here.
    @java.lang.SuppressWarnings("unchecked")
    private InputFormat<E, Void> getDelegateInputFormat(View<E> view, Configuration configuration) {
        if (!(view instanceof InputFormatAccessor)) {
            throw new UnsupportedOperationException("Implementation does not provide InputFormat support. View: " + view);
        }
        return ((InputFormatAccessor<E>) view).getInputFormat(configuration);
    }

    /**
     * Resolves the partition stored under directory {@code str} and obtains its input format.
     *
     * @throws UnsupportedOperationException if {@code dataset} is not a {@link FileSystemDataset}
     * @throws DatasetException if no partition key can be derived from {@code str}
     */
    private InputFormat<E, Void> getDelegateInputFormatForPartition(Dataset<E> dataset, String str, Configuration configuration) {
        if (!(dataset instanceof FileSystemDataset)) {
            throw new UnsupportedOperationException("Partitions only supported for FileSystemDataset. Dataset: " + dataset);
        }
        FileSystemDataset<E> fsDataset = (FileSystemDataset<E>) dataset;
        LOG.debug("Getting delegate input format for dataset {} with partition directory {}", dataset, str);
        PartitionKey key = fsDataset.keyFromDirectory(new Path(str));
        LOG.debug("Partition key: {}", key);
        if (key == null) {
            throw new DatasetException("Cannot find partition " + str);
        }
        // false is passed as the second getPartition argument, as in the original call.
        PartitionedDataset<E> partition = fsDataset.getPartition(key, false);
        LOG.debug("Partition: {}", partition);
        return getDelegateInputFormat(partition, configuration);
    }

    /**
     * Loads the view described by {@code configuration}.
     *
     * <p>The entity class defaults to {@link GenericData.Record} when none is configured. If a
     * reader schema is configured, the view is loaded and then projected to that schema and
     * entity class; otherwise it is loaded directly with the entity class.
     *
     * @throws TypeNotFoundException if a {@code RuntimeException} caused by a
     *     {@code ClassNotFoundException} is raised while loading
     */
    // Fully qualified: the simple name SuppressWarnings is bound to the FindBugs annotation here.
    @java.lang.SuppressWarnings("unchecked")
    private static <E> View<E> load(Configuration configuration) {
        try {
            Class<E> entityType = (Class<E>) configuration.getClass(KITE_TYPE, GenericData.Record.class);
            String schemaText = configuration.get(KITE_READER_SCHEMA);
            Schema readerSchema = null;
            if (schemaText != null) {
                readerSchema = new Schema.Parser().parse(schemaText);
            }
            String inputUri = configuration.get(KITE_INPUT_URI);
            if (readerSchema != null) {
                return Datasets.load(inputUri).asSchema(readerSchema).asType(entityType);
            }
            return Datasets.load(inputUri, entityType);
        } catch (RuntimeException e) {
            if (e.getCause() instanceof ClassNotFoundException) {
                throw new TypeNotFoundException(String.format("The Java class %s for the entity type could not be found", configuration.get(KITE_TYPE)), e.getCause());
            }
            throw e;
        }
    }

    /**
     * Returns the input splits computed by the delegate input format.
     *
     * @param jobContext the job context
     * @return the delegate's splits
     * @throws IOException if the delegate fails to compute splits
     * @throws InterruptedException if the delegate is interrupted
     */
    @Override
    @SuppressWarnings(value = {"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}, justification = "Delegate set by setConf")
    public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
        return this.delegate.getSplits(jobContext);
    }

    /**
     * Initializes {@link DefaultConfiguration} with the task's configuration and then returns
     * the record reader created by the delegate input format.
     *
     * @param inputSplit the split to read
     * @param taskAttemptContext the task attempt context
     * @return the delegate's record reader
     * @throws IOException if the delegate fails to create the reader
     * @throws InterruptedException if the delegate is interrupted
     */
    @Override
    @SuppressWarnings(value = {"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}, justification = "Delegate set by setConf")
    public RecordReader<E, Void> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        DefaultConfiguration.init((Configuration) Hadoop.TaskAttemptContext.getConfiguration.invoke(taskAttemptContext, new Object[0]));
        return this.delegate.createRecordReader(inputSplit, taskAttemptContext);
    }
}
