美文网首页
2020-05-07

2020-05-07

作者: rockytsai | 来源:发表于2020-05-07 21:56 被阅读0次
//NioByteUnsafe
@Override
public final void read() {
    //获取config配置
    final ChannelConfig config = config();
    //判断是否因配置或者异常、错误等中断本次读操作
    if (shouldBreakReadReady(config)) {
        //这个函数会取消该通道对OP_READ事件的注册
        //取消之后即不再响应通道read准备好事件
        clearReadPending();
        return;
    }
    final ChannelPipeline pipeline = pipeline();
    //获取ByteBuf分配器
    final ByteBufAllocator allocator = config.getAllocator();
    //获取RecvByteBuf分配器,这个笔者有文章介绍过,RecvByteBuf分配器
    //主要告诉ByteBuf分配器分配ByteBuf的大小,默认配置的是
    //AdaptiveRecvByteBufAllocator
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    //每次read时,重置RecvByteBuf统计的一些信息
    allocHandle.reset(config);

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            //分配本次读取操作的ByteBuf
            byteBuf = allocHandle.allocate(allocator);
            //进行实际读取操作,doReadBytes我们后面列出源码
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            //如果没有读取到数据,直接返回,结束本次读取
            if (allocHandle.lastBytesRead() <= 0) {
                // nothing was read. release the buffer.
                byteBuf.release();
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);
            readPending = false;
            //触发Pipeline的channelRead方法,让
            //用户定义的handler处理读到的数据
            //比如对数据进行解码,然后交给业务层handler处理等
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
            //判断是否需要继续读
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        //触发channelReadComplete方法
        pipeline.fireChannelReadComplete();

        if (close) {
            //上面如果最后读取的字节数小于0,则关闭读取操作
            //但是不关闭通道
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        //异常处理
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
        RecvByteBufAllocator.Handle allocHandle) {
    if (byteBuf != null) {
        if (byteBuf.isReadable()) {
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
        } else {
            byteBuf.release();
        }
    }
    allocHandle.readComplete();
    pipeline.fireChannelReadComplete();
    //看到了熟悉的方法fireExceptionCaught
    //这个是handler定义的,可供应用处理异常
    pipeline.fireExceptionCaught(cause);
    if (close || cause instanceof IOException) {
        closeOnRead(pipeline);
    }
}

相关文章

网友评论

      本文标题:2020-05-07

      本文链接:https://www.haomeiwen.com/subject/efttnhtx.html