package org.apache.beam.runners.core;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.Any;
import com.google.protobuf.ByteString;
import com.google.protobuf.BytesValue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.fn.ThrowingConsumer;
import org.apache.beam.fn.harness.fn.ThrowingRunnable;
import org.apache.beam.fn.harness.test.TestExecutors;
import org.apache.beam.fn.v1.BeamFnApi;
import org.apache.beam.runners.core.BeamFnDataReadRunner;
import org.apache.beam.runners.core.PTransformRunnerFactory;
import org.apache.beam.runners.dataflow.util.CloudObjects;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.common.runner.v1.RunnerApi;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Supplier;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.base.Suppliers;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.HashMultimap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableList;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.ImmutableMap;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.collect.Iterables;
import org.apache.beam.sdks.java.harness.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import org.hamcrest.collection.IsMapContaining;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/core/BeamFnDataReadRunnerTest.class */
public class BeamFnDataReadRunnerTest {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final BeamFnApi.RemoteGrpcPort PORT_SPEC = BeamFnApi.RemoteGrpcPort.newBuilder().setApiServiceDescriptor(BeamFnApi.ApiServiceDescriptor.getDefaultInstance()).build();
    private static final RunnerApi.FunctionSpec FUNCTION_SPEC = RunnerApi.FunctionSpec.newBuilder().setParameter(Any.pack(PORT_SPEC)).build();
    private static final Coder<WindowedValue<String>> CODER = WindowedValue.getFullCoder(StringUtf8Coder.of(), GlobalWindow.Coder.INSTANCE);
    private static final String CODER_SPEC_ID = "string-coder-id";
    private static final RunnerApi.Coder CODER_SPEC;
    private static final String URN = "urn:org.apache.beam:source:runner:0.1";
    private static final BeamFnApi.Target INPUT_TARGET;

    @Rule
    public TestExecutors.TestExecutorService executor = TestExecutors.from(Executors::newCachedThreadPool);

    @Mock
    private BeamFnDataClient mockBeamFnDataClient;

    @Captor
    private ArgumentCaptor<ThrowingConsumer<WindowedValue<String>>> consumerCaptor;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void testCreatingAndProcessingBeamFnDataReadRunner() throws Exception {
        ArrayList arrayList = new ArrayList();
        HashMultimap create = HashMultimap.create();
        arrayList.getClass();
        create.put("outputPC", (v1) -> {
            r2.add(v1);
        });
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        RunnerApi.PTransform build = RunnerApi.PTransform.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(URN).setParameter(Any.pack(PORT_SPEC)).build()).putOutputs("101", "outputPC").build();
        BeamFnDataReadRunner.Factory factory = new BeamFnDataReadRunner.Factory();
        PipelineOptions create2 = PipelineOptionsFactory.create();
        BeamFnDataClient beamFnDataClient = this.mockBeamFnDataClient;
        Supplier ofInstance = Suppliers.ofInstance("57");
        ofInstance.getClass();
        java.util.function.Supplier supplier = ofInstance::get;
        ImmutableMap of = ImmutableMap.of("outputPC", RunnerApi.PCollection.newBuilder().setCoderId(CODER_SPEC_ID).build());
        ImmutableMap of2 = ImmutableMap.of(CODER_SPEC_ID, CODER_SPEC);
        arrayList2.getClass();
        Consumer consumer = (v1) -> {
            r9.add(v1);
        };
        arrayList3.getClass();
        factory.createRunnerForPTransform(create2, beamFnDataClient, "pTransformId", build, supplier, of, of2, create, consumer, (v1) -> {
            r10.add(v1);
        });
        Mockito.verifyZeroInteractions(new Object[]{this.mockBeamFnDataClient});
        CompletableFuture completableFuture = new CompletableFuture();
        Mockito.when(this.mockBeamFnDataClient.forInboundConsumer((BeamFnApi.ApiServiceDescriptor) Matchers.any(), (KV) Matchers.any(), (Coder) Matchers.any(), (ThrowingConsumer) Matchers.any())).thenReturn(completableFuture);
        ((ThrowingRunnable) Iterables.getOnlyElement(arrayList2)).run();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).forInboundConsumer((BeamFnApi.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (KV) Matchers.eq(KV.of("57", BeamFnApi.Target.newBuilder().setPrimitiveTransformReference("pTransformId").setName("101").build())), (Coder) Matchers.eq(CODER), (ThrowingConsumer) this.consumerCaptor.capture());
        ((ThrowingConsumer) this.consumerCaptor.getValue()).accept(WindowedValue.valueInGlobalWindow("TestValue"));
        Assert.assertThat(arrayList, org.hamcrest.Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("TestValue")}));
        arrayList.clear();
        Assert.assertThat(create.keySet(), org.hamcrest.Matchers.containsInAnyOrder(new String[]{"outputPC"}));
        completableFuture.complete(null);
        ((ThrowingRunnable) Iterables.getOnlyElement(arrayList3)).run();
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockBeamFnDataClient});
    }

    @Test
    public void testReuseForMultipleBundles() throws Exception {
        final CompletableFuture completableFuture = new CompletableFuture();
        final CompletableFuture completableFuture2 = new CompletableFuture();
        Mockito.when(this.mockBeamFnDataClient.forInboundConsumer((BeamFnApi.ApiServiceDescriptor) Matchers.any(), (KV) Matchers.any(), (Coder) Matchers.any(), (ThrowingConsumer) Matchers.any())).thenReturn(completableFuture).thenReturn(completableFuture2);
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        AtomicReference atomicReference = new AtomicReference("0");
        RunnerApi.FunctionSpec functionSpec = FUNCTION_SPEC;
        atomicReference.getClass();
        java.util.function.Supplier supplier = atomicReference::get;
        BeamFnApi.Target target = INPUT_TARGET;
        RunnerApi.Coder coder = CODER_SPEC;
        BeamFnDataClient beamFnDataClient = this.mockBeamFnDataClient;
        arrayList.getClass();
        ThrowingConsumer throwingConsumer = (v1) -> {
            r7.add(v1);
        };
        arrayList2.getClass();
        BeamFnDataReadRunner beamFnDataReadRunner = new BeamFnDataReadRunner(functionSpec, supplier, target, coder, beamFnDataClient, ImmutableList.of(throwingConsumer, (v1) -> {
            r8.add(v1);
        }));
        beamFnDataReadRunner.registerInputLocation();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).forInboundConsumer((BeamFnApi.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (KV) Matchers.eq(KV.of(atomicReference.get(), INPUT_TARGET)), (Coder) Matchers.eq(CODER), (ThrowingConsumer) this.consumerCaptor.capture());
        this.executor.submit(new Runnable() { // from class: org.apache.beam.runners.core.BeamFnDataReadRunnerTest.1
            @Override // java.lang.Runnable
            public void run() {
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                try {
                    ((ThrowingConsumer) BeamFnDataReadRunnerTest.this.consumerCaptor.getValue()).accept(WindowedValue.valueInGlobalWindow("ABC"));
                    ((ThrowingConsumer) BeamFnDataReadRunnerTest.this.consumerCaptor.getValue()).accept(WindowedValue.valueInGlobalWindow("DEF"));
                } catch (Exception e) {
                    completableFuture.completeExceptionally(e);
                } finally {
                    completableFuture.complete(false);
                }
            }
        });
        beamFnDataReadRunner.blockTillReadFinishes();
        Assert.assertThat(arrayList, org.hamcrest.Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")}));
        Assert.assertThat(arrayList2, org.hamcrest.Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("ABC"), WindowedValue.valueInGlobalWindow("DEF")}));
        atomicReference.set("1");
        arrayList.clear();
        arrayList2.clear();
        beamFnDataReadRunner.registerInputLocation();
        ((BeamFnDataClient) Mockito.verify(this.mockBeamFnDataClient)).forInboundConsumer((BeamFnApi.ApiServiceDescriptor) Matchers.eq(PORT_SPEC.getApiServiceDescriptor()), (KV) Matchers.eq(KV.of(atomicReference.get(), INPUT_TARGET)), (Coder) Matchers.eq(CODER), (ThrowingConsumer) this.consumerCaptor.capture());
        this.executor.submit(new Runnable() { // from class: org.apache.beam.runners.core.BeamFnDataReadRunnerTest.2
            @Override // java.lang.Runnable
            public void run() {
                Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
                try {
                    ((ThrowingConsumer) BeamFnDataReadRunnerTest.this.consumerCaptor.getValue()).accept(WindowedValue.valueInGlobalWindow("GHI"));
                    ((ThrowingConsumer) BeamFnDataReadRunnerTest.this.consumerCaptor.getValue()).accept(WindowedValue.valueInGlobalWindow("JKL"));
                } catch (Exception e) {
                    completableFuture2.completeExceptionally(e);
                } finally {
                    completableFuture2.complete(false);
                }
            }
        });
        beamFnDataReadRunner.blockTillReadFinishes();
        Assert.assertThat(arrayList, org.hamcrest.Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")}));
        Assert.assertThat(arrayList2, org.hamcrest.Matchers.contains(new WindowedValue[]{WindowedValue.valueInGlobalWindow("GHI"), WindowedValue.valueInGlobalWindow("JKL")}));
        Mockito.verifyNoMoreInteractions(new Object[]{this.mockBeamFnDataClient});
    }

    @Test
    public void testRegistration() {
        Iterator it = ServiceLoader.load(PTransformRunnerFactory.Registrar.class).iterator();
        while (it.hasNext()) {
            PTransformRunnerFactory.Registrar registrar = (PTransformRunnerFactory.Registrar) it.next();
            if (registrar instanceof BeamFnDataReadRunner.Registrar) {
                Assert.assertThat(registrar.getPTransformRunnerFactories(), IsMapContaining.hasKey(URN));
                return;
            }
        }
        Assert.fail("Expected registrar not found.");
    }

    static {
        try {
            CODER_SPEC = RunnerApi.Coder.newBuilder().setSpec(RunnerApi.SdkFunctionSpec.newBuilder().setSpec(RunnerApi.FunctionSpec.newBuilder().setParameter(Any.pack(BytesValue.newBuilder().setValue(ByteString.copyFrom(OBJECT_MAPPER.writeValueAsBytes(CloudObjects.asCloudObject(CODER)))).build())).build()).build()).build();
            INPUT_TARGET = BeamFnApi.Target.newBuilder().setPrimitiveTransformReference("1").setName("out").build();
        } catch (IOException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
}
