netty源码解析之IO读写(三)

作者: binecy | 来源:发表于2019-03-23 17:58 被阅读3次

    第二次看netty源码,对netty的理解也更深入了点,修改了不少文章内容。
    后面有时间再分析一下netty的buffer,codec等内容。

    源码分析基于netty 4.1

    前面已经说过netty对accept事件的处理,现在来讲讲netty中的read/write过程。

    Pipeline

    DefaultChannelPipeline是一个netty处理io事件的默认通道,通道中的每个节点都是AbstractChannelHandlerContext,
    AbstractChannelHandlerContext.next指向下一个AbstractChannelHandlerContext,prev指向前一个AbstractChannelHandlerContext。
    Pipeline是标准的责任链。
    AbstractChannelHandlerContext.handler()方法返回一个ChannelHandler,ChannelInboundHandler/ChannelOutboundHandler都继承自这个接口,
    我们继承这两个接口的适配类ChannelInboundHandlerAdapter/ChannelInboundHandlerAdapter,编写具体的业务逻辑。
    DefaultChannelPipeline固定有两个节点head/tail,addLast会把节点添加到tail前。

    read

    回顾一下, NioEventLoop中对read事件的处理

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        int readyOps = k.readyOps();
        
        ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    }
    

    ch是NioSocketChannel对象, ch.unsafe()返回NioSocketChannel.NioSocketChannelUnsafe,
    unsafe.read() 会调用到NioSocketChannelUnsafe父类NioByteUnsafe.read():

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    
    ByteBuf byteBuf = null;
    boolean close = false;
    
        do {
            // 申请缓存区空间
            byteBuf = allocHandle.allocate(allocator);
            // 从socket读取数据到缓存区
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
    
            allocHandle.incMessagesRead(1);
            
            // 触发ChannelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());
    
        allocHandle.readComplete();
        // 触发ChannelReadComplete事件
        pipeline.fireChannelReadComplete();
    

    从socket读取数据到byteBuf中,再调用DefaultChannelPipeline.fireChannelRead。

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    调用AbstractChannelHandlerContext静态方法invokeChannelRead,参数是head和msg

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            ...
        }
    }
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {  // 检查handler的状态
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }    
    

    invokeHandler()会检查ChannelHandler是否已经调用了handlerAdded

    handler()返回一个ChannelHandler,这里再转化为ChannelInboundHandler,并调用它的channelRead。(HeadContext.handler返回this,HeadContext同时实现了ChannelOutboundHandler/ChannelInboundHandler)。

    看看HeadContext.channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    

    ctx.fireChannelRead调用的是AbstractChannelHandlerContext.fireChannelRead:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    

    findContextInbound会找到下一个ChannelInboundHandler

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    

    ctx.fireChannelRead(msg);的作用就是找到下一个ChannelInboundHandler,并调用它的fireChannelRead方法, 这里会调用到我们实现的ChannelInboundHandler接口,并调用我们重写的fireChannelRead方法,进行逻辑处理。

    我们重写的fireChannelRead方法最后要调用ctx.fireChannelRead(msg),这样会调用到AbstractChannelHandlerContext.fireChannelRead, 它会找到下一个InboundHandler并调用fireChannelRead方法,这个数据才能在通道中继续流转(除非调用write相关方法)。

    write

    下面看看write过程
    我们可以通过ChannelHandlerContext .writeAndFlush写入结果给客户, 它会调用AbstractChannelHandlerContext.write:

        private void write(Object msg, boolean flush, ChannelPromise promise) {
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                ...
            }
        }
    

    findContextOutbound会找到当前节点前一个OutboundHandler(write和read的方向相反,这里向前找OutboundHandler)

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
    

    next.invokeWriteAndFlush还是调用到AbstractChannelHandlerContext.invokeWriteAndFlush

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    

    invokeWrite0也比较简单, 就是调用handler的处理

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    

    这里会调用到我们实现的ChannelOutboundHandler,并调用我们重写的write方法,实现业务逻辑。

    最后会调用到HeadContext.write, 注意, HeadContext既是InboundHandler, 也是OutboundHandler

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    

    这里调用了AbstractUnsafe.write, 将数据write到socket中,具体过程这里不再描述。

    invokeFlush0();也是类似的流程, 这里不再复述。

    那么ChannelOutboundHandler的read事件, 是在哪里触发的呢? 其实是在fireChannelReadComplete中
    pipeline.fireChannelReadComplete(); 会调用到DefaultChannelPipeline.channelReadComplete

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    
        readIfIsAutoRead();
    }
    
    
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    

    如果配置为AutoRead, 就会调用channel.read(), 进而调用 pipeline.read(), 最终就会触发ChannelOutboundHandler.read方法。

    到这里, netty启动, accept, read/write的一个完整流程都讲完了。
    netty是非常优秀的框架, 模块化做到很好, 对jdk的future, buffer这些模块都做了扩展,还自行进行了内存管理。
    对netty流程熟悉后, 就可以继续学习netty的这些闪光点, 网上也有很多优秀的教程了。

    下面是一些非常优秀的netty博客:
    Netty源码分析-占小狼
    Netty那点事-黄亿华
    Netty系列之Netty线程模型-李林锋
    Netty系列之Netty高性能之道-李林锋
    Netty_in_Action-译文
    Netty 源码分析之 一 揭开 Bootstrap 神秘的红盖头 (服务器端)

    相关文章

      网友评论

        本文标题:netty源码解析之IO读写(三)

        本文链接:https://www.haomeiwen.com/subject/mnjqvqtx.html