package org.kitesdk.data.mapreduce;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.avro.generic.GenericData;
import org.apache.avro.util.Utf8;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.kitesdk.data.Dataset;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.DatasetException;
import org.kitesdk.data.DatasetReader;
import org.kitesdk.data.DatasetWriter;
import org.kitesdk.data.Format;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/kitesdk/data/mapreduce/TestMapReduce.class */
public class TestMapReduce extends FileSystemTestBase {
    private Dataset<GenericData.Record> inputDataset;
    private Dataset<GenericData.Record> outputDataset;

    /* loaded from: input_file:org/kitesdk/data/mapreduce/TestMapReduce$GenericStatsReducer.class */
    private static class GenericStatsReducer extends Reducer<Text, IntWritable, GenericData.Record, Void> {
        private GenericStatsReducer() {
        }

        protected void reduce(Text text, Iterable<IntWritable> iterable, Reducer<Text, IntWritable, GenericData.Record, Void>.Context context) throws IOException, InterruptedException {
            GenericData.Record record = new GenericData.Record(FileSystemTestBase.STATS_SCHEMA);
            int i = 0;
            Iterator<IntWritable> it = iterable.iterator();
            while (it.hasNext()) {
                i += it.next().get();
            }
            record.put("name", new Utf8(text.toString()));
            record.put("count", new Integer(i));
            context.write(record, (Object) null);
        }

        protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
            reduce((Text) obj, (Iterable<IntWritable>) iterable, (Reducer<Text, IntWritable, GenericData.Record, Void>.Context) context);
        }
    }

    /* loaded from: input_file:org/kitesdk/data/mapreduce/TestMapReduce$LineCountMapper.class */
    private static class LineCountMapper extends Mapper<GenericData.Record, Void, Text, IntWritable> {
        private LineCountMapper() {
        }

        protected void map(GenericData.Record record, Void r8, Mapper<GenericData.Record, Void, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            context.write(new Text(record.get("text").toString()), new IntWritable(1));
        }

        protected /* bridge */ /* synthetic */ void map(Object obj, Object obj2, Mapper.Context context) throws IOException, InterruptedException {
            map((GenericData.Record) obj, (Void) obj2, (Mapper<GenericData.Record, Void, Text, IntWritable>.Context) context);
        }
    }

    public TestMapReduce(Format format) {
        super(format);
    }

    @Override // org.kitesdk.data.mapreduce.FileSystemTestBase
    @Before
    public void setUp() throws Exception {
        super.setUp();
        this.inputDataset = this.repo.create("ns", "in", new DatasetDescriptor.Builder().property("kite.allow.csv", "true").schema(STRING_SCHEMA).format(this.format).build(), GenericData.Record.class);
        this.outputDataset = this.repo.create("ns", "out", new DatasetDescriptor.Builder().property("kite.allow.csv", "true").schema(STATS_SCHEMA).format(this.format).build(), GenericData.Record.class);
    }

    @Test
    public void testJob() throws Exception {
        populateInputDataset();
        Assert.assertTrue(createJob().waitForCompletion(true));
        checkOutput(false);
    }

    @Test
    public void testJobEmptyView() throws Exception {
        Assert.assertTrue(createJob().waitForCompletion(true));
    }

    @Test(expected = DatasetException.class)
    public void testJobFailsWithExisting() throws Exception {
        populateInputDataset();
        populateOutputDataset();
        createJob().waitForCompletion(true);
    }

    @Test
    public void testJobOverwrite() throws Exception {
        populateInputDataset();
        populateOutputDataset();
        Job job = new Job();
        DatasetKeyInputFormat.configure(job).readFrom(this.inputDataset).withType(GenericData.Record.class);
        job.setMapperClass(LineCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GenericStatsReducer.class);
        DatasetKeyOutputFormat.configure(job).overwrite(this.outputDataset).withType(GenericData.Record.class);
        Assert.assertTrue(job.waitForCompletion(true));
        checkOutput(false);
    }

    @Test
    public void testJobAppend() throws Exception {
        populateInputDataset();
        populateOutputDataset();
        Job job = new Job();
        DatasetKeyInputFormat.configure(job).readFrom(this.inputDataset).withType(GenericData.Record.class);
        job.setMapperClass(LineCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GenericStatsReducer.class);
        DatasetKeyOutputFormat.configure(job).appendTo(this.outputDataset).withType(GenericData.Record.class);
        Assert.assertTrue(job.waitForCompletion(true));
        checkOutput(true);
    }

    private void populateInputDataset() {
        DatasetWriter newWriter = this.inputDataset.newWriter();
        newWriter.write(newStringRecord("apple"));
        newWriter.write(newStringRecord("banana"));
        newWriter.write(newStringRecord("banana"));
        newWriter.write(newStringRecord("carrot"));
        newWriter.write(newStringRecord("apple"));
        newWriter.write(newStringRecord("apple"));
        newWriter.close();
    }

    private void populateOutputDataset() {
        DatasetWriter newWriter = this.outputDataset.newWriter();
        newWriter.write(newStatsRecord(4, "date"));
        newWriter.close();
    }

    private void checkOutput(boolean z) {
        DatasetReader<GenericData.Record> newReader = this.outputDataset.newReader();
        HashMap hashMap = new HashMap();
        for (GenericData.Record record : newReader) {
            hashMap.put(record.get("name").toString(), (Integer) record.get("count"));
        }
        newReader.close();
        Assert.assertEquals(3L, ((Integer) hashMap.get("apple")).intValue());
        Assert.assertEquals(2L, ((Integer) hashMap.get("banana")).intValue());
        Assert.assertEquals(1L, ((Integer) hashMap.get("carrot")).intValue());
        if (z) {
            Assert.assertEquals(4L, ((Integer) hashMap.get("date")).intValue());
        } else {
            Assert.assertNull(hashMap.get("date"));
        }
    }

    private Job createJob() throws Exception {
        Job job = new Job();
        DatasetKeyInputFormat.configure(job).readFrom(this.inputDataset).withType(GenericData.Record.class);
        job.setMapperClass(LineCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setReducerClass(GenericStatsReducer.class);
        DatasetKeyOutputFormat.configure(job).writeTo(this.outputDataset).withType(GenericData.Record.class);
        return job;
    }
}
