美文网首页netty
netty 读写过程

netty 读写过程

作者: binecy | 来源:发表于2018-01-03 17:38 被阅读51次

    源码分析基于netty 4

    前面已经说过netty对accept事件的处理,现在来讲讲netty中的read/write过程。

    开始前,先重点说说netty的Handler/Pipeline

    netty的Handler分为ChannelInboundHandler、ChannelOutboundHandler两大类。ChannelInboundHandler对从客户端发往服务器的报文进行处理,一般用来执行解码、读取客户端数据、进行业务处理等;ChannelOutboundHandler对从服务器发往客户端的报文进行处理,一般用来进行编码、发送报文到客户端。

    Handler是netty提供的扩展点,非常重要。通过Handler可以完成通讯报文的解码编码、拦截指定的报文、统一错误处理、请求计数等工作。众多的网络协议都是通过Handler完成的, 如http、自定义rpc等。

    Netty中,可以注册多个handler,形成Pipeline。ChannelInboundHandler按照注册的先后顺序执行;ChannelOutboundHandler按照注册的先后顺序逆序执行

    netty在ChannelPipeline类的注释中给出了如下示意图

    
                                                     I/O Request
                                                via Channel or
                                            ChannelHandlerContext
                                                          |
      +---------------------------------------------------+---------------+
      |                           ChannelPipeline         |               |
      |                                                  \|/              |
      |    +---------------------+            +-----------+----------+    |
      |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  .               |
      |               .                                   .               |
      | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
      |        [ method call]                       [method call]         |
      |               .                                   .               |
      |               .                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      +---------------+-----------------------------------+---------------+
                      |                                  \|/
      +---------------+-----------------------------------+---------------+
      |               |                                   |               |
      |       [ Socket.read() ]                    [ Socket.write() ]     |
      |                                                                   |
      |  Netty Internal I/O Threads (Transport Implementation)            |
      +-------------------------------------------------------------------+
    

    就是Channel/ChannelHandlerContext writer会依次通过Outbound处理, 最后通过socket write出去
    从socket read到的数据, 也会依次交给Inbound处理。

    channelHandler.png

    netty中还定义了ChannelInboundInvoker/ChannelOutboundInvoker, Invoker和ChannelHandler接口基本一致,只是参数少了ChannelHandlerContext(Invoker接口少了ChannelHandlerContext上下文)。

    ChannelHandlerContext和ChannelPipeline接口都继承了ChannelInboundInvoker和ChannelOutboundInvoker

    DefaultChannelPipeline是netty中的核心Pipeline, 聚合了ChannelHandlerContext, ChannelHandlerContext可以看做pipeline的节点。 HeadContext/TailContext(图中没展示)是Pipeline的开始/结束节点。

    当触发ChannelPipeline的事件时, netty会将事件委派给ChannelHandlerContext, 再由ChannelHandlerContext委派到ChannelHandler进行处理。

    ChannelHandler中有handlerAdded/handlerRemoved方法,当一个ChannelHandler添加/移除Pipeline中,会触发这些事件。
    ChannelInitializer是一个特殊的InboundHandler,提供了抽象的initChannel方法,用于提供给用户向pipeline添加自定义的ChannelHandler

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }
    

    可以看到, ChannelInitializer添加到Pipeline后, 会调用initChannel方法

    值得注意的是,DefaultChannelPipeline的handlerAdded是通过task执行的(不过有时也会强行执行)。
    可以看一下DefaultChannelPipeline.addLast

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
    
            newCtx = newContext(group, filterName(name, handler), handler);
    
            addLast0(newCtx);
    
            // 如果registered为false,那channel就没有注册到任何eventloop
            // 所以调用callHandlerCallbackLater方法,延时进行
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
    
            // 立即执行task
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    

    callHandlerCallbackLater就是把task放到pendingHandlerCallbackHead,当loop启动后,会执行这些task

    回想一下启动注册过程中调用的AbstractUnsafe.register0方法, 会调用pipeline.invokeHandlerAddedIfNeeded();, 这里会强制执行pendingHandlerCallbackHead。

    先来看一个小栗子
    我们将tcp请求的内容通过"|"分割为一个字符串数组,进行逻辑处理后,再将结果数组用"|"合并为一个字符串返回给用户。

    负责分割的InboundHandler

    public class SplitHandler  extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("split in inboundHandler channelRead");
            ByteBuf in = (ByteBuf) msg;
            byte[] bytes = new byte[in.writerIndex()];
            in.readBytes(bytes);
            String s = new String(bytes);
    
            System.out.println("read string : " + s);
            ctx.fireChannelRead(s.split("\\|"));
        }
    }
    

    负责合并的OutboundHandler

    public class MergeHandler extends ChannelOutboundHandlerAdapter {
        public void read(ChannelHandlerContext ctx) throws Exception {
            System.out.println("outbound read");
            ctx.read();
        }
    
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            System.out.println("outbound write");
    
            String[] arr = (String[])msg;
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < arr.length; i++) {
                if(i > 0) {
                    sb.append("|");
                }
                sb.append(arr[i]);
            }
            System.out.println("response str : " + sb.toString());
            byte[] bytes = sb.toString().getBytes();
            ByteBuf byteBuf = ctx.alloc().buffer(bytes.length);
            byteBuf.writeBytes(bytes);
            ctx.write(byteBuf, promise);
        }
    }
    

    这里重写read方法, 主要是想关注该方法的触发时机。

    简单的逻辑处理:

    public class LogicHandler extends ChannelInboundHandlerAdapter {
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server inboundHandler channelRead");
            String[] arr = (String[])msg;
    
            for (int i = 0; i < arr.length; i++) {
                String s = arr[i];
                System.out.println("split result : " + s);
                arr[i] = "ok for " + s;
            }
            ctx.writeAndFlush(arr);
        }
    }
    

    server端

    ServerBootstrap b = new ServerBootstrap();
    b.group(parentGroup, childGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
    
                    p.addLast(new MergeHandler());
                    p.addLast(new SplitHandler());
                    p.addLast(new LogicHandler());
    
                }
            });
    

    运行栗子后, 用telnet发送123|456字符串, 得到结果

    binecy ~/work/shadowsocks $ telnet 127.0.0.1 8007
    Trying 127.0.0.1...
    Connected to 127.0.0.1.
    Escape character is '^]'.
    123|345
    ok for 123|ok for 345
    

    服务端输出

    split in inboundHandler channelRead
    read string : 123|456
    
    logic handle in inboundHandler channelRead
    split result : 123
    split result : 456
    
    merge in outboundHandler write
    response str : ok for 123|ok for 456
    
    fire outboundHandler read
    

    以这个栗子入手,分析netty中read/write过程
    注意一下, accept后, netty中pipeline如下:
    head > ServerBootstrapAcceptor > MergeHandler > SplitHandler > LogicHandler > tail

    read

    回顾一下, NioEventLoop中对read事件的处理

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        int readyOps = k.readyOps();
        
        ...
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
            if (!ch.isOpen()) {
                return;
            }
        }
    }
    

    unsafe.read()这个方法, 会调用到AbstractNioByteChannel.read():

    final ChannelPipeline pipeline = pipeline();
    final ByteBufAllocator allocator = config.getAllocator();
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);
    
    ByteBuf byteBuf = null;
    boolean close = false;
    
        do {
            // 申请缓存区空间
            byteBuf = allocHandle.allocate(allocator);
            // 从socket读取数据到缓存区
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
    
            allocHandle.incMessagesRead(1);
            
            // 触发ChannelRead事件
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());
    
        allocHandle.readComplete();
        // 触发ChannelReadComplete事件
        pipeline.fireChannelReadComplete();
    
    

    pipeline.fireChannelRead触发read事件,看看DefaultChannelPipeline的处理

    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    很简单, pipeline的事件会委托给ChannelHandlerContext处理, 从head开始处理

    invokeChannelRead会调用到AbstractChannelHandlerContext.invokeChannelRead

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
    
    
    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {  // 检查handler的状态
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }    
    

    invokeHandler()会检查ChannelHandler是否已经调用了handlerAdded

    看看HeadContext.channelRead

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    

    ctx.fireChannelRead调用的是AbstractChannelHandlerContext.fireChannelRead:

    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    

    findContextInbound会找到下一个InboundHandler

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    

    ctx.fireChannelRead(msg);的作用就是找到下一个InboundHandler,并调用它的fireChannelRead方法, 所以我们重写InboundHandler的fireChannelRead方法,方法最后也要调用ctx.fireChannelRead(msg);,以免调用链就此断掉, 除非使用write。这里会沿pipeline,依次查找InboundHandler并fireChannelRead方法。

    write

    下面看看write过程
    LogicHandler中调用ctx.writeAndFlush触发write过程, 调用到AbstractChannelHandlerContext.write:

        private void write(Object msg, boolean flush, ChannelPromise promise) {
            AbstractChannelHandlerContext next = findContextOutbound();
            final Object m = pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractWriteTask task;
                if (flush) {
                    task = WriteAndFlushTask.newInstance(next, m, promise);
                }  else {
                    task = WriteTask.newInstance(next, m, promise);
                }
                safeExecute(executor, task, promise, m);
            }
        }
    

    findContextOutbound会找到当前节点前一个OutboundHandler

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
    

    next.invokeWriteAndFlush还是调用到AbstractChannelHandlerContext.invokeWriteAndFlush

    private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
            invokeFlush0();
        } else {
            writeAndFlush(msg, promise);
        }
    }
    

    invokeWrite0也比较简单, 就是调用handler的处理

        private void invokeWrite0(Object msg, ChannelPromise promise) {
            try {
                ((ChannelOutboundHandler) handler()).write(this, msg, promise);
            } catch (Throwable t) {
                notifyOutboundHandlerException(t, promise);
            }
        }
    

    LogicHandler前一个OutboundHandler是MergeHandler, 所以会调用到MergeHandler.write方法, 进行字符串数组合并。

    MergeHandler.write调用ctx.write会调用HeadContext, 注意, HeadContext既是InboundHandler, 也是OutboundHandler

    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        unsafe.write(msg, promise);
    }
    

    这里调用了AbstractUnsafe.write, 将数据write到socket中,具体过程这里不再描述。

    invokeFlush0();也是类似的流程, 这里不再复述。

    那么OutboundHandler的read事件, 是在哪里触发的呢? 其实是在fireChannelReadComplete中
    pipeline.fireChannelReadComplete(); 会调用到DefaultChannelPipeline.channelReadComplete

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    
        readIfIsAutoRead();
    }
    
    
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    

    如果配置为AutoRead, 就会调用channel.read(), 进而调用 pipeline.read(), 最终就会触发MergeHandler.read方法。

    到这里, netty启动, accept, read/write的一个完整流程都讲完了。
    netty是非常优秀的框架, 模块化做到很好, 对jdk的future, buffer这些模块都做了扩展,还自行进行了内存管理。
    对netty流程熟悉后, 就可以继续学习netty的这些闪光点, 网上也有很多优秀的教程了。

    下面是一些非常优秀的netty博客:
    Netty源码分析-占小狼
    Netty那点事-黄亿华
    Netty系列之Netty线程模型-李林锋
    Netty系列之Netty高性能之道-李林锋
    Netty_in_Action-译文

    相关文章

      网友评论

        本文标题:netty 读写过程

        本文链接:https://www.haomeiwen.com/subject/lfqknxtx.html