package org.apache.beam.fn.harness;

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.PTransformRunnerFactory;
import org.apache.beam.fn.harness.control.BundleSplitListener;
import org.apache.beam.fn.harness.data.BeamFnDataClient;
import org.apache.beam.fn.harness.data.MultiplexingFnDataReceiver;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ImmutableMap;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_sdks_java_harness.com.google.common.collect.ListMultimap;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.RehydratedComponents;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.InboundDataClient;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
import org.apache.beam.sdk.fn.function.ThrowingRunnable;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunner.class */
public class BeamFnDataReadRunner<OutputT> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) BeamFnDataReadRunner.class);
    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final FnDataReceiver<WindowedValue<OutputT>> receiver;
    private final Supplier<String> processBundleInstructionIdSupplier;
    private final BeamFnDataClient beamFnDataClient;
    private final Coder<WindowedValue<OutputT>> coder;
    private final BeamFnApi.Target inputTarget;
    private InboundDataClient readFuture;

    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunner$Factory.class */
    static class Factory<OutputT> implements PTransformRunnerFactory<BeamFnDataReadRunner<OutputT>> {
        Factory() {
        }

        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
        public BeamFnDataReadRunner<OutputT> createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier<String> supplier, Map<String, RunnerApi.PCollection> map, Map<String, RunnerApi.Coder> map2, Map<String, RunnerApi.WindowingStrategy> map3, ListMultimap<String, FnDataReceiver<WindowedValue<?>>> listMultimap, Consumer<ThrowingRunnable> consumer, Consumer<ThrowingRunnable> consumer2, BundleSplitListener bundleSplitListener) throws IOException {
            BeamFnDataReadRunner<OutputT> beamFnDataReadRunner = new BeamFnDataReadRunner<>(pTransform, supplier, BeamFnApi.Target.newBuilder().setPrimitiveTransformReference(str).setName((String) Iterables.getOnlyElement(pTransform.getOutputsMap().keySet())).build(), map2.get(map.get(Iterables.getOnlyElement(pTransform.getOutputsMap().values())).getCoderId()), map2, beamFnDataClient, listMultimap.get((ListMultimap<String, FnDataReceiver<WindowedValue<?>>>) Iterables.getOnlyElement(pTransform.getOutputsMap().values())));
            Objects.requireNonNull(beamFnDataReadRunner);
            consumer.accept(beamFnDataReadRunner::registerInputLocation);
            Objects.requireNonNull(beamFnDataReadRunner);
            consumer2.accept(beamFnDataReadRunner::blockTillReadFinishes);
            return beamFnDataReadRunner;
        }

        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory
        public /* bridge */ /* synthetic */ Object createRunnerForPTransform(PipelineOptions pipelineOptions, BeamFnDataClient beamFnDataClient, BeamFnStateClient beamFnStateClient, String str, RunnerApi.PTransform pTransform, Supplier supplier, Map map, Map map2, Map map3, ListMultimap listMultimap, Consumer consumer, Consumer consumer2, BundleSplitListener bundleSplitListener) throws IOException {
            return createRunnerForPTransform(pipelineOptions, beamFnDataClient, beamFnStateClient, str, pTransform, (Supplier<String>) supplier, (Map<String, RunnerApi.PCollection>) map, (Map<String, RunnerApi.Coder>) map2, (Map<String, RunnerApi.WindowingStrategy>) map3, (ListMultimap<String, FnDataReceiver<WindowedValue<?>>>) listMultimap, (Consumer<ThrowingRunnable>) consumer, (Consumer<ThrowingRunnable>) consumer2, bundleSplitListener);
        }
    }

    @AutoService(PTransformRunnerFactory.Registrar.class)
    /* loaded from: input_file:org/apache/beam/fn/harness/BeamFnDataReadRunner$Registrar.class */
    public static class Registrar implements PTransformRunnerFactory.Registrar {
        @Override // org.apache.beam.fn.harness.PTransformRunnerFactory.Registrar
        public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
            return ImmutableMap.of(RemoteGrpcPortRead.URN, new Factory());
        }
    }

    BeamFnDataReadRunner(RunnerApi.PTransform pTransform, Supplier<String> supplier, BeamFnApi.Target target, RunnerApi.Coder coder, Map<String, RunnerApi.Coder> map, BeamFnDataClient beamFnDataClient, Collection<FnDataReceiver<WindowedValue<OutputT>>> collection) throws IOException {
        BeamFnApi.RemoteGrpcPort port = RemoteGrpcPortRead.fromPTransform(pTransform).getPort();
        this.apiServiceDescriptor = port.getApiServiceDescriptor();
        this.inputTarget = target;
        this.processBundleInstructionIdSupplier = supplier;
        this.beamFnDataClient = beamFnDataClient;
        this.receiver = MultiplexingFnDataReceiver.forConsumers(collection);
        RehydratedComponents forComponents = RehydratedComponents.forComponents(RunnerApi.Components.newBuilder().putAllCoders(map).build());
        this.coder = (Coder<WindowedValue<OutputT>>) (!port.getCoderId().isEmpty() ? CoderTranslation.fromProto(map.get(port.getCoderId()), forComponents) : CoderTranslation.fromProto(coder, forComponents));
    }

    public void registerInputLocation() {
        this.readFuture = this.beamFnDataClient.receive(this.apiServiceDescriptor, LogicalEndpoint.of(this.processBundleInstructionIdSupplier.get(), this.inputTarget), this.coder, this.receiver);
    }

    public void blockTillReadFinishes() throws Exception {
        LOG.debug("Waiting for process bundle instruction {} and target {} to close.", this.processBundleInstructionIdSupplier.get(), this.inputTarget);
        this.readFuture.awaitCompletion();
    }
}
