package org.apache.hadoop.fs.adl;

import com.squareup.okhttp.mockwebserver.Dispatcher;
import com.squareup.okhttp.mockwebserver.MockResponse;
import com.squareup.okhttp.mockwebserver.RecordedRequest;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import okio.Buffer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:test-classes/org/apache/hadoop/fs/adl/TestConcurrentDataReadOperations.class */
public class TestConcurrentDataReadOperations extends AdlMockWebServer {
    private static final Logger LOG = LoggerFactory.getLogger(TestConcurrentDataReadOperations.class);
    private static final Object LOCK = new Object();
    private static FSDataInputStream commonHandle = null;
    private int concurrencyLevel;

    /* loaded from: input_file:test-classes/org/apache/hadoop/fs/adl/TestConcurrentDataReadOperations$CreateTestData.class */
    class CreateTestData {
        private Path path;
        private byte[] data;

        CreateTestData() {
        }

        public void set(Path path, byte[] bArr) {
            this.path = path;
            this.data = bArr;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:test-classes/org/apache/hadoop/fs/adl/TestConcurrentDataReadOperations$ReadConcurrentRunnable.class */
    public class ReadConcurrentRunnable implements Callable<Boolean> {
        private Path path;
        private int offset;
        private byte[] expectedData;
        private boolean useSameStream;

        public ReadConcurrentRunnable(byte[] bArr, Path path, int i, boolean z) {
            this.path = path;
            this.offset = i;
            this.expectedData = bArr;
            this.useSameStream = z;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Boolean call() throws IOException {
            FSDataInputStream open;
            try {
                if (this.useSameStream) {
                    synchronized (TestConcurrentDataReadOperations.LOCK) {
                        if (TestConcurrentDataReadOperations.commonHandle == null) {
                            FSDataInputStream unused = TestConcurrentDataReadOperations.commonHandle = TestConcurrentDataReadOperations.this.getMockAdlFileSystem().open(this.path);
                        }
                        open = TestConcurrentDataReadOperations.commonHandle;
                    }
                } else {
                    open = TestConcurrentDataReadOperations.this.getMockAdlFileSystem().open(this.path);
                }
                byte[] bArr = new byte[this.expectedData.length];
                open.readFully(this.offset, bArr);
                Assert.assertArrayEquals("Path :" + this.path.toString() + " did not match.", this.expectedData, bArr);
                if (!this.useSameStream) {
                    open.close();
                }
                return true;
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:test-classes/org/apache/hadoop/fs/adl/TestConcurrentDataReadOperations$ReadTestData.class */
    public class ReadTestData {
        private Path path;
        private byte[] data;
        private int offset;

        ReadTestData() {
        }

        public void set(Path path, byte[] bArr, int i) {
            this.path = path;
            this.data = bArr;
            this.offset = i;
        }
    }

    public TestConcurrentDataReadOperations(int i) {
        this.concurrencyLevel = i;
    }

    @Parameterized.Parameters(name = "{index}")
    public static Collection<?> testDataNumberOfConcurrentRun() {
        return Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}, new Object[]{4}, new Object[]{5});
    }

    public static byte[] getRandomByteArrayData(int i) {
        byte[] bArr = new byte[i];
        new Random().nextBytes(bArr);
        return bArr;
    }

    private void setDispatcher(final ArrayList<CreateTestData> arrayList) {
        getMockServer().setDispatcher(new Dispatcher() { // from class: org.apache.hadoop.fs.adl.TestConcurrentDataReadOperations.1
            public MockResponse dispatch(RecordedRequest recordedRequest) throws InterruptedException {
                CreateTestData createTestData = null;
                Iterator it = arrayList.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    CreateTestData createTestData2 = (CreateTestData) it.next();
                    if (recordedRequest.getPath().contains(createTestData2.path.toString())) {
                        createTestData = createTestData2;
                        break;
                    }
                }
                if (createTestData == null) {
                    new MockResponse().setBody("Request data not found").setResponseCode(501);
                }
                if (recordedRequest.getRequestLine().contains("op=GETFILESTATUS")) {
                    return new MockResponse().setResponseCode(200).setBody(TestADLResponseData.getGetFileStatusJSONResponse(createTestData.data.length));
                }
                if (!recordedRequest.getRequestLine().contains("op=OPEN")) {
                    return new MockResponse().setBody("NOT SUPPORTED").setResponseCode(501);
                }
                String requestLine = recordedRequest.getRequestLine();
                int i = 0;
                int i2 = 0;
                Matcher matcher = Pattern.compile("offset=([0-9]+)").matcher(requestLine);
                if (matcher.find()) {
                    TestConcurrentDataReadOperations.LOG.debug(matcher.group(1));
                    i = Integer.parseInt(matcher.group(1));
                }
                Matcher matcher2 = Pattern.compile("length=([0-9]+)").matcher(requestLine);
                if (matcher2.find()) {
                    TestConcurrentDataReadOperations.LOG.debug(matcher2.group(1));
                    i2 = Integer.parseInt(matcher2.group(1));
                }
                Buffer buffer = new Buffer();
                buffer.write(createTestData.data, i, Math.min(createTestData.data.length - i, i2));
                return new MockResponse().setResponseCode(200).setChunkedBody(buffer, 4194304);
            }
        });
    }

    @Before
    public void resetHandle() {
        commonHandle = null;
    }

    @Test
    public void testParallelReadOnDifferentStreams() throws IOException, InterruptedException, ExecutionException {
        ArrayList<CreateTestData> arrayList = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < this.concurrencyLevel; i++) {
            CreateTestData createTestData = new CreateTestData();
            createTestData.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()), getRandomByteArrayData(random.nextInt(1048576)));
            arrayList.add(createTestData);
        }
        setDispatcher(arrayList);
        ArrayList<ReadTestData> arrayList2 = new ArrayList<>();
        Iterator<CreateTestData> it = arrayList.iterator();
        while (it.hasNext()) {
            CreateTestData next = it.next();
            ReadTestData readTestData = new ReadTestData();
            readTestData.set(next.path, next.data, 0);
            arrayList2.add(readTestData);
        }
        runReadTest(arrayList2, false);
    }

    @Test
    public void testParallelReadOnSameStreams() throws IOException, InterruptedException, ExecutionException {
        ArrayList<CreateTestData> arrayList = new ArrayList<>();
        Random random = new Random();
        for (int i = 0; i < 1; i++) {
            CreateTestData createTestData = new CreateTestData();
            createTestData.set(new Path("/test/concurrentRead/" + UUID.randomUUID().toString()), getRandomByteArrayData(1048576));
            arrayList.add(createTestData);
        }
        setDispatcher(arrayList);
        ArrayList<ReadTestData> arrayList2 = new ArrayList<>();
        ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(arrayList.get(0).data);
        ReadTestData readTestData = new ReadTestData();
        byte[] bArr = new byte[1048576];
        byteArrayInputStream.read(bArr);
        readTestData.set(arrayList.get(0).path, bArr, 0);
        arrayList2.add(readTestData);
        runReadTest(arrayList2, false);
        arrayList2.clear();
        for (int i2 = 0; i2 < this.concurrencyLevel * 5; i2++) {
            ReadTestData readTestData2 = new ReadTestData();
            int nextInt = random.nextInt(1048575);
            byte[] bArr2 = new byte[1048576 - nextInt];
            byteArrayInputStream.reset();
            byteArrayInputStream.skip(nextInt);
            byteArrayInputStream.read(bArr2);
            readTestData2.set(arrayList.get(0).path, bArr2, nextInt);
            arrayList2.add(readTestData2);
        }
        runReadTest(arrayList2, true);
    }

    void runReadTest(ArrayList<ReadTestData> arrayList, boolean z) throws InterruptedException, ExecutionException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(arrayList.size());
        Future[] futureArr = new Future[arrayList.size()];
        for (int i = 0; i < arrayList.size(); i++) {
            futureArr[i] = newFixedThreadPool.submit(new ReadConcurrentRunnable(arrayList.get(i).data, arrayList.get(i).path, arrayList.get(i).offset, z));
        }
        newFixedThreadPool.shutdown();
        newFixedThreadPool.awaitTermination(120L, TimeUnit.SECONDS);
        for (int i2 = 0; i2 < arrayList.size(); i2++) {
            Assert.assertTrue(((Boolean) futureArr[i2].get()).booleanValue());
        }
    }
}
