package org.apache.iceberg.spark.source;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.AnalysisException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/iceberg/spark/source/TestSparkSchema.class */
/**
 * Tests reading an Iceberg table through Spark's {@code DataFrameReader} when the caller
 * supplies an explicit read schema via {@code spark.read().schema(...)}.
 *
 * <p>Every test creates a fresh unpartitioned Hadoop table in a temporary folder, appends a
 * single {@code SimpleRecord(1, "a")} row to it, and then reads the table back with a
 * user-provided Spark schema.
 */
public class TestSparkSchema {
    private static final Configuration CONF = new Configuration();

    /** Table schema shared by all tests: optional int {@code id} and optional string {@code data}. */
    private static final Schema SCHEMA = new Schema(
            Types.NestedField.optional(1, "id", Types.IntegerType.get()),
            Types.NestedField.optional(2, "data", Types.StringType.get()));

    private static SparkSession spark = null;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    /** Starts (or reuses) a local two-thread Spark session shared by every test in this class. */
    @BeforeClass
    public static void startSpark() {
        spark = SparkSession.builder().master("local[2]").getOrCreate();
    }

    /**
     * Stops the shared Spark session and clears the static reference.
     *
     * <p>The null check matters because JUnit still runs {@code @AfterClass} methods when
     * {@link #startSpark()} fails; without it the teardown would throw a
     * {@link NullPointerException} on top of the original start-up failure.
     */
    @AfterClass
    public static void stopSpark() {
        SparkSession currentSpark = spark;
        spark = null;
        if (currentSpark != null) {
            currentSpark.stop();
        }
    }

    /**
     * Reading with a schema that contains only {@code id} must return rows with exactly that
     * one column.
     */
    @Test
    public void testSparkReadSchemaIsHonored() throws IOException {
        String tableLocation = createTableWithSingleRecord();

        StructType sparkReadSchema = new StructType(new StructField[] {
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty())
        });

        Row[] results = (Row[]) spark.read()
                .schema(sparkReadSchema)
                .format("iceberg")
                .load(tableLocation)
                .collect();

        Assert.assertEquals("Result size matches", 1, results.length);
        Assert.assertEquals("Row length matches with sparkReadSchema", 1, results[0].length());
        Assert.assertEquals("Row content matches data", 1, results[0].getInt(0));
    }

    /**
     * Loading with a read schema that names a field ({@code idd}) absent from the table schema
     * must fail with an {@link IllegalArgumentException}.
     */
    @Test
    public void testFailIfSparkReadSchemaIsOff() throws IOException {
        String tableLocation = createTableWithSingleRecord();

        StructType sparkReadSchema = new StructType(new StructField[] {
            new StructField("idd", DataTypes.IntegerType, true, Metadata.empty())
        });

        AssertHelpers.assertThrows(
                "Iceberg should not allow a projection that contain unknown fields",
                IllegalArgumentException.class,
                "Field idd not found in source schema",
                () -> spark.read()
                        .schema(sparkReadSchema)
                        .format("iceberg")
                        .load(tableLocation));
    }

    /**
     * A read schema with both columns followed by a {@code select("id")} projection must yield
     * rows containing only {@code id}.
     */
    @Test
    public void testSparkReadSchemaCombinedWithProjection() throws IOException {
        String tableLocation = createTableWithSingleRecord();

        StructType sparkReadSchema = new StructType(new StructField[] {
            new StructField("id", DataTypes.IntegerType, true, Metadata.empty()),
            new StructField("data", DataTypes.StringType, true, Metadata.empty())
        });

        Row[] results = (Row[]) spark.read()
                .schema(sparkReadSchema)
                .format("iceberg")
                .load(tableLocation)
                .select("id")
                .collect();

        Assert.assertEquals("Result size matches", 1, results.length);
        Assert.assertEquals("Row length matches with sparkReadSchema", 1, results[0].length());
        Assert.assertEquals("Row content matches data", 1, results[0].getInt(0));
    }

    /**
     * Selecting {@code id} from a dataset whose read schema contains only {@code data} must
     * fail with an {@link AnalysisException}.
     */
    @Test
    public void testFailSparkReadSchemaCombinedWithProjectionWhenSchemaDoesNotContainProjection() throws IOException {
        String tableLocation = createTableWithSingleRecord();

        StructType sparkReadSchema = new StructType(new StructField[] {
            new StructField("data", DataTypes.StringType, true, Metadata.empty())
        });

        AssertHelpers.assertThrows(
                "Spark should not allow a projection that is not included in the read schema",
                AnalysisException.class,
                "cannot resolve '`id`' given input columns: [data]",
                () -> spark.read()
                        .schema(sparkReadSchema)
                        .format("iceberg")
                        .load(tableLocation)
                        .select("id"));
    }

    /**
     * Creates an unpartitioned table with {@link #SCHEMA} in a new temporary folder and appends
     * one {@code SimpleRecord(1, "a")} row to it through the Spark "iceberg" writer.
     *
     * @return the location of the newly created table
     * @throws IOException if the temporary folder cannot be created
     */
    private String createTableWithSingleRecord() throws IOException {
        String tableLocation = this.temp.newFolder("iceberg-table").toString();

        // No table properties are supplied; the typed cast replaces the raw (Map) cast.
        new HadoopTables(CONF).create(
                SCHEMA, PartitionSpec.unpartitioned(), (Map<String, String>) null, tableLocation);

        spark.createDataFrame(Lists.newArrayList(new SimpleRecord(1, "a")), SimpleRecord.class)
                .select("id", "data")
                .write()
                .format("iceberg")
                .mode("append")
                .save(tableLocation);

        return tableLocation;
    }
}
