package org.apache.hadoop.hbase.io.asyncfs;

import ch.cern.hbase.thirdparty.com.google.common.base.Preconditions;
import ch.cern.hbase.thirdparty.com.google.common.base.Throwables;
import ch.cern.hbase.thirdparty.io.netty.buffer.ByteBuf;
import ch.cern.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
import ch.cern.hbase.thirdparty.io.netty.channel.Channel;
import ch.cern.hbase.thirdparty.io.netty.channel.ChannelHandler;
import ch.cern.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
import ch.cern.hbase.thirdparty.io.netty.channel.ChannelId;
import ch.cern.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
import ch.cern.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
import ch.cern.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import ch.cern.hbase.thirdparty.io.netty.handler.timeout.IdleState;
import ch.cern.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
import ch.cern.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.Encryptor;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos;
import org.apache.hadoop.util.DataChecksum;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput.class */
public class FanOutOneBlockAsyncDFSOutput implements AsyncFSOutput {
    private static final int MAX_DATA_LEN = 12582912;
    private final Configuration conf;
    private final FSUtils fsUtils;
    private final DistributedFileSystem dfs;
    private final DFSClient client;
    private final ClientProtocol namenode;
    private final String clientName;
    private final String src;
    private final long fileId;
    private final ExtendedBlock block;
    private final DatanodeInfo[] locations;
    private final Encryptor encryptor;
    private final List<Channel> datanodeList;
    private final DataChecksum summer;
    private final int maxDataLen;
    private final ByteBufAllocator alloc;
    private ByteBuf buf;
    private final ConcurrentLinkedDeque<Callback> waitingAckQueue = new ConcurrentLinkedDeque<>();
    private volatile long ackedBlockLength = 0;
    private long nextPacketOffsetInBlock = 0;
    private int trailingPartialChunkLength = 0;
    private long nextPacketSeqno = 0;
    private final SendBufSizePredictor sendBufSizePRedictor = new SendBufSizePredictor();
    private volatile State state = State.STREAMING;

    /* JADX INFO: Access modifiers changed from: private */
    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$AckHandler.class */
    public final class AckHandler extends SimpleChannelInboundHandler<DataTransferProtos.PipelineAckProto> {
        private final int timeoutMs;

        public AckHandler(int i) {
            this.timeoutMs = i;
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void channelRead0(ChannelHandlerContext channelHandlerContext, DataTransferProtos.PipelineAckProto pipelineAckProto) throws Exception {
            DataTransferProtos.Status status = FanOutOneBlockAsyncDFSOutputHelper.getStatus(pipelineAckProto);
            if (status != DataTransferProtos.Status.SUCCESS) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Bad response " + status + " for block " + FanOutOneBlockAsyncDFSOutput.this.block + " from datanode " + channelHandlerContext.channel().remoteAddress());
                });
            } else if (PipelineAck.isRestartOOBStatus(status)) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Restart response " + status + " for block " + FanOutOneBlockAsyncDFSOutput.this.block + " from datanode " + channelHandlerContext.channel().remoteAddress());
                });
            } else {
                if (pipelineAckProto.getSeqno() == -1) {
                    return;
                }
                FanOutOneBlockAsyncDFSOutput.this.completed(channelHandlerContext.channel());
            }
        }

        public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
            if (FanOutOneBlockAsyncDFSOutput.this.state == State.CLOSED) {
                return;
            }
            FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                return new IOException("Connection to " + channelHandlerContext.channel().remoteAddress() + " closed");
            });
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
            FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                return th;
            });
        }

        public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            if (!(obj instanceof IdleStateEvent)) {
                super.userEventTriggered(channelHandlerContext, obj);
                return;
            }
            IdleStateEvent idleStateEvent = (IdleStateEvent) obj;
            if (idleStateEvent.state() == IdleState.READER_IDLE) {
                FanOutOneBlockAsyncDFSOutput.this.failed(channelHandlerContext.channel(), () -> {
                    return new IOException("Timeout(" + this.timeoutMs + "ms) waiting for response");
                });
                return;
            }
            if (idleStateEvent.state() == IdleState.WRITER_IDLE) {
                PacketHeader packetHeader = new PacketHeader(4, 0L, -1L, false, 0, false);
                int serializedSize = packetHeader.getSerializedSize();
                ByteBuf buffer = FanOutOneBlockAsyncDFSOutput.this.alloc.buffer(serializedSize);
                packetHeader.putInBuffer(buffer.nioBuffer(0, serializedSize));
                buffer.writerIndex(serializedSize);
                channelHandlerContext.channel().writeAndFlush(buffer);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$Callback.class */
    public static final class Callback {
        private final CompletableFuture<Long> future;
        private final long ackedLength;
        private final Set<ChannelId> unfinishedReplicas;

        public Callback(CompletableFuture<Long> completableFuture, long j, Collection<Channel> collection) {
            this.future = completableFuture;
            this.ackedLength = j;
            if (collection.isEmpty()) {
                this.unfinishedReplicas = Collections.emptySet();
                return;
            }
            this.unfinishedReplicas = Collections.newSetFromMap(new ConcurrentHashMap(collection.size()));
            Stream<R> map = collection.stream().map(channel -> {
                return channel.id();
            });
            Set<ChannelId> set = this.unfinishedReplicas;
            set.getClass();
            map.forEachOrdered((v1) -> {
                r1.add(v1);
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutput$State.class */
    public enum State {
        STREAMING,
        CLOSING,
        BROKEN,
        CLOSED
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void completed(Channel channel) {
        Iterator<Callback> it = this.waitingAckQueue.iterator();
        while (it.hasNext()) {
            Callback next = it.next();
            if (next.unfinishedReplicas.remove(channel.id())) {
                if (next.unfinishedReplicas.isEmpty()) {
                    it.remove();
                    this.ackedBlockLength = next.ackedLength;
                    if (next.future.complete(Long.valueOf(next.ackedLength))) {
                        while (it.hasNext()) {
                            Callback next2 = it.next();
                            if (next2.ackedLength != next.ackedLength) {
                                return;
                            }
                            it.remove();
                            next2.future.complete(Long.valueOf(next.ackedLength));
                        }
                        return;
                    }
                    return;
                }
                return;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized void failed(Channel channel, Supplier<Throwable> supplier) {
        Callback peekFirst;
        if (this.state == State.BROKEN || this.state == State.CLOSED) {
            return;
        }
        if (this.state != State.CLOSING || ((peekFirst = this.waitingAckQueue.peekFirst()) != null && peekFirst.unfinishedReplicas.contains(channel.id()))) {
            this.state = State.BROKEN;
            Throwable th = supplier.get();
            Iterator<Callback> it = this.waitingAckQueue.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                Callback next = it.next();
                if (next.unfinishedReplicas.contains(channel.id())) {
                    while (true) {
                        next.future.completeExceptionally(th);
                        if (!it.hasNext()) {
                            break;
                        } else {
                            next = it.next();
                        }
                    }
                }
            }
            this.datanodeList.forEach(channel2 -> {
                channel2.close();
            });
        }
    }

    private void setupReceiver(int i) {
        ChannelHandler ackHandler = new AckHandler(i);
        for (Channel channel : this.datanodeList) {
            channel.pipeline().addLast(new ChannelHandler[]{new IdleStateHandler(i, i / 2, 0L, TimeUnit.MILLISECONDS), new ProtobufVarint32FrameDecoder(), new ProtobufDecoder(DataTransferProtos.PipelineAckProto.getDefaultInstance()), ackHandler});
            channel.config().setAutoRead(true);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public FanOutOneBlockAsyncDFSOutput(Configuration configuration, FSUtils fSUtils, DistributedFileSystem distributedFileSystem, DFSClient dFSClient, ClientProtocol clientProtocol, String str, String str2, long j, LocatedBlock locatedBlock, Encryptor encryptor, List<Channel> list, DataChecksum dataChecksum, ByteBufAllocator byteBufAllocator) {
        this.conf = configuration;
        this.fsUtils = fSUtils;
        this.dfs = distributedFileSystem;
        this.client = dFSClient;
        this.namenode = clientProtocol;
        this.fileId = j;
        this.clientName = str;
        this.src = str2;
        this.block = locatedBlock.getBlock();
        this.locations = locatedBlock.getLocations();
        this.encryptor = encryptor;
        this.datanodeList = list;
        this.summer = dataChecksum;
        this.maxDataLen = MAX_DATA_LEN - (MAX_DATA_LEN % dataChecksum.getBytesPerChecksum());
        this.alloc = byteBufAllocator;
        this.buf = byteBufAllocator.directBuffer(this.sendBufSizePRedictor.initialSize());
        setupReceiver(configuration.getInt("dfs.client.socket-timeout", 60000));
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void writeInt(int i) {
        this.buf.ensureWritable(4);
        this.buf.writeInt(i);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(ByteBuffer byteBuffer) {
        this.buf.ensureWritable(byteBuffer.remaining());
        this.buf.writeBytes(byteBuffer);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(byte[] bArr) {
        write(bArr, 0, bArr.length);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void write(byte[] bArr, int i, int i2) {
        this.buf.ensureWritable(i2);
        this.buf.writeBytes(bArr, i, i2);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public int buffered() {
        return this.buf.readableBytes();
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public DatanodeInfo[] getPipeline() {
        return this.locations;
    }

    private void flushBuffer(CompletableFuture<Long> completableFuture, ByteBuf byteBuf, long j, boolean z) {
        int readableBytes = byteBuf.readableBytes();
        int bytesPerChecksum = this.summer.getBytesPerChecksum();
        int checksumSize = ((readableBytes / bytesPerChecksum) + (readableBytes % bytesPerChecksum != 0 ? 1 : 0)) * this.summer.getChecksumSize();
        ByteBuf directBuffer = this.alloc.directBuffer(checksumSize);
        this.summer.calculateChunkedSums(byteBuf.nioBuffer(), directBuffer.nioBuffer(0, checksumSize));
        directBuffer.writerIndex(checksumSize);
        PacketHeader packetHeader = new PacketHeader(4 + checksumSize + readableBytes, j, this.nextPacketSeqno, false, readableBytes, z);
        int serializedSize = packetHeader.getSerializedSize();
        ByteBuf buffer = this.alloc.buffer(serializedSize);
        packetHeader.putInBuffer(buffer.nioBuffer(0, serializedSize));
        buffer.writerIndex(serializedSize);
        Callback callback = new Callback(completableFuture, j + readableBytes, this.datanodeList);
        this.waitingAckQueue.addLast(callback);
        if (this.state != State.STREAMING && this.waitingAckQueue.peekFirst() == callback) {
            completableFuture.completeExceptionally(new IOException("stream already broken"));
            this.waitingAckQueue.removeFirst();
            return;
        }
        this.datanodeList.forEach(channel -> {
            channel.write(buffer.retainedDuplicate());
            channel.write(directBuffer.retainedDuplicate());
            channel.writeAndFlush(byteBuf.retainedDuplicate());
        });
        directBuffer.release();
        buffer.release();
        byteBuf.release();
        this.nextPacketSeqno++;
    }

    private void flush0(CompletableFuture<Long> completableFuture, boolean z) {
        if (this.state != State.STREAMING) {
            completableFuture.completeExceptionally(new IOException("stream already broken"));
            return;
        }
        int readableBytes = this.buf.readableBytes();
        if (readableBytes == this.trailingPartialChunkLength) {
            long j = this.nextPacketOffsetInBlock + readableBytes;
            if (this.waitingAckQueue.peekLast() == null) {
                completableFuture.complete(Long.valueOf(j));
                return;
            }
            Callback callback = new Callback(completableFuture, j, Collections.emptyList());
            this.waitingAckQueue.addLast(callback);
            if (this.waitingAckQueue.peekFirst() == callback) {
                if (this.state != State.STREAMING) {
                    completableFuture.completeExceptionally(new IOException("stream already broken"));
                } else {
                    completableFuture.complete(Long.valueOf(j));
                }
                this.waitingAckQueue.removeFirst();
                return;
            }
            return;
        }
        if (this.encryptor != null) {
            ByteBuf directBuffer = this.alloc.directBuffer(readableBytes);
            this.buf.readBytes(directBuffer, this.trailingPartialChunkLength);
            int i = readableBytes - this.trailingPartialChunkLength;
            try {
                this.encryptor.encrypt(this.buf.nioBuffer(this.trailingPartialChunkLength, i), directBuffer.nioBuffer(this.trailingPartialChunkLength, i));
                directBuffer.writerIndex(readableBytes);
                this.buf.release();
                this.buf = directBuffer;
            } catch (IOException e) {
                directBuffer.release();
                completableFuture.completeExceptionally(e);
                return;
            }
        }
        if (readableBytes > this.maxDataLen) {
            long j2 = this.nextPacketOffsetInBlock;
            int i2 = readableBytes;
            while (i2 >= this.maxDataLen) {
                flushBuffer(new CompletableFuture<>(), this.buf.readRetainedSlice(this.maxDataLen), j2, z);
                i2 -= this.maxDataLen;
                j2 += this.maxDataLen;
            }
            flushBuffer(completableFuture, this.buf.readRetainedSlice(i2), j2, z);
        } else {
            flushBuffer(completableFuture, this.buf.retain(), this.nextPacketOffsetInBlock, z);
        }
        this.trailingPartialChunkLength = readableBytes % this.summer.getBytesPerChecksum();
        ByteBuf ensureWritable = this.alloc.directBuffer(this.sendBufSizePRedictor.guess(readableBytes)).ensureWritable(this.trailingPartialChunkLength);
        if (this.trailingPartialChunkLength != 0) {
            this.buf.readerIndex(readableBytes - this.trailingPartialChunkLength).readBytes(ensureWritable, this.trailingPartialChunkLength);
        }
        this.buf.release();
        this.buf = ensureWritable;
        this.nextPacketOffsetInBlock += readableBytes - this.trailingPartialChunkLength;
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public CompletableFuture<Long> flush(boolean z) {
        CompletableFuture<Long> completableFuture = new CompletableFuture<>();
        flush0(completableFuture, z);
        return completableFuture;
    }

    private void endBlock() throws IOException {
        Preconditions.checkState(this.waitingAckQueue.isEmpty(), "should call flush first before calling close");
        if (this.state != State.STREAMING) {
            throw new IOException("stream already broken");
        }
        this.state = State.CLOSING;
        long j = this.ackedBlockLength;
        PacketHeader packetHeader = new PacketHeader(4, j, this.nextPacketSeqno, true, 0, false);
        this.buf.release();
        this.buf = null;
        int serializedSize = packetHeader.getSerializedSize();
        ByteBuf directBuffer = this.alloc.directBuffer(serializedSize);
        packetHeader.putInBuffer(directBuffer.nioBuffer(0, serializedSize));
        directBuffer.writerIndex(serializedSize);
        CompletableFuture completableFuture = new CompletableFuture();
        this.waitingAckQueue.add(new Callback(completableFuture, j, this.datanodeList));
        this.datanodeList.forEach(channel -> {
            channel.writeAndFlush(directBuffer.retainedDuplicate());
        });
        directBuffer.release();
        try {
            completableFuture.get();
        } catch (InterruptedException e) {
            throw ((IOException) new InterruptedIOException().initCause(e));
        } catch (ExecutionException e2) {
            Throwable cause = e2.getCause();
            Throwables.propagateIfPossible(cause, IOException.class);
            throw new IOException(cause);
        }
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public void recoverAndClose(CancelableProgressable cancelableProgressable) throws IOException {
        if (this.buf != null) {
            this.buf.release();
            this.buf = null;
        }
        this.datanodeList.forEach(channel -> {
            channel.close();
        });
        this.datanodeList.forEach(channel2 -> {
            channel2.closeFuture().awaitUninterruptibly();
        });
        FanOutOneBlockAsyncDFSOutputHelper.endFileLease(this.client, this.fileId);
        this.fsUtils.recoverFileLease(this.dfs, new Path(this.src), this.conf, cancelableProgressable == null ? new FanOutOneBlockAsyncDFSOutputHelper.CancelOnClose(this.client) : cancelableProgressable);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        endBlock();
        this.state = State.CLOSED;
        this.datanodeList.forEach(channel -> {
            channel.close();
        });
        this.datanodeList.forEach(channel2 -> {
            channel2.closeFuture().awaitUninterruptibly();
        });
        this.block.setNumBytes(this.ackedBlockLength);
        FanOutOneBlockAsyncDFSOutputHelper.completeFile(this.client, this.namenode, this.src, this.clientName, this.block, this.fileId);
    }

    @Override // org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput
    public boolean isBroken() {
        return this.state == State.BROKEN;
    }
}
