package org.apache.beam.sdk.fn.stream;

import io.grpc.stub.CallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.beam.repackaged.beam_sdks_java_fn_execution.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_sdks_java_fn_execution.com.google.common.base.Preconditions;

/**
 * A {@link StreamObserver} that buffers elements in a bounded queue and forwards them to an
 * outbound {@link CallStreamObserver} from a separate task submitted to the supplied executor.
 *
 * <p>The draining task only forwards elements while the outbound observer reports
 * {@link CallStreamObserver#isReady()}; otherwise it waits for the supplied {@link Phaser} to
 * advance past the phase it observed before checking readiness. (Presumably the phaser is advanced
 * by the outbound observer's on-ready handler; that wiring is not part of this class.)
 *
 * <p>Termination is signalled to the draining task through a sentinel "poison pill" element.
 * {@link #onError(Throwable)} places it at the head of the queue so buffered elements are skipped,
 * whereas {@link #onCompleted()} places it at the tail so buffered elements are forwarded first.
 *
 * @param <T> the type of element being buffered and forwarded
 */
@ThreadSafe
public final class BufferingStreamObserver<T> implements StreamObserver<T> {
    /** Sentinel compared by identity; tells the draining task to stop. */
    private static final Object POISON_PILL = new Object();

    /** How long a single blocking insert waits before re-checking whether the drainer is alive. */
    private static final long OFFER_TIMEOUT_SECONDS = 60L;

    private final LinkedBlockingDeque<T> queue;
    private final Phaser phaser;
    private final CallStreamObserver<T> outboundObserver;
    private final Future<?> queueDrainer;
    private final int bufferSize;

    /**
     * Creates the observer and immediately submits the queue draining task.
     *
     * @param phaser the phaser the draining task waits on while the outbound observer is not ready
     * @param callStreamObserver the outbound observer that buffered elements are forwarded to
     * @param executorService the executor that runs the draining task
     * @param bufferSize the capacity of the internal bounded queue
     */
    public BufferingStreamObserver(
            Phaser phaser,
            CallStreamObserver<T> callStreamObserver,
            ExecutorService executorService,
            int bufferSize) {
        this.phaser = phaser;
        this.bufferSize = bufferSize;
        this.queue = new LinkedBlockingDeque<>(bufferSize);
        this.outboundObserver = callStreamObserver;
        this.queueDrainer = executorService.submit(this::drainQueue);
    }

    /**
     * Forwards queued elements to the outbound observer until the poison pill is dequeued.
     *
     * @throws IllegalStateException if the draining thread is interrupted while blocked on the queue
     */
    private void drainQueue() {
        try {
            while (true) {
                // Capture the phase BEFORE checking readiness so that an advance that happens
                // between the readiness check and awaitAdvance is not missed.
                int currentPhase = phaser.getPhase();
                while (outboundObserver.isReady()) {
                    T value = queue.take();
                    if (value == POISON_PILL) {
                        return;
                    }
                    outboundObserver.onNext(value);
                }
                phaser.awaitAdvance(currentPhase);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    /**
     * Enqueues {@code t}, blocking while the queue is full.
     *
     * @throws IllegalStateException if the draining task has already finished, since nothing would
     *     ever empty the queue
     * @throws RuntimeException if the calling thread is interrupted while waiting for queue space
     */
    @Override
    public void onNext(T t) {
        try {
            // The blocking offer must live inside the try: it is the call that can be interrupted.
            // Each time the offer times out, verify the drainer is still alive before retrying.
            while (!queue.offer(t, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                Preconditions.checkState(!queueDrainer.isDone(), "Stream observer has finished.");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Stops the draining task without forwarding still-buffered elements, then propagates
     * {@code th} to the outbound observer.
     */
    @Override
    public void onError(Throwable th) {
        synchronized (outboundObserver) {
            // Pill goes to the FRONT so the drainer stops before any remaining buffered element.
            stopQueueDrainer(true);
            outboundObserver.onError(th);
        }
    }

    /**
     * Lets the draining task forward every buffered element, stops it, and then completes the
     * outbound observer.
     */
    @Override
    public void onCompleted() {
        synchronized (outboundObserver) {
            // Pill goes to the BACK so everything queued ahead of it is forwarded first.
            stopQueueDrainer(false);
            outboundObserver.onCompleted();
        }
    }

    /** Returns the capacity the internal queue was created with. */
    @VisibleForTesting
    public int getBufferSize() {
        return bufferSize;
    }

    /**
     * Inserts the poison pill and waits for the draining task to finish. Does nothing if the task
     * is already done. Callers in this class hold the {@code outboundObserver} monitor.
     *
     * @param atFront {@code true} to insert the pill at the head of the queue, {@code false} to
     *     append it at the tail
     * @throws RuntimeException if interrupted while inserting the pill or waiting for the task
     */
    private void stopQueueDrainer(boolean atFront) {
        if (queueDrainer.isDone()) {
            // The drainer has already terminated; there is nothing left to shut down.
            return;
        }
        try {
            T pill = poisonPill();
            // Never keep offering once the drainer is done: the queue may be full and nothing
            // would ever make room for the pill.
            while (!queueDrainer.isDone()) {
                boolean inserted =
                        atFront
                                ? queue.offerFirst(pill, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                                : queue.offerLast(pill, OFFER_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                if (inserted) {
                    break;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        waitTillFinish();
    }

    /**
     * Returns the poison pill typed as {@code T}. The cast is safe because the pill is only ever
     * compared by identity in {@link #drainQueue()} and never handed to code expecting a real
     * {@code T}.
     */
    @SuppressWarnings("unchecked")
    private T poisonPill() {
        return (T) POISON_PILL;
    }

    /**
     * Blocks until the draining task has terminated.
     *
     * @throws RuntimeException if interrupted while waiting, or wrapping the
     *     {@link ExecutionException} if the draining task failed
     */
    private void waitTillFinish() {
        try {
            queueDrainer.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (CancellationException ignored) {
            // A cancelled drainer is no longer running, which is all this method waits for.
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
