package org.apache.iceberg.examples;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.SparkSession;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iceberg/examples/ConcurrencyTest.class */
/**
 * Runs one large Spark append against an Iceberg table while many Spark reads of the same
 * table location are submitted to the same thread pool.
 *
 * <p>Each test method gets its own table, created in a fresh temporary directory by
 * {@link #before()} and removed again by {@link #after()}.
 */
public class ConcurrencyTest {
    private static final Logger log = LoggerFactory.getLogger(ConcurrencyTest.class);

    /** Number of identical rows appended by the single write task. */
    private static final int ROW_COUNT = 1000000;

    /** Number of read tasks submitted alongside the write task. */
    private static final int READ_TASK_COUNT = 500;

    /** Size of the thread pool that executes the write and read tasks. */
    private static final int THREAD_COUNT = 5;

    private SparkSession spark;
    private File tableLocation;
    private Table table;
    private final Schema schema = new Schema(
            Types.NestedField.optional(1, "key", Types.LongType.get()),
            Types.NestedField.optional(2, "value", Types.StringType.get()));
    private final List<SimpleRecord> data = new ArrayList<>(ROW_COUNT);

    /**
     * Creates the temporary table directory, a local Spark session, the Iceberg table and the
     * in-memory rows used by the write task.
     *
     * @throws IOException if the temporary directory cannot be created
     */
    @Before
    public void before() throws IOException {
        this.tableLocation = Files.createTempDirectory("temp").toFile();
        this.spark = SparkSession.builder().master("local[2]").getOrCreate();
        this.spark.sparkContext().setLogLevel("WARN");
        HadoopTables tables = new HadoopTables(this.spark.sparkContext().hadoopConfiguration());
        this.table = tables.create(this.schema, this.tableLocation.toString());
        for (int i = 0; i < ROW_COUNT; i++) {
            this.data.add(new SimpleRecord(1, "bdp"));
        }
        log.info("End of setup phase");
    }

    /**
     * Submits one write task and {@value #READ_TASK_COUNT} read tasks to a fixed pool of
     * {@value #THREAD_COUNT} threads, waits for all of them, then refreshes the table and reads
     * it once more.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if the write task or any read task threw an exception
     */
    @Test
    public void writingAndReadingConcurrently() throws InterruptedException, ExecutionException {
        List<Callable<Void>> tasks = new ArrayList<>(READ_TASK_COUNT + 1);
        tasks.add(() -> writeToTable(this.data));
        for (int i = 0; i < READ_TASK_COUNT; i++) {
            tasks.add(this::readTable);
        }

        ExecutorService pool = Executors.newFixedThreadPool(THREAD_COUNT);
        try {
            // invokeAll only returns once every task has completed; calling get() on each
            // future surfaces any exception a task threw instead of silently dropping it.
            for (Future<Void> result : pool.invokeAll(tasks)) {
                result.get();
            }
        } finally {
            // Release the pool threads even when waiting was interrupted or a task failed.
            pool.shutdown();
        }

        this.table.refresh();
        readTable();
    }

    /**
     * Loads the table location through Spark's {@code iceberg} format and logs its row count.
     *
     * @return always {@code null}; the {@code Void} return type lets the method serve as a
     *     {@code Callable<Void>}
     */
    private Void readTable() {
        long rowCount = this.spark.read()
                .format("iceberg")
                .load(this.tableLocation.toString())
                .count();
        log.info("{}", rowCount);
        return null;
    }

    /**
     * Appends the given records to the table location through Spark's {@code iceberg} format.
     *
     * @param list records to append
     * @return always {@code null}; the {@code Void} return type lets the method serve as a
     *     {@code Callable<Void>}
     */
    private Void writeToTable(List<SimpleRecord> list) {
        log.info("WRITING!");
        this.spark.createDataFrame(list, SimpleRecord.class)
                .select("key", "value")
                .write()
                .format("iceberg")
                .mode("append")
                .save(this.tableLocation.toString());
        return null;
    }

    /**
     * Stops the Spark session and deletes the temporary table directory.
     *
     * <p>Both fields are null-checked because this method still runs when {@link #before()}
     * failed part-way, and the directory is removed even if stopping Spark throws.
     *
     * @throws IOException if the temporary directory cannot be deleted
     */
    @After
    public void after() throws IOException {
        try {
            if (this.spark != null) {
                this.spark.stop();
            }
        } finally {
            if (this.tableLocation != null) {
                FileUtils.deleteDirectory(this.tableLocation);
            }
        }
    }
}
