package org.apache.hadoop.hdfs.server.datanode;

import com.google.protobuf.ByteString;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.ClosedChannelException;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
import org.apache.hadoop.hdfs.protocol.datatransfer.Receiver;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.SocketInputWrapper;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.Token;
import org.apache.zookeeper.server.quorum.QuorumStats;

/* loaded from: input_file:lib/hadoop-hdfs-2.0.1-alpha.jar:org/apache/hadoop/hdfs/server/datanode/DataXceiver.class */
class DataXceiver extends Receiver implements Runnable {
    /** Logger for this class; the static initializer assigns it from {@code DataNode.LOG}. */
    public static final Log LOG;
    /** Client-trace logger; the static initializer assigns it from {@code DataNode.ClientTraceLog}. */
    static final Log ClientTraceLog;
    /** Socket connected to the peer whose operations this xceiver serves. */
    private final Socket s;
    /** Whether the socket's remote address equals its local address (computed in the constructor). */
    private final boolean isLocal;
    /** String form of the socket's remote socket address; used in log messages and the thread name. */
    private final String remoteAddress;
    /** String form of the socket's local socket address; used in log messages. */
    private final String localAddress;
    /** Datanode this xceiver works for; metrics, block data and token checks are reached through it. */
    private final DataNode datanode;
    /** Configuration obtained from {@code dataNode.getDnConf()}; the socket timeouts are read from it. */
    private final DNConf dnConf;
    /** Server this xceiver was created for; holds the child-socket set and the balance throttler. */
    private final DataXceiverServer dataXceiverServer;
    /** Value of {@code Util.now()} captured in {@code run()} just before an operation is processed. */
    private long opStartTime;
    /** Input wrapper around the socket's input stream whose timeout is adjusted between operations. */
    private final SocketInputWrapper socketInputWrapper;
    /** Client name recorded by the most recent operation that supplied one; shown in the thread name. */
    private String previousOpClientName;
    /** Assertion flag materialized by the decompiler: {@code true} when assertions are disabled for this class. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Factory for a {@code DataXceiver} bound to the given socket.
     *
     * @param socket            connected peer socket
     * @param dataNode          datanode the xceiver works for
     * @param dataXceiverServer server that tracks this xceiver's socket
     * @return a new xceiver reading from the socket's input wrapper
     * @throws IOException if the socket input stream cannot be obtained or construction fails
     */
    public static DataXceiver create(Socket socket, DataNode dataNode, DataXceiverServer dataXceiverServer) throws IOException {
        final SocketInputWrapper input = NetUtils.getInputStream(socket);
        return new DataXceiver(socket, input, dataNode, dataXceiverServer);
    }

    private DataXceiver(Socket socket, SocketInputWrapper socketInputWrapper, DataNode dataNode, DataXceiverServer dataXceiverServer) throws IOException {
        super(new DataInputStream(new BufferedInputStream(socketInputWrapper, HdfsConstants.SMALL_BUFFER_SIZE)));
        this.s = socket;
        this.socketInputWrapper = socketInputWrapper;
        this.isLocal = socket.getInetAddress().equals(socket.getLocalAddress());
        this.datanode = dataNode;
        this.dnConf = dataNode.getDnConf();
        this.dataXceiverServer = dataXceiverServer;
        this.remoteAddress = socket.getRemoteSocketAddress().toString();
        this.localAddress = socket.getLocalSocketAddress().toString();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Number of active connections is: " + dataNode.getXceiverCount());
        }
    }

    /**
     * Renames the current thread to describe the client being served and, optionally,
     * what the xceiver is doing.
     *
     * <p>The resulting name has the form
     * {@code "DataXceiver for client [<clientName> at ]<remoteAddress>[ [<status>]]"}.
     *
     * @param str short status text appended in square brackets, or {@code null} to omit it
     */
    private void updateCurrentThreadName(String str) {
        StringBuilder sb = new StringBuilder("DataXceiver for client ");
        if (this.previousOpClientName != null) {
            sb.append(this.previousOpClientName).append(" at ");
        }
        sb.append(this.remoteAddress);
        if (str != null) {
            // Plain literal: the closing bracket must not depend on an unrelated
            // configuration-library constant that merely happens to equal "]".
            sb.append(" [").append(str).append("]");
        }
        Thread.currentThread().setName(sb.toString());
    }

    /**
     * @return the datanode this xceiver was created with
     */
    DataNode getDataNode() {
        return datanode;
    }

    /**
     * Serves operations from the peer socket until the connection is closed, keep-alive is
     * disabled, the peer goes idle, or an error occurs.
     *
     * <p>The first operation is awaited with {@code dnConf.socketTimeout}; subsequent ones with
     * {@code dnConf.socketKeepaliveTimeout}. A timeout while waiting for the next operation, or an
     * EOF / closed channel on a connection that has already served at least one operation, ends
     * the loop quietly. Any other failure is logged as an error. In every case the input stream
     * and socket are closed and the socket is removed from the server's child-socket set.
     */
    @Override // java.lang.Runnable
    public void run() {
        int opsProcessed = 0;
        Op op = null;
        this.dataXceiverServer.childSockets.add(this.s);
        try {
            do {
                updateCurrentThreadName("Waiting for operation #" + (opsProcessed + 1));
                try {
                    if (opsProcessed != 0) {
                        // Equivalent of: assert dnConf.socketKeepaliveTimeout > 0
                        if (!$assertionsDisabled && this.dnConf.socketKeepaliveTimeout <= 0) {
                            throw new AssertionError();
                        }
                        this.socketInputWrapper.setTimeout(this.dnConf.socketKeepaliveTimeout);
                    } else {
                        this.socketInputWrapper.setTimeout(this.dnConf.socketTimeout);
                    }
                    // The read must be inside this try so that idle timeouts / EOF are handled here.
                    op = readOp();
                } catch (InterruptedIOException ignored) {
                    // Timed out waiting for the next operation: drop the connection quietly.
                    break;
                } catch (IOException e) {
                    boolean idleConnectionClosed = opsProcessed > 0
                            && ((e instanceof EOFException) || (e instanceof ClosedChannelException));
                    if (!idleConnectionClosed) {
                        throw e;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Cached " + this.s.toString() + " closing after " + opsProcessed + " ops");
                    }
                    break;
                }

                // Restore the normal timeout after waiting with the keep-alive timeout.
                if (opsProcessed != 0) {
                    this.s.setSoTimeout(this.dnConf.socketTimeout);
                }
                this.opStartTime = Util.now();
                processOp(op);
                opsProcessed++;
            } while (!this.s.isClosed() && this.dnConf.socketKeepaliveTimeout > 0);
        } catch (Throwable t) {
            LOG.error(this.datanode.getDisplayName() + ":DataXceiver error processing " + (op == null ? "unknown" : op.name()) + " operation  src: " + this.remoteAddress + " dest: " + this.localAddress, t);
        } finally {
            if (LOG.isDebugEnabled()) {
                LOG.debug(this.datanode.getDisplayName() + ":Number of active connections is: " + this.datanode.getXceiverCount());
            }
            updateCurrentThreadName("Cleaning up");
            IOUtils.closeStream(this.in);
            IOUtils.closeSocket(this.s);
            this.dataXceiverServer.childSockets.remove(this.s);
        }
    }

    /**
     * Streams a block (or a range of it) to the peer.
     *
     * <p>After the access check, a {@code BlockSender} is created. If that fails, an ERROR
     * response is sent to the peer and the exception is rethrown. Otherwise a success header
     * with checksum info is written, the data is sent, and, when the whole requested range
     * was sent, the peer's read-status message is awaited. The reply stream is closed whenever
     * the connection must not be reused. A {@code SocketException} while serving is treated as the
     * peer closing the connection and is not propagated.
     *
     * @param extendedBlock block to send
     * @param token         block access token checked with READ mode
     * @param str           client name; recorded for the thread name and, when non-empty, used in the client trace format
     * @param j             passed to {@code BlockSender} as its start-offset argument
     * @param j2            passed to {@code BlockSender} as its length argument
     * @throws IOException on token failure or any non-socket I/O failure while serving the block
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    public void readBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, String str, long j, long j2) throws IOException {
        this.previousOpClientName = str;
        OutputStream outputStream = NetUtils.getOutputStream(this.s, this.dnConf.socketWriteTimeout);
        DataOutputStream dataOutputStream = new DataOutputStream(new BufferedOutputStream(outputStream, HdfsConstants.SMALL_BUFFER_SIZE));
        checkAccess(dataOutputStream, true, extendedBlock, token, Op.READ_BLOCK, BlockTokenSecretManager.AccessMode.READ);
        DatanodeRegistration dNRegistrationForBP = this.datanode.getDNRegistrationForBP(extendedBlock.getBlockPoolId());
        // The "%d" placeholders are left in the string on purpose; it is handed to the BlockSender as a format.
        String format = (str.length() <= 0 || !ClientTraceLog.isInfoEnabled()) ? dNRegistrationForBP + " Served block " + extendedBlock + " to " + this.remoteAddress : String.format(DataNode.DN_CLIENTTRACE_FORMAT, this.localAddress, this.remoteAddress, "%d", "HDFS_READ", str, "%d", dNRegistrationForBP.getStorageID(), extendedBlock, "%d");
        updateCurrentThreadName("Sending block " + extendedBlock);

        BlockSender blockSender = null;
        try {
            try {
                blockSender = new BlockSender(extendedBlock, j, j2, true, false, this.datanode, format);
            } catch (IOException e) {
                // Nothing has been written yet, so the peer can still be told about the failure.
                String msg = "opReadBlock " + extendedBlock + " received exception " + e;
                LOG.info(msg);
                sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
                throw e;
            }

            writeSuccessWithChecksumInfo(blockSender, getStreamWithTimeout(this.s, this.dnConf.socketWriteTimeout));
            long bytesSent = blockSender.sendBlock(dataOutputStream, outputStream, null);

            if (blockSender.didSendEntireByteRange()) {
                // The whole range was sent, so the peer is expected to answer with a status message.
                try {
                    if (!DataTransferProtos.ClientReadStatusProto.parseFrom(HdfsProtoUtil.vintPrefixed(this.in)).hasStatus()) {
                        LOG.warn("Client " + this.s.getInetAddress() + " did not send a valid status code after reading. Will close connection.");
                        IOUtils.closeStream(dataOutputStream);
                    }
                } catch (IOException e) {
                    LOG.debug("Error reading client status response. Will close connection.", e);
                    IOUtils.closeStream(dataOutputStream);
                }
            } else {
                IOUtils.closeStream(dataOutputStream);
            }
            this.datanode.metrics.incrBytesRead((int) bytesSent);
            this.datanode.metrics.incrBlocksRead();
        } catch (SocketException ignored) {
            // The remote side may close the connection at any time; this is not an error.
            if (LOG.isTraceEnabled()) {
                LOG.trace(dNRegistrationForBP + ":Ignoring exception while serving " + extendedBlock + " to " + this.remoteAddress, ignored);
            }
            this.datanode.metrics.incrBlocksRead();
            IOUtils.closeStream(dataOutputStream);
        } catch (IOException e) {
            LOG.warn(dNRegistrationForBP + ":Got exception while serving " + extendedBlock + " to " + this.remoteAddress, e);
            throw e;
        } finally {
            // Always release the sender, including on failure paths.
            IOUtils.closeStream(blockSender);
        }

        this.datanode.metrics.addReadBlockOp(elapsed());
        this.datanode.metrics.incrReadsFromClient(this.isLocal);
    }

    /* JADX WARN: Removed duplicated region for block: B:29:0x0245 A[Catch: IOException -> 0x0520, all -> 0x054a, TryCatch #2 {IOException -> 0x0520, blocks: (B:86:0x01f0, B:88:0x022e, B:27:0x023f, B:29:0x0245, B:31:0x0261, B:33:0x02ed, B:35:0x0310, B:37:0x0318, B:43:0x034e, B:44:0x0369, B:46:0x0386, B:47:0x03be, B:48:0x03bf, B:53:0x03fc, B:55:0x0407, B:57:0x0436, B:58:0x040f, B:64:0x0460, B:66:0x0477, B:68:0x0482, B:69:0x048c, B:73:0x049a, B:75:0x04a2, B:78:0x04b3, B:84:0x04bb, B:26:0x01f8), top: B:85:0x01f0, outer: #1 }] */
    /* JADX WARN: Removed duplicated region for block: B:61:0x0455  */
    /**
     * Placeholder for the write-block operation.
     *
     * <p>The decompiler could not recover this method's body (see the notes below), so this
     * version performs no work and unconditionally throws
     * {@link UnsupportedOperationException}. The real logic has to be restored from the
     * original class before this operation can be used.
     *
     * @throws UnsupportedOperationException always
     * @throws java.io.IOException declared by the signature; never thrown by this placeholder
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void writeBlock(org.apache.hadoop.hdfs.protocol.ExtendedBlock r19, org.apache.hadoop.security.token.Token<org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier> r20, java.lang.String r21, org.apache.hadoop.hdfs.protocol.DatanodeInfo[] r22, org.apache.hadoop.hdfs.protocol.DatanodeInfo r23, org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage r24, int r25, long r26, long r28, long r30, org.apache.hadoop.util.DataChecksum r32) throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 1413
            To view this dump add '--comments-level debug' option
        */
        // Decompiler-generated stub: fails fast instead of silently doing nothing.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(org.apache.hadoop.hdfs.protocol.ExtendedBlock, org.apache.hadoop.security.token.Token, java.lang.String, org.apache.hadoop.hdfs.protocol.DatanodeInfo[], org.apache.hadoop.hdfs.protocol.DatanodeInfo, org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage, int, long, long, long, org.apache.hadoop.util.DataChecksum):void");
    }

    /**
     * Handles a transfer-block request: after the access check (COPY mode) the replica is
     * transferred to the given targets via the datanode and a SUCCESS response is written
     * back. The reply stream is closed whether or not the transfer succeeds.
     *
     * @param extendedBlock    block to transfer
     * @param token            block access token
     * @param str              client name; recorded for the thread name and passed to the datanode
     * @param datanodeInfoArr  target datanodes
     * @throws IOException on token failure or if the transfer or the reply fails
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    public void transferBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, String str, DatanodeInfo[] datanodeInfoArr) throws IOException {
        checkAccess(null, true, extendedBlock, token, Op.TRANSFER_BLOCK, BlockTokenSecretManager.AccessMode.COPY);
        this.previousOpClientName = str;
        updateCurrentThreadName(Op.TRANSFER_BLOCK + " " + extendedBlock);

        final OutputStream rawOut = NetUtils.getOutputStream(this.s, this.dnConf.socketWriteTimeout);
        final DataOutputStream reply = new DataOutputStream(rawOut);
        try {
            this.datanode.transferReplicaForPipelineRecovery(extendedBlock, datanodeInfoArr, str);
            writeResponse(DataTransferProtos.Status.SUCCESS, null, reply);
        } finally {
            IOUtils.closeStream(reply);
        }
    }

    /**
     * Answers a block-checksum request with the bytes-per-checksum value, the number of
     * checksums stored for the block, and an MD5 digest of the checksum data that follows the
     * metadata header.
     *
     * @param extendedBlock block whose metadata is read
     * @param token         block access token checked with READ mode
     * @throws IOException on token failure, or if the metadata cannot be read or the reply cannot be written
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    public void blockChecksum(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token) throws IOException {
        DataOutputStream dataOutputStream = new DataOutputStream(NetUtils.getOutputStream(this.s, this.dnConf.socketWriteTimeout));
        checkAccess(dataOutputStream, true, extendedBlock, token, Op.BLOCK_CHECKSUM, BlockTokenSecretManager.AccessMode.READ);
        updateCurrentThreadName("Reading metadata for block " + extendedBlock);
        LengthInputStream metaDataInputStream = this.datanode.data.getMetaDataInputStream(extendedBlock);
        DataInputStream dataInputStream = new DataInputStream(new BufferedInputStream(metaDataInputStream, HdfsConstants.IO_FILE_BUFFER_SIZE));
        updateCurrentThreadName("Getting checksum for block " + extendedBlock);
        try {
            // Keep the checksum descriptor in a local: both its chunk size and its checksum size are needed.
            org.apache.hadoop.util.DataChecksum checksum = BlockMetadataHeader.readHeader(dataInputStream).getChecksum();
            int bytesPerChecksum = checksum.getBytesPerChecksum();
            // Number of checksums = (metadata length - header length) / size of one checksum.
            long crcPerBlock = (metaDataInputStream.getLength() - BlockMetadataHeader.getHeaderSize()) / checksum.getChecksumSize();
            // The header has been consumed, so the digest covers the remaining checksum bytes only.
            MD5Hash digest = MD5Hash.digest(dataInputStream);
            if (LOG.isDebugEnabled()) {
                LOG.debug("block=" + extendedBlock + ", bytesPerCRC=" + bytesPerChecksum + ", crcPerBlock=" + crcPerBlock + ", md5=" + digest);
            }
            DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.SUCCESS).setChecksumResponse(DataTransferProtos.OpBlockChecksumResponseProto.newBuilder().setBytesPerCrc(bytesPerChecksum).setCrcPerBlock(crcPerBlock).setMd5(ByteString.copyFrom(digest.getDigest()))).build().writeDelimitedTo(dataOutputStream);
            dataOutputStream.flush();
        } finally {
            IOUtils.closeStream(dataOutputStream);
            IOUtils.closeStream(dataInputStream);
            IOUtils.closeStream(metaDataInputStream);
        }
        this.datanode.metrics.addBlockChecksumOp(elapsed());
    }

    /**
     * Sends a whole block to the peer on behalf of a copy request, subject to the server's
     * balance throttler.
     *
     * <p>If the token check fails or no throttler slot can be acquired, an error response is
     * sent and the method returns normally. Otherwise the block is streamed; afterwards the
     * throttler slot is released exactly once and, unless an {@code IOException} occurred, a
     * trailing {@code 'd'} character is written to tell the peer the slot has been released.
     *
     * @param extendedBlock block to copy
     * @param token         block access token checked with COPY mode when block tokens are enabled
     * @throws IOException if creating the sender or sending the block fails
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    public void copyBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token) throws IOException {
        updateCurrentThreadName("Copying block " + extendedBlock);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(token, (String) null, extendedBlock, BlockTokenSecretManager.AccessMode.COPY);
            } catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_COPY_BLOCK for block " + extendedBlock + " : " + e.getLocalizedMessage());
                sendResponse(this.s, DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token", this.dnConf.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to copy block " + extendedBlock.getBlockId() + " to " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.";
            LOG.info(msg);
            sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
            return;
        }

        BlockSender blockSender = null;
        DataOutputStream reply = null;
        boolean isOpSuccess = true;
        try {
            blockSender = new BlockSender(extendedBlock, 0L, -1L, false, false, this.datanode, null);
            OutputStream baseStream = NetUtils.getOutputStream(this.s, this.dnConf.socketWriteTimeout);
            reply = new DataOutputStream(new BufferedOutputStream(baseStream, HdfsConstants.SMALL_BUFFER_SIZE));
            writeSuccessWithChecksumInfo(blockSender, reply);
            long bytesSent = blockSender.sendBlock(reply, baseStream, this.dataXceiverServer.balanceThrottler);
            this.datanode.metrics.incrBytesRead((int) bytesSent);
            this.datanode.metrics.incrBlocksRead();
            LOG.info("Copied block " + extendedBlock + " to " + this.s.getRemoteSocketAddress());
        } catch (IOException e) {
            isOpSuccess = false;
            LOG.info("opCopyBlock " + extendedBlock + " received exception " + e);
            throw e;
        } finally {
            // Single cleanup path: the slot acquired above is released exactly once.
            this.dataXceiverServer.balanceThrottler.release();
            // reply is null if the sender could not be created; skip the marker then instead of
            // failing with a NullPointerException that would hide the original problem.
            if (isOpSuccess && reply != null) {
                try {
                    reply.writeChar('d');
                } catch (IOException ignored) {
                    // Best effort: the peer may already have gone away.
                }
            }
            IOUtils.closeStream(reply);
            IOUtils.closeStream(blockSender);
        }
        this.datanode.metrics.addCopyBlockOp(elapsed());
    }

    /**
     * Receives a block from a proxy datanode in order to replace a replica, subject to the
     * server's balance throttler, and reports the outcome to the requester.
     *
     * <p>If the token check fails or no throttler slot can be acquired, an error response is
     * sent and the method returns normally. Otherwise a copy-block request is sent to the proxy,
     * the block is received, and the namenode is notified. In all cases the throttler slot is
     * released, a response carrying the real outcome (SUCCESS only if every step completed) is
     * sent to the requester, and the proxy streams and socket are closed.
     *
     * @param extendedBlock block to receive
     * @param token         block access token checked with REPLACE mode when block tokens are enabled
     * @param str           hint forwarded to {@code notifyNamenodeReceivedBlock}; also shown in the thread name
     * @param datanodeInfo  proxy datanode to copy the block from
     * @throws IOException if connecting to the proxy, the proxy's reply, or receiving the block fails
     */
    @Override // org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol
    public void replaceBlock(ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, String str, DatanodeInfo datanodeInfo) throws IOException {
        updateCurrentThreadName("Replacing block " + extendedBlock + " from " + str);
        extendedBlock.setNumBytes(this.dataXceiverServer.estimateBlockSize);
        if (this.datanode.isBlockTokenEnabled) {
            try {
                this.datanode.blockPoolTokenSecretManager.checkAccess(token, (String) null, extendedBlock, BlockTokenSecretManager.AccessMode.REPLACE);
            } catch (SecretManager.InvalidToken e) {
                LOG.warn("Invalid access token in request from " + this.remoteAddress + " for OP_REPLACE_BLOCK for block " + extendedBlock + " : " + e.getLocalizedMessage());
                sendResponse(this.s, DataTransferProtos.Status.ERROR_ACCESS_TOKEN, "Invalid access token", this.dnConf.socketWriteTimeout);
                return;
            }
        }
        if (!this.dataXceiverServer.balanceThrottler.acquire()) {
            String msg = "Not able to receive block " + extendedBlock.getBlockId() + " from " + this.s.getRemoteSocketAddress() + " because threads quota is exceeded.";
            LOG.warn(msg);
            sendResponse(this.s, DataTransferProtos.Status.ERROR, msg, this.dnConf.socketWriteTimeout);
            return;
        }

        Socket proxySock = null;
        DataOutputStream proxyOut = null;
        DataInputStream proxyReply = null;
        BlockReceiver blockReceiver = null;
        // Pessimistic default: only a fully completed transfer is reported as SUCCESS.
        DataTransferProtos.Status opStatus = DataTransferProtos.Status.ERROR;
        try {
            InetSocketAddress proxyAddr = NetUtils.createSocketAddr(datanodeInfo.getXferAddr());
            proxySock = this.datanode.newSocket();
            NetUtils.connect(proxySock, proxyAddr, this.dnConf.socketTimeout);
            proxySock.setSoTimeout(this.dnConf.socketTimeout);

            proxyOut = new DataOutputStream(new BufferedOutputStream(NetUtils.getOutputStream(proxySock, this.dnConf.socketWriteTimeout), HdfsConstants.SMALL_BUFFER_SIZE));
            new Sender(proxyOut).copyBlock(extendedBlock, token);

            proxyReply = new DataInputStream(new BufferedInputStream(NetUtils.getInputStream(proxySock), HdfsConstants.IO_FILE_BUFFER_SIZE));
            DataTransferProtos.BlockOpResponseProto copyResponse = DataTransferProtos.BlockOpResponseProto.parseFrom(HdfsProtoUtil.vintPrefixed(proxyReply));
            if (copyResponse.getStatus() != DataTransferProtos.Status.SUCCESS) {
                if (copyResponse.getStatus() == DataTransferProtos.Status.ERROR_ACCESS_TOKEN) {
                    throw new IOException("Copy block " + extendedBlock + " from " + proxySock.getRemoteSocketAddress() + " failed due to access token error");
                }
                throw new IOException("Copy block " + extendedBlock + " from " + proxySock.getRemoteSocketAddress() + " failed");
            }

            blockReceiver = new BlockReceiver(extendedBlock, proxyReply, proxySock.getRemoteSocketAddress().toString(), proxySock.getLocalSocketAddress().toString(), null, 0L, 0L, 0L, "", null, this.datanode, DataTransferProtoUtil.fromProto(copyResponse.getReadOpChecksumInfo().getChecksum()));
            blockReceiver.receiveBlock(null, null, null, null, this.dataXceiverServer.balanceThrottler, null);
            this.datanode.notifyNamenodeReceivedBlock(extendedBlock, str);
            LOG.info("Moved block " + extendedBlock + " from " + this.s.getRemoteSocketAddress());
            opStatus = DataTransferProtos.Status.SUCCESS;
        } catch (IOException e) {
            LOG.info("opReplaceBlock " + extendedBlock + " received exception " + e);
            throw e;
        } finally {
            // On success, consume the trailing character the proxy writes after finishing its copy.
            if (opStatus == DataTransferProtos.Status.SUCCESS && proxyReply != null) {
                try {
                    proxyReply.readChar();
                } catch (IOException ignored) {
                    // Best effort: the block has already been received.
                }
            }
            // Release the slot acquired above before replying, on every path.
            this.dataXceiverServer.balanceThrottler.release();
            try {
                sendResponse(this.s, opStatus, null, this.dnConf.socketWriteTimeout);
            } catch (IOException e) {
                LOG.warn("Error writing reply back to " + this.s.getRemoteSocketAddress());
            }
            IOUtils.closeStream(proxyOut);
            IOUtils.closeStream(blockReceiver);
            IOUtils.closeStream(proxyReply);
            // Covers failures that happen before any stream on the proxy socket exists.
            IOUtils.closeSocket(proxySock);
        }
        this.datanode.metrics.addReplaceBlockOp(elapsed());
    }

    /**
     * @return difference between the current {@code Util.now()} value and {@code opStartTime}
     */
    private long elapsed() {
        final long now = Util.now();
        return now - opStartTime;
    }

    /**
     * Writes a status response (with optional message) directly to the socket.
     *
     * @param socket socket to reply on
     * @param status status to report
     * @param str    message to include, or {@code null} for none
     * @param j      write timeout used for the socket output stream
     * @throws IOException if the stream cannot be obtained or the write fails
     */
    private static void sendResponse(Socket socket, DataTransferProtos.Status status, String str, long j) throws IOException {
        final DataOutputStream reply = getStreamWithTimeout(socket, j);
        writeResponse(status, str, reply);
    }

    /**
     * Wraps the socket's output stream, obtained with the given timeout, in a
     * {@code DataOutputStream}.
     *
     * @param socket socket to write to
     * @param j      timeout passed to {@code NetUtils.getOutputStream}
     * @return unbuffered data stream over the socket's output
     * @throws IOException if the output stream cannot be obtained
     */
    private static DataOutputStream getStreamWithTimeout(Socket socket, long j) throws IOException {
        final OutputStream rawOut = NetUtils.getOutputStream(socket, j);
        return new DataOutputStream(rawOut);
    }

    /**
     * Serializes a length-delimited {@code BlockOpResponseProto} carrying the given status
     * (and message, when present) to the stream, then flushes it.
     *
     * @param status       status to report
     * @param str          message to include, or {@code null} for none
     * @param outputStream destination stream
     * @throws IOException if writing or flushing fails
     */
    private static void writeResponse(DataTransferProtos.Status status, String str, OutputStream outputStream) throws IOException {
        final DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder();
        response.setStatus(status);
        if (str != null) {
            response.setMessage(str);
        }
        final DataTransferProtos.BlockOpResponseProto message = response.build();
        message.writeDelimitedTo(outputStream);
        outputStream.flush();
    }

    /**
     * Writes a length-delimited SUCCESS response that carries the sender's checksum
     * description and chunk offset, then flushes the stream.
     *
     * @param blockSender      source of the checksum and offset values
     * @param dataOutputStream destination stream
     * @throws IOException if writing or flushing fails
     */
    private void writeSuccessWithChecksumInfo(BlockSender blockSender, DataOutputStream dataOutputStream) throws IOException {
        final DataTransferProtos.ReadOpChecksumInfoProto checksumInfo = DataTransferProtos.ReadOpChecksumInfoProto.newBuilder()
                .setChecksum(DataTransferProtoUtil.toProto(blockSender.getChecksum()))
                .setChunkOffset(blockSender.getOffset())
                .build();

        final DataTransferProtos.BlockOpResponseProto response = DataTransferProtos.BlockOpResponseProto.newBuilder()
                .setStatus(DataTransferProtos.Status.SUCCESS)
                .setReadOpChecksumInfo(checksumInfo)
                .build();

        response.writeDelimitedTo(dataOutputStream);
        dataOutputStream.flush();
    }

    /**
     * Verifies the block access token when block tokens are enabled on the datanode.
     *
     * <p>On an invalid token, and if {@code z} is set, an {@code ERROR_ACCESS_TOKEN} response is
     * written to the peer (for WRITE mode it also names this datanode's transfer address as
     * the first bad link). The failure is then logged, the reply stream is closed, and the
     * {@code InvalidToken} exception is rethrown.
     *
     * @param dataOutputStream stream to reply on; if {@code null} and a reply is required, one is opened on the socket
     * @param z                whether to send an error response to the peer on failure
     * @param extendedBlock    block being accessed
     * @param token            access token to verify
     * @param op               operation being performed (used in the log message)
     * @param accessMode       access mode the token must grant
     * @throws IOException the {@code InvalidToken} failure, or an I/O error while replying
     */
    private void checkAccess(DataOutputStream dataOutputStream, boolean z, ExtendedBlock extendedBlock, Token<BlockTokenIdentifier> token, Op op, BlockTokenSecretManager.AccessMode accessMode) throws IOException {
        if (!this.datanode.isBlockTokenEnabled) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Checking block access token for block '" + extendedBlock.getBlockId() + "' with mode '" + accessMode + "'");
        }
        try {
            this.datanode.blockPoolTokenSecretManager.checkAccess(token, (String) null, extendedBlock, accessMode);
        } catch (SecretManager.InvalidToken e) {
            try {
                if (z) {
                    if (dataOutputStream == null) {
                        dataOutputStream = new DataOutputStream(NetUtils.getOutputStream(this.s, this.dnConf.socketWriteTimeout));
                    }
                    DataTransferProtos.BlockOpResponseProto.Builder response = DataTransferProtos.BlockOpResponseProto.newBuilder().setStatus(DataTransferProtos.Status.ERROR_ACCESS_TOKEN);
                    if (accessMode == BlockTokenSecretManager.AccessMode.WRITE) {
                        response.setFirstBadLink(this.datanode.getDNRegistrationForBP(extendedBlock.getBlockPoolId()).getXferAddr());
                    }
                    response.build().writeDelimitedTo(dataOutputStream);
                    dataOutputStream.flush();
                }
                LOG.warn("Block token verification failed: op=" + op + ", remoteAddress=" + this.remoteAddress + ", message=" + e.getLocalizedMessage());
                throw e;
            } finally {
                // The operation is being rejected, so the reply stream (including one opened
                // here) must not be left open.
                IOUtils.closeStream(dataOutputStream);
            }
        }
    }

    static {
        $assertionsDisabled = !DataXceiver.class.desiredAssertionStatus();
        LOG = DataNode.LOG;
        ClientTraceLog = DataNode.ClientTraceLog;
    }
}
