package org.apache.beam.sdk.fn.data;

import com.google.protobuf.ByteString;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.repackaged.beam_sdks_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.util.WindowedValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/fn/data/BeamFnDataBufferingOutboundObserver.class */
/**
 * A {@link CloseableFnDataReceiver} that encodes each received {@link WindowedValue} into an
 * in-memory buffer and forwards the buffered bytes to an outbound {@link StreamObserver} as
 * {@link BeamFnApi.Elements} messages.
 *
 * <p>The buffer is flushed whenever its size reaches the configured limit after an element has
 * been encoded, and once more on {@link #close()}. On close, an additional data entry carrying
 * only the instruction reference and target (no payload) is appended to the final message.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; confirm with callers
 * whether it is only ever used from a single thread.
 *
 * @param <T> the type of the values wrapped by the received {@link WindowedValue}s
 */
public class BeamFnDataBufferingOutboundObserver<T> implements CloseableFnDataReceiver<WindowedValue<T>> {
    /**
     * String prefix ending in {@code '='}; not referenced inside this class.
     * NOTE(review): presumably the prefix of an option/experiment used to override the buffer
     * limit -- confirm against the code that parses it.
     */
    public static final String BEAM_FN_API_DATA_BUFFER_LIMIT = "beam_fn_api_data_buffer_limit=";

    /** Buffer limit, in bytes, applied by {@link #forLocation}. */
    @VisibleForTesting
    static final int DEFAULT_BUFFER_LIMIT_BYTES = 1000000;

    private static final Logger LOG = LoggerFactory.getLogger(BeamFnDataBufferingOutboundObserver.class);

    /** Total number of buffered bytes that have been converted for transmission so far. */
    private long byteCounter;
    /** Total number of elements accepted so far. */
    private long counter;
    /** Buffer size, in bytes, at or above which {@link #accept} flushes the buffer. */
    private final int bufferLimit;
    private final Coder<WindowedValue<T>> coder;
    private final LogicalEndpoint outputLocation;
    private final StreamObserver<BeamFnApi.Elements> outboundObserver;
    /** Encoded elements that have not yet been handed to {@link #outboundObserver}. */
    private final ByteString.Output bufferedElements = ByteString.newOutput();
    private boolean closed = false;

    /**
     * Creates an observer that uses {@link #DEFAULT_BUFFER_LIMIT_BYTES} as its buffer limit.
     *
     * @param logicalEndpoint supplies the instruction id and target stamped on every data entry
     * @param coder encodes each accepted value into the buffer
     * @param streamObserver receives the resulting {@link BeamFnApi.Elements} messages
     * @param <T> the type of the values wrapped by the received {@link WindowedValue}s
     * @return a new observer
     */
    public static <T> BeamFnDataBufferingOutboundObserver<T> forLocation(LogicalEndpoint logicalEndpoint, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> streamObserver) {
        // Use the named constant rather than repeating its literal value.
        return forLocationWithBufferLimit(DEFAULT_BUFFER_LIMIT_BYTES, logicalEndpoint, coder, streamObserver);
    }

    /**
     * Creates an observer with an explicit buffer limit.
     *
     * @param bufferLimit buffer size, in bytes, at or above which the buffer is flushed after an
     *     element has been encoded
     * @param logicalEndpoint supplies the instruction id and target stamped on every data entry
     * @param coder encodes each accepted value into the buffer
     * @param streamObserver receives the resulting {@link BeamFnApi.Elements} messages
     * @param <T> the type of the values wrapped by the received {@link WindowedValue}s
     * @return a new observer
     */
    public static <T> BeamFnDataBufferingOutboundObserver<T> forLocationWithBufferLimit(int bufferLimit, LogicalEndpoint logicalEndpoint, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> streamObserver) {
        return new BeamFnDataBufferingOutboundObserver<>(bufferLimit, logicalEndpoint, coder, streamObserver);
    }

    private BeamFnDataBufferingOutboundObserver(int bufferLimit, LogicalEndpoint logicalEndpoint, Coder<WindowedValue<T>> coder, StreamObserver<BeamFnApi.Elements> streamObserver) {
        this.bufferLimit = bufferLimit;
        this.outputLocation = logicalEndpoint;
        this.coder = coder;
        this.outboundObserver = streamObserver;
    }

    /**
     * Marks this observer as closed and sends one final message containing any remaining buffered
     * bytes followed by a data entry that has no payload.
     *
     * @throws IllegalStateException if this observer has already been closed
     */
    @Override
    public void close() throws Exception {
        checkNotClosed();
        this.closed = true;
        BeamFnApi.Elements.Builder elements = convertBufferForTransmission();
        // Append a data entry with only the instruction reference and target set (no payload).
        // NOTE(review): presumably the receiver treats this as the end-of-stream marker --
        // confirm against the Fn API data-plane protocol.
        elements.addDataBuilder().setInstructionReference(this.outputLocation.getInstructionId()).setTarget(this.outputLocation.getTarget());
        LOG.debug("Closing stream for instruction {} and target {} having transmitted {} values {} bytes", this.outputLocation.getInstructionId(), this.outputLocation.getTarget(), this.counter, this.byteCounter);
        this.outboundObserver.onNext(elements.build());
    }

    /**
     * Encodes {@code windowedValue} into the buffer and, if the buffer has reached the configured
     * limit, sends the buffered bytes to the outbound observer.
     *
     * @param windowedValue the value to encode
     * @throws IOException if the coder fails to encode the value
     * @throws IllegalStateException if this observer has already been closed
     */
    @Override
    public void accept(WindowedValue<T> windowedValue) throws IOException {
        checkNotClosed();
        this.coder.encode(windowedValue, this.bufferedElements);
        this.counter++;
        if (this.bufferedElements.size() >= this.bufferLimit) {
            this.outboundObserver.onNext(convertBufferForTransmission().build());
        }
    }

    /** Throws {@link IllegalStateException} once {@link #close()} has been called. */
    private void checkNotClosed() {
        if (this.closed) {
            throw new IllegalStateException("Already closed.");
        }
    }

    /**
     * Moves the buffered bytes into a new {@link BeamFnApi.Elements} builder, adds their size to
     * the byte counter and resets the buffer.
     *
     * @return a builder holding a single data entry for the buffered bytes, or an empty builder
     *     when nothing is buffered
     */
    private BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
        if (this.bufferedElements.size() == 0) {
            return elements;
        }
        elements.addDataBuilder().setInstructionReference(this.outputLocation.getInstructionId()).setTarget(this.outputLocation.getTarget()).setData(this.bufferedElements.toByteString());
        // Read the size before reset() so the counter reflects the bytes just handed off.
        this.byteCounter += this.bufferedElements.size();
        this.bufferedElements.reset();
        return elements;
    }
}
