package com.scalar.db.storage.rpc;

import com.google.common.util.concurrent.Uninterruptibles;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.exception.storage.ExecutionException;
import com.scalar.db.rpc.DistributedStorageGrpc;
import com.scalar.db.rpc.ScanRequest;
import com.scalar.db.rpc.ScanResponse;
import com.scalar.db.util.ProtoUtil;
import com.scalar.db.util.Utility;
import com.scalar.db.util.retry.ServiceTemporaryUnavailableException;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.ClientResponseObserver;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;

@NotThreadSafe
/* loaded from: input_file:com/scalar/db/storage/rpc/GrpcScanOnBidirectionalStream.class */
public class GrpcScanOnBidirectionalStream implements ClientResponseObserver<ScanRequest, ScanResponse> {
    private final GrpcConfig config;
    private final TableMetadata metadata;
    private final BlockingQueue<ResponseOrError> queue = new LinkedBlockingQueue();
    private final AtomicBoolean hasMoreResults = new AtomicBoolean(true);
    private ClientCallStreamObserver<ScanRequest> requestStream;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/scalar/db/storage/rpc/GrpcScanOnBidirectionalStream$ResponseOrError.class */
    public static class ResponseOrError {
        private final ScanResponse response;
        private final Throwable error;

        public ResponseOrError(ScanResponse scanResponse) {
            this.response = scanResponse;
            this.error = null;
        }

        public ResponseOrError(Throwable th) {
            this.response = null;
            this.error = th;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isError() {
            return this.error != null;
        }

        public ScanResponse getResponse() {
            return this.response;
        }

        public Throwable getError() {
            return this.error;
        }
    }

    public GrpcScanOnBidirectionalStream(GrpcConfig grpcConfig, DistributedStorageGrpc.DistributedStorageStub distributedStorageStub, TableMetadata tableMetadata) {
        this.config = grpcConfig;
        this.metadata = tableMetadata;
        distributedStorageStub.scan(this);
    }

    public void beforeStart(ClientCallStreamObserver<ScanRequest> clientCallStreamObserver) {
        this.requestStream = clientCallStreamObserver;
    }

    public void onNext(ScanResponse scanResponse) {
        Uninterruptibles.putUninterruptibly(this.queue, new ResponseOrError(scanResponse));
    }

    public void onError(Throwable th) {
        Uninterruptibles.putUninterruptibly(this.queue, new ResponseOrError(th));
    }

    public void onCompleted() {
    }

    private ResponseOrError sendRequest(ScanRequest scanRequest) {
        this.requestStream.onNext(scanRequest);
        ResponseOrError responseOrError = (ResponseOrError) Utility.pollUninterruptibly(this.queue, this.config.getDeadlineDurationMillis(), TimeUnit.MILLISECONDS);
        if (responseOrError != null) {
            return responseOrError;
        }
        this.requestStream.cancel("deadline exceeded", (Throwable) null);
        return (ResponseOrError) Uninterruptibles.takeUninterruptibly(this.queue);
    }

    private void throwIfScannerHasNoMoreResults() {
        if (!this.hasMoreResults.get()) {
            throw new IllegalStateException("the scan operation has no more results");
        }
    }

    private List<Result> getResults(ScanResponse scanResponse) throws ExecutionException {
        if (!scanResponse.getHasMoreResults()) {
            closeScanner();
        }
        return (List) scanResponse.getResultList().stream().map(result -> {
            return ProtoUtil.toResult(result, this.metadata);
        }).collect(Collectors.toList());
    }

    public List<Result> openScanner(Scan scan) throws ExecutionException {
        throwIfScannerHasNoMoreResults();
        ResponseOrError sendRequest = sendRequest(ScanRequest.newBuilder().setScan(ProtoUtil.toScan(scan)).build());
        throwIfErrorForOpenScanner(sendRequest);
        return getResults(sendRequest.getResponse());
    }

    private void throwIfErrorForOpenScanner(ResponseOrError responseOrError) throws ExecutionException {
        if (responseOrError.isError()) {
            this.hasMoreResults.set(false);
            StatusRuntimeException error = responseOrError.getError();
            if (error instanceof StatusRuntimeException) {
                StatusRuntimeException statusRuntimeException = error;
                if (statusRuntimeException.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) {
                    throw new IllegalArgumentException(statusRuntimeException.getMessage(), statusRuntimeException);
                }
                if (statusRuntimeException.getStatus().getCode() == Status.Code.UNAVAILABLE) {
                    throw new ServiceTemporaryUnavailableException(statusRuntimeException.getMessage(), statusRuntimeException);
                }
            }
            if (!(error instanceof Error)) {
                throw new ExecutionException("failed to open scanner", error);
            }
            throw ((Error) error);
        }
    }

    public List<Result> next() throws ExecutionException {
        throwIfScannerHasNoMoreResults();
        ResponseOrError sendRequest = sendRequest(ScanRequest.getDefaultInstance());
        throwIfErrorForNext(sendRequest);
        return getResults(sendRequest.getResponse());
    }

    public List<Result> next(int i) throws ExecutionException {
        throwIfScannerHasNoMoreResults();
        ResponseOrError sendRequest = sendRequest(ScanRequest.newBuilder().setFetchCount(i).build());
        throwIfErrorForNext(sendRequest);
        return getResults(sendRequest.getResponse());
    }

    private void throwIfErrorForNext(ResponseOrError responseOrError) throws ExecutionException {
        if (responseOrError.isError()) {
            this.hasMoreResults.set(false);
            StatusRuntimeException error = responseOrError.getError();
            if (error instanceof StatusRuntimeException) {
                StatusRuntimeException statusRuntimeException = error;
                if (statusRuntimeException.getStatus().getCode() == Status.Code.INVALID_ARGUMENT) {
                    throw new IllegalArgumentException(statusRuntimeException.getMessage(), statusRuntimeException);
                }
            }
            if (!(error instanceof Error)) {
                throw new ExecutionException("failed to next", error);
            }
            throw ((Error) error);
        }
    }

    public void closeScanner() throws ExecutionException {
        try {
            if (this.hasMoreResults.get()) {
                this.hasMoreResults.set(false);
                this.requestStream.onCompleted();
            }
        } catch (StatusRuntimeException e) {
            throw new ExecutionException("failed to close the scanner", e);
        }
    }

    public boolean hasMoreResults() {
        return this.hasMoreResults.get();
    }
}
