package org.apache.beam.sdk.io.gcp.pubsublite;

import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsublite/BufferingPullSubscriber.class */
class BufferingPullSubscriber implements PullSubscriber {
    private final Subscriber underlying;
    private final AtomicReference<StatusException> error;
    private final LinkedBlockingQueue<SequencedMessage> messages;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BufferingPullSubscriber(SubscriberFactory subscriberFactory, FlowControlSettings flowControlSettings) throws StatusException {
        this.error = new AtomicReference<>();
        this.messages = new LinkedBlockingQueue<>();
        this.underlying = subscriberFactory.New(immutableList -> {
            this.messages.addAll((Collection) immutableList.stream().map(sequencedMessage -> {
                return sequencedMessage.toProto();
            }).collect(Collectors.toList()));
        });
        this.underlying.addListener(new ApiService.Listener() { // from class: org.apache.beam.sdk.io.gcp.pubsublite.BufferingPullSubscriber.1
            public void failed(ApiService.State state, Throwable th) {
                BufferingPullSubscriber.this.error.set(ExtractStatus.toCanonical(th));
            }
        }, MoreExecutors.directExecutor());
        this.underlying.startAsync().awaitRunning();
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedMessages(flowControlSettings.messagesOutstanding()).setAllowedBytes(flowControlSettings.bytesOutstanding()).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public BufferingPullSubscriber(SubscriberFactory subscriberFactory, FlowControlSettings flowControlSettings, Offset offset) throws StatusException {
        this(subscriberFactory, flowControlSettings);
        try {
            this.underlying.seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset.value())).build()).get();
        } catch (InterruptedException e) {
            throw ExtractStatus.toCanonical(e);
        } catch (ExecutionException e2) {
            throw ExtractStatus.toCanonical(e2.getCause());
        }
    }

    @Override // org.apache.beam.sdk.io.gcp.pubsublite.PullSubscriber
    public List<SequencedMessage> pull() throws StatusException {
        StatusException statusException = this.error.get();
        if (statusException != null) {
            throw statusException;
        }
        ArrayList arrayList = new ArrayList();
        this.messages.drainTo(arrayList);
        this.underlying.allowFlow(FlowControlRequest.newBuilder().setAllowedBytes(arrayList.stream().mapToLong((v0) -> {
            return v0.getSizeBytes();
        }).sum()).setAllowedMessages(arrayList.size()).build());
        return ImmutableList.copyOf(arrayList);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.underlying.stopAsync().awaitTerminated();
    }
}
