package org.kitesdk.data.mapreduce;

import com.google.common.base.Preconditions;
import java.io.IOException;
import java.net.URI;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectData;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.kitesdk.compat.Hadoop;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Datasets;
import org.kitesdk.data.TypeNotFoundException;
import org.kitesdk.data.View;
import org.kitesdk.data.spi.DataModelUtil;
import org.kitesdk.data.spi.DatasetRepositories;
import org.kitesdk.data.spi.DatasetRepository;
import org.kitesdk.data.spi.Mergeable;
import org.kitesdk.data.spi.PartitionKey;
import org.kitesdk.data.spi.TemporaryDatasetRepository;
import org.kitesdk.data.spi.TemporaryDatasetRepositoryAccessor;
import org.kitesdk.data.spi.filesystem.FileSystemDataset;

/* loaded from: input_file:org/kitesdk/data/mapreduce/DatasetKeyOutputFormat.class */
public class DatasetKeyOutputFormat<E> extends OutputFormat<E, Void> {
    /** Configuration key holding the URI of the dataset or view the job writes to. */
    public static final String KITE_OUTPUT_URI = "kite.outputUri";
    /**
     * Configuration key holding the descriptor location of a {@code FileSystemDataset} target;
     * set by {@code ConfigBuilder.writeTo(View)} and read by {@code getRecordWriter}.
     */
    public static final String KITE_PARTITION_DIR = "kite.outputPartitionDir";
    /**
     * Configuration key holding the entity class; {@code getType} falls back to
     * {@code GenericData.Record} when it is not set.
     */
    public static final String KITE_TYPE = "kite.outputEntityType";
    /** Configuration key holding the selected {@code WriteMode}. */
    private static final String KITE_WRITE_MODE = "kite.outputMode";
    /** Namespace used for the per-job and per-task-attempt working datasets. */
    private static final String TEMP_NAMESPACE = "mr";

    /**
     * Fluent helper that records the output settings of this format (target URI,
     * entity type, partition directory and write mode) in a Hadoop {@link Configuration}.
     *
     * <p>Instances are obtained through {@code DatasetKeyOutputFormat.configure(...)}.
     * A write mode may be chosen at most once per configuration; a second
     * {@code overwrite}/{@code appendTo} call fails with {@link IllegalStateException}.
     */
    public static class ConfigBuilder {
        private final Configuration conf;

        private ConfigBuilder(Job job) {
            this((Configuration) Hadoop.JobContext.getConfiguration.invoke(job, new Object[0]));
        }

        private ConfigBuilder(Configuration configuration) {
            configuration.setBoolean("mapred.reducer.new-api", true);
            this.conf = configuration;
        }

        // ---- target given as a URI ------------------------------------------

        /** Stores {@code uri} as the output target. */
        public ConfigBuilder writeTo(URI uri) {
            String target = uri.toString();
            this.conf.set(DatasetKeyOutputFormat.KITE_OUTPUT_URI, target);
            return this;
        }

        /** Selects overwrite mode, then stores {@code uri} as the output target. */
        public ConfigBuilder overwrite(URI uri) {
            setOverwrite();
            return writeTo(uri);
        }

        /** Selects append mode, then stores {@code uri} as the output target. */
        public ConfigBuilder appendTo(URI uri) {
            setAppend();
            return writeTo(uri);
        }

        // ---- target given as a view -----------------------------------------

        /**
         * Stores the view's URI and entity type. For a {@code FileSystemDataset} the
         * descriptor location is additionally stored as the partition directory.
         */
        public ConfigBuilder writeTo(View<?> view) {
            if (view instanceof FileSystemDataset) {
                FileSystemDataset<?> fsDataset = (FileSystemDataset<?>) view;
                this.conf.set(
                        DatasetKeyOutputFormat.KITE_PARTITION_DIR,
                        String.valueOf(fsDataset.getDescriptor().getLocation()));
            }
            withType(view.getType());
            return writeTo(view.getUri());
        }

        /** Selects overwrite mode, then configures {@code view} as the output target. */
        public ConfigBuilder overwrite(View<?> view) {
            setOverwrite();
            return writeTo(view);
        }

        /** Selects append mode, then configures {@code view} as the output target. */
        public ConfigBuilder appendTo(View<?> view) {
            setAppend();
            return writeTo(view);
        }

        // ---- target given as a URI string -----------------------------------

        /** Parses {@code str} with {@link URI#create(String)} and stores it as the output target. */
        public ConfigBuilder writeTo(String str) {
            URI parsed = URI.create(str);
            return writeTo(parsed);
        }

        /** Selects overwrite mode, then stores the URI parsed from {@code str}. */
        public ConfigBuilder overwrite(String str) {
            setOverwrite();
            return writeTo(str);
        }

        /** Selects append mode, then stores the URI parsed from {@code str}. */
        public ConfigBuilder appendTo(String str) {
            setAppend();
            return writeTo(str);
        }

        /** Stores {@code cls} as the entity type of the records being written. */
        public <E> ConfigBuilder withType(Class<E> cls) {
            this.conf.setClass(DatasetKeyOutputFormat.KITE_TYPE, cls, cls);
            return this;
        }

        private void setOverwrite() {
            setWriteMode(WriteMode.OVERWRITE);
        }

        private void setAppend() {
            setWriteMode(WriteMode.APPEND);
        }

        /** Records {@code mode}, refusing to replace a mode that is already present. */
        private void setWriteMode(WriteMode mode) {
            String existing = this.conf.get(DatasetKeyOutputFormat.KITE_WRITE_MODE);
            Preconditions.checkState(existing == null, "Cannot replace existing write mode: " + existing);
            this.conf.setEnum(DatasetKeyOutputFormat.KITE_WRITE_MODE, mode);
        }
    }

    /**
     * {@link RecordWriter} that forwards each key to a Kite {@link DatasetWriter}
     * opened on the given view; the {@code Void} value is ignored.
     *
     * <p>No hand-written {@code write(Object, Object)} bridge is declared: the
     * compiler generates it for {@code write(E, Void)}. The decompiled bridge that
     * used to be here cast the entity to {@code DatasetRecordWriter} and resolved
     * back to itself.
     */
    static class DatasetRecordWriter<E> extends RecordWriter<E, Void> {
        private final DatasetWriter<E> datasetWriter;
        private final GenericData dataModel;
        private final boolean copyEntities;
        private final Schema schema;

        /**
         * Opens a writer on {@code view} and captures the dataset schema and the
         * Avro data model for the view's entity type.
         *
         * @param view the view that receives the records
         */
        public DatasetRecordWriter(View<E> view) {
            this.datasetWriter = view.newWriter();
            this.schema = view.getDataset().getDescriptor().getSchema();
            this.dataModel = DataModelUtil.getDataModelForType(view.getType());
            // Entities are deep-copied for every data model except plain ReflectData.
            // NOTE(review): presumably because callers may reuse entity instances
            // between calls - confirm against the writer implementations.
            this.copyEntities = this.dataModel.getClass() != ReflectData.class;
        }

        /**
         * Writes one entity, deep-copying it first when the data model requires it.
         *
         * @param e  the entity to write
         * @param r5 unused
         */
        public void write(E e, Void r5) {
            E entity = this.copyEntities ? copy(e) : e;
            this.datasetWriter.write(entity);
        }

        /** Returns a deep copy of {@code e} made with the data model and dataset schema. */
        private E copy(E e) {
            return this.dataModel.deepCopy(this.schema, e);
        }

        /** Closes the underlying dataset writer. */
        public void close(TaskAttemptContext taskAttemptContext) {
            this.datasetWriter.close();
        }
    }

    /**
     * Committer used when output goes through working datasets: every task attempt
     * writes to its own dataset, {@link #commitTask} merges that into the per-job
     * dataset, and {@link #commitJob} merges the per-job dataset into the target.
     */
    static class MergeOutputCommitter<E> extends OutputCommitter {
        MergeOutputCommitter() {
        }

        /** Creates the per-job working dataset. */
        public void setupJob(JobContext jobContext) {
            DatasetKeyOutputFormat.createJobDataset(jobContext);
        }

        /**
         * Merges the per-job dataset into the target dataset and removes the
         * working data afterwards.
         *
         * @throws ClassCastException if the target dataset is not {@link Mergeable}
         */
        @SuppressWarnings("unchecked")
        public void commitJob(JobContext jobContext) throws IOException {
            // Keep the general repository type: only some repositories are temporary.
            DatasetRepository repo = DatasetKeyOutputFormat.getDatasetRepository(jobContext);
            String jobDatasetName = DatasetKeyOutputFormat.getJobDatasetName(jobContext);

            View<E> targetView = DatasetKeyOutputFormat.load(jobContext);
            Mergeable<View<E>> target = (Mergeable<View<E>>) targetView.getDataset();
            View<E> jobDataset = repo.load(DatasetKeyOutputFormat.TEMP_NAMESPACE, jobDatasetName);
            target.merge(jobDataset);

            if (repo instanceof TemporaryDatasetRepository) {
                // A temporary repository is removed as a whole.
                ((TemporaryDatasetRepository) repo).delete();
            } else {
                repo.delete(DatasetKeyOutputFormat.TEMP_NAMESPACE, jobDatasetName);
            }
        }

        /** Deletes the per-job working dataset. */
        public void abortJob(JobContext jobContext, JobStatus.State state) {
            DatasetKeyOutputFormat.deleteJobDataset(jobContext);
        }

        /** Nothing to prepare; the task attempt dataset is created on first use. */
        public void setupTask(TaskAttemptContext taskAttemptContext) {
        }

        /** Always {@code true}: task output has to be merged into the job dataset. */
        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
            return true;
        }

        /**
         * Merges the task attempt dataset, if one exists, into the per-job dataset.
         * Outside a temporary repository the task attempt dataset is deleted
         * afterwards; inside one it is left for {@link #commitJob}, which deletes
         * the whole repository.
         *
         * @throws ClassCastException if the per-job dataset is not {@link Mergeable}
         */
        @SuppressWarnings("unchecked")
        public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
            DatasetRepository repo = DatasetKeyOutputFormat.getDatasetRepository(taskAttemptContext);
            boolean inTempRepo = repo instanceof TemporaryDatasetRepository;

            View<E> loadedJobDataset = repo.load(
                    DatasetKeyOutputFormat.TEMP_NAMESPACE,
                    DatasetKeyOutputFormat.getJobDatasetName(taskAttemptContext));
            Mergeable<View<E>> jobDataset = (Mergeable<View<E>>) loadedJobDataset;

            String taskAttemptDatasetName = DatasetKeyOutputFormat.getTaskAttemptDatasetName(taskAttemptContext);
            if (!repo.exists(DatasetKeyOutputFormat.TEMP_NAMESPACE, taskAttemptDatasetName)) {
                return; // this attempt produced no dataset
            }

            View<E> taskAttemptDataset = repo.load(DatasetKeyOutputFormat.TEMP_NAMESPACE, taskAttemptDatasetName);
            jobDataset.merge(taskAttemptDataset);
            if (!inTempRepo) {
                repo.delete(DatasetKeyOutputFormat.TEMP_NAMESPACE, taskAttemptDatasetName);
            }
        }

        /** Deletes the task attempt dataset if it exists. */
        public void abortTask(TaskAttemptContext taskAttemptContext) {
            DatasetKeyOutputFormat.deleteTaskAttemptDataset(taskAttemptContext);
        }
    }

    /**
     * Committer that does nothing at all. {@code getOutputCommitter} returns it when
     * per-task-attempt working datasets are not in use.
     */
    static class NullOutputCommitter extends OutputCommitter {

        /** No job-level setup is performed. */
        public void setupJob(JobContext jobContext) {
            // intentionally empty
        }

        /** No task-level setup is performed. */
        public void setupTask(TaskAttemptContext taskAttemptContext) {
            // intentionally empty
        }

        /** Always {@code false}: there is never anything to commit for a task. */
        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
            return false;
        }

        /** No-op. */
        public void commitTask(TaskAttemptContext taskAttemptContext) {
            // intentionally empty
        }

        /** No-op. */
        public void abortTask(TaskAttemptContext taskAttemptContext) {
            // intentionally empty
        }
    }

    /**
     * How {@code checkOutputSpecs} treats a target view that already contains data.
     * The selected mode is stored in the configuration under {@code kite.outputMode}.
     * NOTE: the decompiler reports that this enum was originally declared private.
     */
    public enum WriteMode {
        /** The target view must be empty; otherwise a {@code DatasetException} is thrown. */
        DEFAULT,
        /** No emptiness check is made; existing data is left in place. */
        APPEND,
        /** A non-empty target view is cleared with {@code deleteAll()} first. */
        OVERWRITE
    }

    /**
     * Registers this class as the job's output format and returns a builder bound
     * to the job's configuration.
     *
     * @param job the job to configure
     * @return a builder for the remaining output settings
     */
    public static ConfigBuilder configure(Job job) {
        job.setOutputFormatClass(DatasetKeyOutputFormat.class);
        ConfigBuilder builder = new ConfigBuilder(job);
        return builder;
    }

    /**
     * Returns a builder that writes the output settings into {@code configuration}.
     * Unlike {@code configure(Job)}, this does not register the output format class.
     *
     * @param configuration the configuration to populate
     * @return a builder for the output settings
     */
    public static ConfigBuilder configure(Configuration configuration) {
        ConfigBuilder builder = new ConfigBuilder(configuration);
        return builder;
    }

    /**
     * Creates the record writer for a task attempt.
     *
     * <p>Records go to a per-task-attempt working dataset when
     * {@code usePerTaskAttemptDatasets} allows it, otherwise straight to the target
     * view. If the working dataset is partitioned and a partition directory was
     * configured, the directory is translated into a partition key and, when a key
     * is found, records are written to that partition of the target dataset instead.
     *
     * @param taskAttemptContext the context of the running task attempt
     * @return a writer bound to the selected view
     * @throws UnsupportedOperationException if a partition directory is configured
     *         for a partitioned dataset whose target is not a {@code FileSystemDataset}
     */
    public RecordWriter<E, Void> getRecordWriter(TaskAttemptContext taskAttemptContext) {
        Configuration conf = (Configuration) Hadoop.TaskAttemptContext.getConfiguration.invoke(taskAttemptContext, new Object[0]);

        // load() returns a View, which need not be a whole Dataset.
        View<E> target = load(taskAttemptContext);
        View<E> working;
        if (usePerTaskAttemptDatasets(target)) {
            working = loadOrCreateTaskAttemptDataset(taskAttemptContext);
        } else {
            working = target;
        }

        String partitionDir = conf.get(KITE_PARTITION_DIR);
        if (working.getDataset().getDescriptor().isPartitioned() && partitionDir != null) {
            if (!(target instanceof FileSystemDataset)) {
                throw new UnsupportedOperationException("Partitions only supported for FileSystemDataset. Dataset: " + target);
            }
            FileSystemDataset<E> fsDataset = (FileSystemDataset<E>) target;
            PartitionKey key = fsDataset.keyFromDirectory(new Path(partitionDir));
            if (key != null) {
                // The second argument is passed as true, as in the original call.
                working = fsDataset.getPartition(key, true);
            }
        }
        return new DatasetRecordWriter<E>(working);
    }

    /**
     * Validates the target view against the configured write mode before the job runs.
     *
     * <ul>
     *   <li>{@code APPEND}: nothing is checked.</li>
     *   <li>{@code OVERWRITE}: a non-empty view is cleared with {@code deleteAll()}.</li>
     *   <li>{@code DEFAULT} (also used when no mode is configured): the view must be empty.</li>
     * </ul>
     *
     * @throws DatasetException in default mode when the view already contains data
     */
    public void checkOutputSpecs(JobContext jobContext) {
        View<E> target = load(jobContext);
        Configuration conf = (Configuration) Hadoop.JobContext.getConfiguration.invoke(jobContext, new Object[0]);
        WriteMode mode = conf.getEnum(KITE_WRITE_MODE, WriteMode.DEFAULT);

        switch (mode) {
            case APPEND:
                break;
            case OVERWRITE:
                if (!target.isEmpty()) {
                    target.deleteAll();
                }
                break;
            case DEFAULT:
            default:
                if (!target.isEmpty()) {
                    throw new DatasetException("View is not empty: " + target);
                }
                break;
        }
    }

    /**
     * Chooses the committer for the job: a merging committer when per-task-attempt
     * datasets are in use for the target view, otherwise a no-op committer.
     */
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) {
        if (usePerTaskAttemptDatasets(load(taskAttemptContext))) {
            return new MergeOutputCommitter<E>();
        }
        return new NullOutputCommitter();
    }

    /**
     * Tells whether output should go through per-task-attempt working datasets:
     * only when not running on Hadoop 1 and the view's dataset is {@code Mergeable}.
     */
    private static <E> boolean usePerTaskAttemptDatasets(View<E> view) {
        if (Hadoop.isHadoop1()) {
            return false;
        }
        return view.getDataset() instanceof Mergeable;
    }

    /**
     * Resolves the repository that holds the job's working datasets.
     *
     * <p>The repository is looked up from the configured output URI. When it
     * implements {@code TemporaryDatasetRepositoryAccessor}, the temporary repository
     * for the target dataset's namespace and this job's dataset name is returned
     * instead.
     *
     * <p>NOTE: the decompiler reports that this method was originally private; it is
     * called from the nested committer.
     *
     * @param jobContext the job (or task attempt) context carrying the configuration
     * @return the repository to use for working datasets
     */
    public static DatasetRepository getDatasetRepository(JobContext jobContext) {
        Configuration conf = (Configuration) Hadoop.JobContext.getConfiguration.invoke(jobContext, new Object[0]);
        // Declared with the general type: the lookup result is not necessarily temporary.
        DatasetRepository repo = DatasetRepositories.repositoryFor(conf.get(KITE_OUTPUT_URI));
        if (repo instanceof TemporaryDatasetRepositoryAccessor) {
            String namespace = load(jobContext).getDataset().getNamespace();
            repo = ((TemporaryDatasetRepositoryAccessor) repo)
                    .getTemporaryRepository(namespace, getJobDatasetName(jobContext));
        }
        return repo;
    }

    /**
     * Returns the name of the per-job working dataset, which is the string form of
     * the job ID. NOTE: the decompiler reports this method was originally private.
     */
    public static String getJobDatasetName(JobContext jobContext) {
        Object jobId = jobContext.getJobID();
        return jobId.toString();
    }

    /**
     * Returns the name of the per-task-attempt working dataset, which is the string
     * form of the task attempt ID. NOTE: the decompiler reports this method was
     * originally private.
     */
    public static String getTaskAttemptDatasetName(TaskAttemptContext taskAttemptContext) {
        Object attemptId = taskAttemptContext.getTaskAttemptID();
        return attemptId.toString();
    }

    /**
     * Loads the target view named by the configured output URI, using the
     * configured entity type. NOTE: the decompiler reports this method was
     * originally private.
     *
     * @param jobContext the job (or task attempt) context carrying the configuration
     * @return the view the job writes to
     */
    public static <E> View<E> load(JobContext jobContext) {
        Configuration conf = (Configuration) Hadoop.JobContext.getConfiguration.invoke(jobContext, new Object[0]);
        String outputUri = conf.get(KITE_OUTPUT_URI);
        Class<E> entityType = getType(jobContext);
        return Datasets.load(outputUri, entityType);
    }

    /**
     * Reads the configured entity class, defaulting to {@code GenericData.Record}
     * when none is set.
     *
     * @param jobContext the job (or task attempt) context carrying the configuration
     * @return the entity class
     * @throws TypeNotFoundException if the lookup fails with a
     *         {@link ClassNotFoundException} as its cause
     */
    private static <E> Class<E> getType(JobContext jobContext) {
        Configuration configuration = (Configuration) Hadoop.JobContext.getConfiguration.invoke(jobContext, new Object[0]);
        try {
            // The configuration hands back Class<?>; callers are trusted to ask for
            // the type they configured, so the cast cannot be checked here.
            @SuppressWarnings("unchecked")
            Class<E> type = (Class<E>) configuration.getClass(KITE_TYPE, GenericData.Record.class);
            return type;
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            if (cause instanceof ClassNotFoundException) {
                throw new TypeNotFoundException(String.format("The Java class %s for the entity type could not be found", configuration.get(KITE_TYPE)), cause);
            }
            throw e;
        }
    }

    /**
     * Creates the per-job working dataset in the working repository, using a
     * descriptor copied from the target dataset and the configured entity type.
     * NOTE: the decompiler reports this method was originally private.
     *
     * @return the newly created per-job dataset
     */
    public static <E> Dataset<E> createJobDataset(JobContext jobContext) {
        Dataset<?> targetDataset = load(jobContext).getDataset();
        return getDatasetRepository(jobContext).create(
                TEMP_NAMESPACE,
                getJobDatasetName(jobContext),
                copy(targetDataset.getDescriptor()),
                DatasetKeyOutputFormat.<E>getType(jobContext));
    }

    /** Loads the per-job working dataset from the working repository. */
    private static <E> Dataset<E> loadJobDataset(JobContext jobContext) {
        Dataset<E> jobDataset =
                getDatasetRepository(jobContext).load(TEMP_NAMESPACE, getJobDatasetName(jobContext));
        return jobDataset;
    }

    /**
     * Deletes the per-job working dataset from the working repository.
     * NOTE: the decompiler reports this method was originally private.
     */
    public static void deleteJobDataset(JobContext jobContext) {
        getDatasetRepository(jobContext)
                .delete(TEMP_NAMESPACE, getJobDatasetName(jobContext));
    }

    /**
     * Returns the working dataset of this task attempt, creating it with a
     * descriptor copied from the per-job dataset when it does not exist yet.
     */
    private static <E> Dataset<E> loadOrCreateTaskAttemptDataset(TaskAttemptContext taskAttemptContext) {
        String attemptDatasetName = getTaskAttemptDatasetName(taskAttemptContext);
        DatasetRepository repo = getDatasetRepository(taskAttemptContext);
        if (repo.exists(TEMP_NAMESPACE, attemptDatasetName)) {
            return repo.load(TEMP_NAMESPACE, attemptDatasetName);
        }
        return repo.create(
                TEMP_NAMESPACE,
                attemptDatasetName,
                copy(loadJobDataset(taskAttemptContext).getDescriptor()));
    }

    /**
     * Deletes the working dataset of this task attempt if it exists.
     * NOTE: the decompiler reports this method was originally private.
     */
    public static void deleteTaskAttemptDataset(TaskAttemptContext taskAttemptContext) {
        DatasetRepository repo = getDatasetRepository(taskAttemptContext);
        String attemptDatasetName = getTaskAttemptDatasetName(taskAttemptContext);
        if (!repo.exists(TEMP_NAMESPACE, attemptDatasetName)) {
            return;
        }
        repo.delete(TEMP_NAMESPACE, attemptDatasetName);
    }

    /**
     * Builds a descriptor for a working dataset from {@code datasetDescriptor}: the
     * {@code kite.parquet.non-durable-writes} property is set to {@code "true"} and
     * the location is cleared.
     */
    private static DatasetDescriptor copy(DatasetDescriptor datasetDescriptor) {
        // Typed null keeps the call bound to the URI overload of location().
        URI noLocation = null;
        return new DatasetDescriptor.Builder(datasetDescriptor)
                .property("kite.parquet.non-durable-writes", "true")
                .location(noLocation)
                .build();
    }
}
