package org.apache.paimon.service.server;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.service.exceptions.UnknownPartitionBucketException;
import org.apache.paimon.service.messages.KvRequest;
import org.apache.paimon.service.messages.KvResponse;
import org.apache.paimon.service.network.AbstractServerHandler;
import org.apache.paimon.service.network.messages.MessageSerializer;
import org.apache.paimon.service.network.stats.ServiceRequestStats;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.table.sink.ChannelComputer;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.Preconditions;

@ChannelHandler.Sharable
/* loaded from: input_file:org/apache/paimon/service/server/KvServerHandler.class */
public class KvServerHandler extends AbstractServerHandler<KvRequest, KvResponse> {
    private final int serverId;
    private final int numServers;
    private final TableQuery lookup;
    private final InternalRowSerializer valueSerializer;

    public KvServerHandler(KvQueryServer kvQueryServer, int i, int i2, TableQuery tableQuery, MessageSerializer<KvRequest, KvResponse> messageSerializer, ServiceRequestStats serviceRequestStats) {
        super(kvQueryServer, messageSerializer, serviceRequestStats);
        this.serverId = i;
        this.numServers = i2;
        this.lookup = (TableQuery) Preconditions.checkNotNull(tableQuery);
        this.valueSerializer = tableQuery.createValueSerializer();
    }

    @Override // org.apache.paimon.service.network.AbstractServerHandler
    public CompletableFuture<KvResponse> handleRequest(long j, KvRequest kvRequest) {
        CompletableFuture<KvResponse> completableFuture = new CompletableFuture<>();
        if (ChannelComputer.select(kvRequest.partition(), kvRequest.bucket(), this.numServers) != this.serverId) {
            completableFuture.completeExceptionally(new UnknownPartitionBucketException(getServerName()));
            return completableFuture;
        }
        try {
            BinaryRow[] keys = kvRequest.keys();
            BinaryRow[] binaryRowArr = new BinaryRow[keys.length];
            for (int i = 0; i < binaryRowArr.length; i++) {
                InternalRow lookup = this.lookup.lookup(kvRequest.partition(), kvRequest.bucket(), keys[i]);
                if (lookup != null) {
                    binaryRowArr[i] = this.valueSerializer.toBinaryRow(lookup).copy();
                }
            }
            completableFuture.complete(new KvResponse(binaryRowArr));
            return completableFuture;
        } catch (Throwable th) {
            completableFuture.completeExceptionally(new RuntimeException("Error while processing request with ID " + j + ". Caused by: " + ExceptionUtils.stringifyException(th)));
            return completableFuture;
        }
    }

    @Override // org.apache.paimon.service.network.AbstractServerHandler
    public CompletableFuture<Void> shutdown() {
        return CompletableFuture.completedFuture(null);
    }
}
