package org.apache.hadoop.ozone.container.stream;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.ByteProcessor;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/ozone/container/stream/DirstreamServerHandler.class */
public class DirstreamServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(DirstreamServerHandler.class);
    public static final String END_MARKER = "0 END";
    private StreamingSource source;
    private final StringBuilder id = new StringBuilder();
    private boolean headerProcessed = false;

    public DirstreamServerHandler(StreamingSource streamingSource) {
        this.source = streamingSource;
    }

    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (!this.headerProcessed) {
            ByteBuf byteBuf = (ByteBuf) obj;
            int forEachByte = byteBuf.forEachByte(ByteProcessor.FIND_LF) - byteBuf.readerIndex();
            if (forEachByte > 0) {
                this.headerProcessed = true;
                this.id.append(byteBuf.toString(Charset.defaultCharset()));
            } else {
                this.id.append(byteBuf.toString(0, forEachByte, Charset.defaultCharset()));
            }
            byteBuf.release();
        }
        if (this.headerProcessed) {
            writeOneElement(channelHandlerContext, new ArrayList(this.source.getFilesToStream(this.id.toString().trim()).entrySet()), 0);
        }
    }

    public void writeOneElement(ChannelHandlerContext channelHandlerContext, List<Map.Entry<String, Path>> list, int i) throws IOException {
        Map.Entry<String, Path> entry = list.get(i);
        Path value = entry.getValue();
        channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer((Files.size(value) + " " + entry.getKey() + "\n").getBytes(StandardCharsets.UTF_8))).addListener(future -> {
            ChannelFuture writeAndFlush = channelHandlerContext.writeAndFlush(new ChunkedFile(value.toFile()));
            if (i == list.size() - 1) {
                writeAndFlush.addListener(future -> {
                    if (!future.isSuccess()) {
                        LOG.error("Error on streaming file", future.cause());
                    }
                    channelHandlerContext.writeAndFlush(Unpooled.wrappedBuffer(END_MARKER.getBytes(StandardCharsets.UTF_8))).addListener(future -> {
                        channelHandlerContext.channel().close();
                    });
                });
            } else {
                writeAndFlush.addListener(future2 -> {
                    writeOneElement(channelHandlerContext, list, i + 1);
                });
            }
        });
    }

    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        channelHandlerContext.flush();
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        th.printStackTrace();
        if (channelHandlerContext.channel().isActive()) {
            channelHandlerContext.writeAndFlush("ERR: " + th.getClass().getSimpleName() + ": " + th.getMessage() + '\n').addListener(ChannelFutureListener.CLOSE);
        }
        channelHandlerContext.close();
    }
}
