package org.kitesdk.data.spi.filesystem;

import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.kitesdk.data.DatasetDescriptor;
import org.kitesdk.data.MiniDFSTest;

/* loaded from: input_file:org/kitesdk/data/spi/filesystem/TestCSVAppender.class */
/**
 * Tests when records written through a {@link CSVAppender} become visible to a
 * {@link CSVFileReader} reading the same file.
 *
 * <p>The expectations encoded by the assertions are: nothing is readable before the first
 * flush/sync, exactly the flushed records are readable afterwards, records appended after the
 * sync stay invisible until the appender is closed, and everything is readable after close.
 */
public class TestCSVAppender extends MiniDFSTest {
    /** Two-field record schema ({@code id}: int, {@code email}: string) used by every test. */
    private static final Schema SCHEMA = SchemaBuilder.record("User").fields()
            .requiredInt("id")
            .requiredString("email")
            .endRecord();

    /** Number of records written in each of the two append phases. */
    private static final int BATCH_SIZE = 10;

    /** Runs the sync-visibility scenario against the file system returned by {@code getDFS()}. */
    @Test
    public void testCSVSyncDFS() throws Exception {
        FileSystem fs = getDFS();
        String authority = fs.getUri().getAuthority();
        Path path = new Path("hdfs://" + authority + "/tmp/test.csv");
        assertRecordsVisibleOnlyAfterSync(fs, path);
    }

    /** Runs the sync-visibility scenario against the local file system (currently ignored). */
    @Test
    @Ignore("LocalFileSystem is broken!?")
    public void testCSVSyncLocalFS() throws Exception {
        FileSystem fs = FileSystem.getLocal(getConfiguration());
        Path path = new Path("file:/tmp/test.csv");
        assertRecordsVisibleOnlyAfterSync(fs, path);
    }

    /**
     * Counts the records that a {@link CSVFileReader} can currently read from {@code path}.
     *
     * <p>Each record is also printed to {@code System.err} as it is read.
     *
     * @param fileSystem        file system holding the CSV file
     * @param path              location of the CSV file
     * @param datasetDescriptor descriptor handed to the reader
     * @return the number of records the reader iterated over
     */
    public int count(FileSystem fileSystem, Path path, DatasetDescriptor datasetDescriptor) {
        CSVFileReader reader = new CSVFileReader(fileSystem, path, datasetDescriptor, GenericRecord.class);
        // initialize() stays outside the try so close() is only called on an initialized reader.
        reader.initialize();
        int found = 0;
        try {
            Iterator<?> records = reader.iterator();
            while (records.hasNext()) {
                found++;
                System.err.println((GenericRecord) records.next());
            }
        } finally {
            // Release the reader even when iteration fails, so a failing test does not leak it.
            reader.close();
        }
        return found;
    }

    /**
     * Writes two batches of records to {@code path} and checks, after each step, how many of
     * them a fresh reader can see.
     *
     * @param fs   file system to write to and read from
     * @param path file the appender writes and the reader reads
     */
    private void assertRecordsVisibleOnlyAfterSync(FileSystem fs, Path path) throws Exception {
        DatasetDescriptor descriptor = new DatasetDescriptor.Builder().schema(SCHEMA).build();
        CSVAppender appender = new CSVAppender(fs, path, descriptor);
        // A single record instance is reused; its fields are overwritten before each append.
        GenericData.Record record = new GenericData.Record(SCHEMA);

        appender.open();
        appendUsers(appender, record, 0, BATCH_SIZE);
        Assert.assertEquals("Should not find records before flush", 0L, count(fs, path, descriptor));

        appender.flush();
        appender.sync();
        Assert.assertEquals("Should find the first 10 records", 10L, count(fs, path, descriptor));

        appendUsers(appender, record, BATCH_SIZE, 2 * BATCH_SIZE);
        Assert.assertEquals("Newly written records should still be buffered", 10L, count(fs, path, descriptor));

        appender.close();
        appender.cleanup();
        Assert.assertEquals("All records should be found after close", 20L, count(fs, path, descriptor));
    }

    /**
     * Appends one record per id in {@code [fromId, toId)}, with email {@code <id>@example.com}.
     *
     * @param appender open appender to write to
     * @param record   scratch record whose fields are overwritten for every id
     * @param fromId   first id to write (inclusive)
     * @param toId     end of the id range (exclusive)
     */
    private static void appendUsers(CSVAppender appender, GenericData.Record record, int fromId, int toId)
            throws Exception {
        for (int id = fromId; id < toId; id++) {
            record.put("id", id);
            record.put("email", id + "@example.com");
            appender.append(record);
        }
    }
}
