美文网首页
Netty源码分析5--服务端消息处理流程

Netty源码分析5--服务端消息处理流程

作者: xian_cheng | 来源:发表于2018-11-05 19:55 被阅读0次

    上篇文章中已经介绍了pipline相关的内容pipline,如果对这部分内容比较熟悉的话,理解这部分内容就很简单了。为了容易说明,还是把上一节的demo程序先放到这里。

        public void start() {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandler1());
                            ch.pipeline().addLast(new InBoundHandler2());
                            ch.pipeline().addLast(new InBoundHandler3());
                        }
                    });
           ....
        }
    

    服务端能够接收消息的前提是已经和客户端建立一个channel通道,想要了解这个channel怎么建立的可以参考这篇文章Netty源码--accept连接,这里不再赘述。
    通道建立后,是在NioEventLoop类中监听这个channel的读写事件,具体过程之前已经在这篇文章Netty源码--accept连接中分析,这里直接跳到processSelectedKey这个方法的实现:

        private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
            final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
            try {
                int readyOps = k.readyOps();
                if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                    int ops = k.interestOps();
                    ops &= ~SelectionKey.OP_CONNECT;
                    k.interestOps(ops);
                    unsafe.finishConnect();
                }
                if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                    ch.unsafe().forceFlush();
                }
             
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    unsafe.read();
                }
            } catch (CancelledKeyException ignored) {
                unsafe.close(unsafe.voidPromise());
            }
        }
    

    在这个方法中,当监听到读事件后,会调用unsafe的read方法,那么就看下这个unsafe的具体类型是啥。
    AbstractNioChannel.NioUnsafe unsafe = ch.unsafe()这句代码返回了一个NioUnsafe对象,NioUnsafe 是一个接口,具体实现类主要有两个NioByteUnsafe和NioMessageUnsafe。由于这里的unsafe是通过调用ch.unsafe生成的,ch具体类型是NioSocketChannel,通过追溯代码这个unsafe是在NioSocketChannel的构造函数中通过调用这个类的newUnsafe方法初始化的。

        @Override
        protected AbstractNioUnsafe newUnsafe() {
            return new NioSocketChannelUnsafe();
        }
    
        private final class NioSocketChannelUnsafe extends NioByteUnsafe {
    

    从上面代码可以看到,这个unsafe是一个NioByteUnsafe类型的,因此监听到读事件后调用的unsafe.read()这个方法具体实现就是在NioByteUnsafe这个类中。

            @Override
            public final void read() {
                final ChannelConfig config = config();
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));
                        if (allocHandle.lastBytesRead() <= 0) {
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                  
                    if (!readPending && !config.isAutoRead()) {
                        removeReadOp();
                    }
                }
            }
        }
    

    这个方法首先调用doReadBytes这个方法读取数据到ByteBuf中,然后调用 pipeline.fireChannelRead(byteBuf)将ByteBuf中数据发送到pipeline中保存的第一个handler中,看下具体调用过程。

        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    

    首先调用DedaultChannelPipline类中的fireChannelRead方法,在这个方法中调用了AbstractChannelHandlerContext这个类的invokeChannelRead方法,并将DedaultChannelPipline的指向链表首节点的head指针作为这个方法的参数传递进去。

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
              
                });
            }
        }
    

    最终调用next.invokeChannelRead(m)方法,handler()返回的是HeadContext类,看下这个类中invokeChannelRead方法的实现。

        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
              @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
    

    然后调用ctx.fireChannelRead(msg)这个方法,其实选择channel的逻辑主要在这个方法实现。

        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    

    findContextInbound这个方法其实返回的就是DefaultChannelPipeline中链表中下一个需要处理的channelHandler,通过这个方法使消息能够在多个channelHandler传递。选择好下一个channelHandler所对应的AbstractChannelHandlerContext类后,调用invokeChannelRead(final AbstractChannelHandlerContext next, Object msg)方法。

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
              
                });
            }
        }
    
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    

    主要看这句 ((ChannelInboundHandler) handler()).channelRead(this, msg),handler()返回的是当前AbstractChannelHandlerContext 对应的channelHandler,这个channelHandler其实就是我们在demo程序中初始化时添加的InBoundHandler1、InBoundHandler2、InBoundHandler3。这三个类都继承ChannelInboundHandlerAdapter,实现了channelRead方法,这样我们就可以在这个channelRead方法根据自己的协议以及业务特点,对数据做特定的处理,这也是netty作为一个网络通信框架非常灵活的一点。

    相关文章

      网友评论

          本文标题:Netty源码分析5--服务端消息处理流程

          本文链接:https://www.haomeiwen.com/subject/jhmsxqtx.html