package com.scalar.db.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Empty;
import com.scalar.db.api.DistributedStorage;
import com.scalar.db.api.Result;
import com.scalar.db.api.Scan;
import com.scalar.db.api.Scanner;
import com.scalar.db.api.TableMetadata;
import com.scalar.db.common.TableMetadataManager;
import com.scalar.db.exception.storage.NoMutationException;
import com.scalar.db.rpc.DistributedStorageGrpc;
import com.scalar.db.rpc.GetRequest;
import com.scalar.db.rpc.GetResponse;
import com.scalar.db.rpc.MutateRequest;
import com.scalar.db.rpc.Mutation;
import com.scalar.db.rpc.ScanRequest;
import com.scalar.db.rpc.ScanResponse;
import com.scalar.db.util.ProtoUtils;
import com.scalar.db.util.ThrowableRunnable;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * gRPC service implementation that serves {@code get}, {@code scan} and {@code mutate} requests by
 * delegating to a {@link DistributedStorage}.
 *
 * <p>Every request first has to pass the {@link GateKeeper}: it is admitted with {@code letIn()}
 * and released with {@code letOut()} once it has finished. When the gate refuses a request, the
 * client receives {@code UNAVAILABLE} with the description "the server is paused".
 */
@ThreadSafe
public class DistributedStorageService extends DistributedStorageGrpc.DistributedStorageImplBase {
    private static final Logger logger = LoggerFactory.getLogger(DistributedStorageService.class);

    /** Service name passed to {@link Metrics} for every measured operation. */
    private static final String SERVICE_NAME = "distributed_storage";

    /** Number of results returned per scan request when the client does not specify a fetch count. */
    private static final int DEFAULT_SCAN_FETCH_COUNT = 100;

    private final DistributedStorage storage;
    private final TableMetadataManager tableMetadataManager;
    private final GateKeeper gateKeeper;
    private final Metrics metrics;

    /**
     * Request observer of a single streaming scan call.
     *
     * <p>The first request must carry a {@code Scan} object and opens the scanner; each request
     * (including the first) is answered with one page of results. The call is completed by this
     * observer as soon as a page reports that no more results are available.
     *
     * <p>The pre-processor is applied once, on the first request. The post-processor is run at most
     * once, and only if the pre-processor admitted the call.
     */
    @VisibleForTesting
    static class ScanStreamObserver implements StreamObserver<ScanRequest> {
        private final DistributedStorage storage;
        private final TableMetadataManager tableMetadataManager;
        private final StreamObserver<ScanResponse> responseObserver;
        private final Metrics metrics;
        private final Function<StreamObserver<?>, Boolean> preProcessor;
        private final Runnable postProcessor;

        /** Set once the pre-processor has been applied (successfully or not). */
        private final AtomicBoolean preProcessed = new AtomicBoolean();

        /** Set once the pre-processor has admitted the call; gates the post-processor. */
        private final AtomicBoolean admitted = new AtomicBoolean();

        /** Set once the call's resources have been released; makes {@link #cleanUp()} idempotent. */
        private final AtomicBoolean cleanedUp = new AtomicBoolean();

        /** Scanner opened by the first request; {@code null} until then. */
        private Scanner scanner;

        /** Whether the scan was issued by an old client, which selects the result encoding. */
        private boolean requestFromOldClient;

        /**
         * Creates an observer for one scan call.
         *
         * @param distributedStorage storage the scan is executed on
         * @param tableMetadataManager source of the table metadata needed to decode the scan
         * @param streamObserver observer the scan responses and errors are written to
         * @param metrics used to measure opening the scanner and fetching each page
         * @param function pre-processor applied to the response observer on the first request;
         *     returning {@code false} means the call was refused and must not be processed
         * @param runnable post-processor run when an admitted call is cleaned up
         */
        public ScanStreamObserver(DistributedStorage distributedStorage, TableMetadataManager tableMetadataManager, StreamObserver<ScanResponse> streamObserver, Metrics metrics, Function<StreamObserver<?>, Boolean> function, Runnable runnable) {
            this.storage = distributedStorage;
            this.tableMetadataManager = tableMetadataManager;
            this.responseObserver = streamObserver;
            this.metrics = metrics;
            this.preProcessor = function;
            this.postProcessor = runnable;
        }

        /**
         * Handles one scan request: opens the scanner if necessary and responds with the next page.
         *
         * @param scanRequest the request; must contain a {@code Scan} object if and only if the
         *     scanner has not been opened yet
         */
        @Override
        public void onNext(ScanRequest scanRequest) {
            if (this.preProcessed.compareAndSet(false, true)) {
                if (!this.preProcessor.apply(this.responseObserver)) {
                    // Refused: nothing was acquired, so only mark the call as finished. This keeps
                    // later requests from being processed and the post-processor from running.
                    cleanUp();
                    return;
                }
                this.admitted.set(true);
            }
            if (this.cleanedUp.get()) {
                // The call was already refused, failed or completed; ignore late messages.
                return;
            }

            if (this.scanner == null) {
                if (!scanRequest.hasScan()) {
                    respondInvalidArgumentError("the request doesn't have a Scan object even though scanner hasn't been opened yet");
                    return;
                }
                if (!openScanner(scanRequest)) {
                    return;
                }
            } else if (scanRequest.hasScan()) {
                respondInvalidArgumentError("scanner has already been opened. Don't specify a Scan object");
                return;
            }

            ScanResponse response = next(scanRequest);
            if (response == null) {
                // next() has already reported the failure to the client.
                return;
            }

            boolean lastPage = !response.getHasMoreResults();
            if (lastPage) {
                // Release the scanner before the final response is sent.
                cleanUp();
            }
            this.responseObserver.onNext(response);
            if (lastPage) {
                this.responseObserver.onCompleted();
            }
        }

        /**
         * Opens the scanner for the {@code Scan} object carried by the request.
         *
         * @return {@code true} if the scanner was opened; {@code false} if an error was sent to the
         *     client instead
         */
        private boolean openScanner(ScanRequest scanRequest) {
            try {
                this.metrics.measure(SERVICE_NAME, "scan.open_scanner", () -> {
                    TableMetadata tableMetadata = this.tableMetadataManager.getTableMetadata(scanRequest.getScan().getNamespace(), scanRequest.getScan().getTable());
                    if (tableMetadata == null) {
                        throw new IllegalArgumentException("The specified table is not found");
                    }
                    Scan scan = ProtoUtils.toScan(scanRequest.getScan(), tableMetadata);
                    this.requestFromOldClient = ProtoUtils.isRequestFromOldClient(scanRequest.getScan());
                    this.scanner = this.storage.scan(scan);
                });
                return true;
            } catch (IllegalArgumentException e) {
                respondInvalidArgumentError(e.getMessage());
                return false;
            } catch (Throwable t) {
                logger.error("An internal error happened when opening a scanner", t);
                respondInternalError(t.getMessage());
                if (t instanceof Error) {
                    // Errors are reported to the client but never swallowed.
                    throw (Error) t;
                }
                return false;
            }
        }

        /**
         * Fetches the next page of results from the opened scanner.
         *
         * @return the response to send, or {@code null} if an error was sent to the client instead
         */
        private ScanResponse next(ScanRequest scanRequest) {
            final int fetchCount = scanRequest.hasFetchCount() ? scanRequest.getFetchCount() : DEFAULT_SCAN_FETCH_COUNT;
            if (fetchCount < 0) {
                // A negative count is a client mistake, not an internal failure.
                respondInvalidArgumentError("the fetch count must not be negative");
                return null;
            }
            try {
                return (ScanResponse) this.metrics.measure(SERVICE_NAME, "scan.next", () -> {
                    Iterator<Result> results = this.scanner.iterator();
                    ScanResponse.Builder response = ScanResponse.newBuilder();
                    for (Result result : fetch(results, fetchCount)) {
                        if (this.requestFromOldClient) {
                            response.addResults(ProtoUtils.toResultWithValue(result));
                        } else {
                            response.addResults(ProtoUtils.toResult(result));
                        }
                    }
                    return response.setHasMoreResults(results.hasNext()).build();
                });
            } catch (Throwable t) {
                logger.error("An internal error happened during the execution", t);
                respondInternalError(t.getMessage());
                if (t instanceof Error) {
                    throw (Error) t;
                }
                return null;
            }
        }

        /**
         * Takes up to {@code count} results from the iterator.
         *
         * @param it iterator to read from
         * @param count maximum number of results to take
         * @return the results taken, in iteration order; possibly empty
         */
        private List<Result> fetch(Iterator<Result> it, int count) {
            // The count comes from the client, so it must not drive the initial allocation:
            // pre-size for at most one default page and let the list grow if more are returned.
            List<Result> page = new ArrayList<>(Math.max(0, Math.min(count, DEFAULT_SCAN_FETCH_COUNT)));
            for (int taken = 0; taken < count && it.hasNext(); taken++) {
                page.add(it.next());
            }
            return page;
        }

        /** Logs the error received on the request stream and releases the call's resources. */
        @Override
        public void onError(Throwable th) {
            logger.error("An error was received", th);
            cleanUp();
        }

        /**
         * Completes the call when the request stream ends before the scan was finished. Does
         * nothing if the call has already been cleaned up.
         */
        @Override
        public void onCompleted() {
            if (this.cleanedUp.get()) {
                return;
            }
            cleanUp();
            this.responseObserver.onCompleted();
        }

        /**
         * Closes the scanner (if one was opened) and runs the post-processor (if the call was
         * admitted). Only the first invocation has any effect.
         */
        private void cleanUp() {
            if (!this.cleanedUp.compareAndSet(false, true)) {
                return;
            }
            try {
                if (this.scanner != null) {
                    this.scanner.close();
                }
            } catch (IOException e) {
                logger.warn("Failed to close the scanner", e);
            } finally {
                // Must run even if closing fails, but only to balance a successful pre-process.
                if (this.admitted.get()) {
                    this.postProcessor.run();
                }
            }
        }

        /** Sends an {@code INTERNAL} error with the given description and cleans up. */
        private void respondInternalError(String description) {
            try {
                this.responseObserver.onError(Status.INTERNAL.withDescription(description).asRuntimeException());
            } finally {
                cleanUp();
            }
        }

        /** Sends an {@code INVALID_ARGUMENT} error with the given description and cleans up. */
        private void respondInvalidArgumentError(String description) {
            try {
                this.responseObserver.onError(Status.INVALID_ARGUMENT.withDescription(description).asRuntimeException());
            } finally {
                cleanUp();
            }
        }
    }

    /**
     * Creates the service.
     *
     * @param distributedStorage storage all operations are delegated to
     * @param tableMetadataManager source of the table metadata needed to decode requests
     * @param gateKeeper gate every request has to pass before it is processed
     * @param metrics used to measure each operation
     */
    @SuppressFBWarnings({"EI_EXPOSE_REP2"})
    public DistributedStorageService(DistributedStorage distributedStorage, TableMetadataManager tableMetadataManager, GateKeeper gateKeeper, Metrics metrics) {
        this.storage = distributedStorage;
        this.tableMetadataManager = tableMetadataManager;
        this.gateKeeper = gateKeeper;
        this.metrics = metrics;
    }

    /**
     * Retrieves a single result and sends it in a {@link GetResponse}; the response carries no
     * result when the storage returns none.
     *
     * @param getRequest request carrying the {@code Get} operation
     * @param streamObserver observer the response or error is written to
     */
    @Override
    public void get(GetRequest getRequest, StreamObserver<GetResponse> streamObserver) {
        execute(() -> {
            TableMetadata tableMetadata = this.tableMetadataManager.getTableMetadata(getRequest.getGet().getNamespace(), getRequest.getGet().getTable());
            if (tableMetadata == null) {
                throw new IllegalArgumentException("The specified table is not found");
            }
            Optional<Result> result = this.storage.get(ProtoUtils.toGet(getRequest.getGet(), tableMetadata));
            GetResponse.Builder response = GetResponse.newBuilder();
            // Old clients get the value-based result encoding.
            if (ProtoUtils.isRequestFromOldClient(getRequest.getGet())) {
                result.ifPresent(r -> response.setResult(ProtoUtils.toResultWithValue(r)));
            } else {
                result.ifPresent(r -> response.setResult(ProtoUtils.toResult(r)));
            }
            streamObserver.onNext(response.build());
            streamObserver.onCompleted();
        }, streamObserver, "get");
    }

    /**
     * Starts a streaming scan call.
     *
     * @param streamObserver observer the scan responses and errors are written to
     * @return the observer that consumes the client's scan requests
     */
    @Override
    public StreamObserver<ScanRequest> scan(StreamObserver<ScanResponse> streamObserver) {
        return new ScanStreamObserver(this.storage, this.tableMetadataManager, streamObserver, this.metrics, this::preProcess, this::postProcess);
    }

    /**
     * Applies the mutations of the request to the storage and responds with an empty message.
     *
     * @param mutateRequest request carrying the mutations
     * @param streamObserver observer the response or error is written to
     */
    @Override
    public void mutate(MutateRequest mutateRequest, StreamObserver<Empty> streamObserver) {
        execute(() -> {
            List<com.scalar.db.api.Mutation> mutations;
            if (mutateRequest.getMutationsCount() > 0) {
                // The table metadata of the first mutation is used to decode all mutations.
                Mutation first = mutateRequest.getMutationsList().get(0);
                TableMetadata tableMetadata = this.tableMetadataManager.getTableMetadata(first.getNamespace(), first.getTable());
                if (tableMetadata == null) {
                    throw new IllegalArgumentException("The specified table is not found");
                }
                mutations = new ArrayList<>(mutateRequest.getMutationsCount());
                for (Mutation mutation : mutateRequest.getMutationsList()) {
                    mutations.add(ProtoUtils.toMutation(mutation, tableMetadata));
                }
            } else {
                mutations = Collections.emptyList();
            }
            this.storage.mutate(mutations);
            streamObserver.onNext(Empty.getDefaultInstance());
            streamObserver.onCompleted();
        }, streamObserver, "mutate");
    }

    /**
     * Runs a unary operation behind the gate and maps its failures to gRPC statuses:
     * {@code IllegalArgumentException}/{@code IllegalStateException} to {@code INVALID_ARGUMENT},
     * {@link NoMutationException} to {@code FAILED_PRECONDITION}, anything else to
     * {@code INTERNAL}. An {@link Error} is rethrown after the client has been informed.
     *
     * @param operation the operation to run; it is responsible for sending the success response
     * @param streamObserver observer errors are written to
     * @param method method name passed to {@link Metrics}
     */
    private void execute(ThrowableRunnable<Throwable> operation, StreamObserver<?> streamObserver, String method) {
        // Not admitted means the gate was never entered, so it must not be left either.
        if (!preProcess(streamObserver)) {
            return;
        }
        try {
            this.metrics.measure(SERVICE_NAME, method, operation);
        } catch (IllegalArgumentException | IllegalStateException e) {
            streamObserver.onError(Status.INVALID_ARGUMENT.withDescription(e.getMessage()).asRuntimeException());
        } catch (NoMutationException e) {
            streamObserver.onError(Status.FAILED_PRECONDITION.withDescription(e.getMessage()).asRuntimeException());
        } catch (Throwable t) {
            logger.error("An internal error happened during the execution", t);
            streamObserver.onError(Status.INTERNAL.withDescription(t.getMessage()).asRuntimeException());
            if (t instanceof Error) {
                throw (Error) t;
            }
        } finally {
            // Exactly one letOut() per successful letIn(), whatever happened in between.
            postProcess();
        }
    }

    /**
     * Asks the gate keeper to admit a request; responds with {@code UNAVAILABLE} if it refuses.
     *
     * @return {@code true} if the request was admitted
     */
    private boolean preProcess(StreamObserver<?> streamObserver) {
        if (this.gateKeeper.letIn()) {
            return true;
        }
        respondUnavailableError(streamObserver);
        return false;
    }

    /** Sends an {@code UNAVAILABLE} error telling the client that the server is paused. */
    private void respondUnavailableError(StreamObserver<?> streamObserver) {
        streamObserver.onError(Status.UNAVAILABLE.withDescription("the server is paused").asRuntimeException());
    }

    /** Tells the gate keeper that an admitted request has finished. */
    private void postProcess() {
        this.gateKeeper.letOut();
    }
}
