美文网首页
Netty源码4--pipline

Netty源码4--pipline

作者: xian_cheng | 来源:发表于2018-11-05 15:44 被阅读0次

    上篇介绍accept连接的过程中已经介绍服务端在收到一个客户端请求后,会将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听,由于Netty在接收到消息的处理流程大概如下:


    netty时序图.png

    (1)客户端发送消息到服务端。
    (2)服务端监听对应channel上的io事件,监听到读事件后读取消息。
    (3)将读取到的字节数据放入ByteBuf中,然后调用pipline的fireChannelRead方法,将这个ByteBuf中保存的数据传递给pipline,pipline中保存了一个链表,这个链表中每个节点里都存储一个channelHandler,在channelHandler中可以对ByteBuf中的数据进行校验、解码等操作。

    这是一个非常粗的消息交互过程,主要是先清楚一下消息的大概处理流程,从上面的三个步骤来看,pipline是一个非常重要的对象,如果想要搞清楚服务器消息处理的流程,有必要研究下这个对象的源码实现。对于pipline这个对象,我想要搞清楚的主要是下面三个问题:
    (1)channelHandler是怎么绑定到pipline中的?
    (2) pipline的数据结构是怎样的?
    (3)消息是怎么由一个channelHandler传递到另一个channelHandler的?

    首先分析第一个问题,为了便于理解,先给出demo程序。

        public void start() {
            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .localAddress(new InetSocketAddress(port))
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandler1());
                            ch.pipeline().addLast(new InBoundHandler2());
                            ch.pipeline().addLast(new InBoundHandler3());
                        }
                    });
           ....
        }
    

    在上面server的启动程序中,.childHandler的作用就是将用户自定义的channelHandler添加到pipeline中。我们主要看下是在什么时候用户自定义的channelHandler添加到pipeline中的。
    .childHandler的作用主要是初始化了ServerBootstrap这个对象的childHandler对象,将这个对象的引用指向ChannelInitializer的实现类。看下这个childHandler对象在哪使用用的,通过追溯代码发现是在ServerBootstrap的init方法中使用的。

        @Override
        void init(Channel channel) throws Exception {
            ChannelPipeline p = channel.pipeline();
            final ChannelHandler currentChildHandler = childHandler;
            ....
            p.addLast(new ChannelInitializer<Channel>() {
                @Override
                public void initChannel(final Channel ch) throws Exception {
                    ....
                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
                }
            });
        }
    

    从上面代码中可以看到,在new ServerBootstrapAcceptor时将这个childHandler对象传递给了ServerBootstrapAcceptor类中的childHandler成员变量。继续看下ServerBootstrapAcceptor类中的childHandler在哪使用的,发现是在ServerBootstrapAcceptor类中的channelRead方法中使用的。

           @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
                child.pipeline().addLast(childHandler);
                ......
            }
    

    前面的文章已经介绍这个方法是在接收到一个新连接时,在这个方法中将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听。本文关心的是pipline和channelHandler的关系,因此我们主要看下child.pipeline().addLast(childHandler)这句代码,跟下这句代码的实现。

        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);
    
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
            ....
        }
    

    具体实现在DefaultChannelPipeline的addLast方法中,主要看下面这两句代码:

          newCtx = newContext(group, filterName(name, handler), handler);
          addLast0(newCtx);
    

    首先创建一个DefaultChannelHandlerContext对象,childHandler的引用传给了这个对象,那么这个对象就持有了对childHandler的引用,然后调用addLast0方法。

        private void addLast0(AbstractChannelHandlerContext newCtx) {
            AbstractChannelHandlerContext prev = tail.prev;
            newCtx.prev = prev;
            newCtx.next = tail;
            prev.next = newCtx;
            tail.prev = newCtx;
        }
    

    从这个方法就可以很清楚的看到DefaultChannelPipeline保存了一个双向链表的数据结构,用于记录channelHandler的处理顺序。下图说明了DefaultChannelPipeline初始化时的数据结构。


    DefaultChannelPipeline初始化链表结构.png

    从上图可以看出,DefaultChannelPipeline初始化时有一个head节点和tail节点,每个节点对应的都是一个AbstractChannelHandlerContext类,如果对双向链表很清楚的话,AbstractChannelHandlerContext类中应该会有两个指针,一个指向前一个节点,一个指向后一个节点。
    上图是初始化时的数据结构,那么执行addLast0方法之后的数据结构如下图所示:


    DefaultChannelPipeline add last.png

    这里为了直观一些,插入的节点用了channelHandler表示,其实这个节点也是一个AbstractChannelHandlerContext对象,这个对象持有了对childHandler的引用。我们知道这个childHandler指向的只是在demo程序中实现ChannelInitializer类initChannel方法的一个实现类,demo程序中InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler还没有添加到这个链表里,那这几个handler是在哪添加的呢,我们继续回到addLast这个方法的实现。

        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            final AbstractChannelHandlerContext newCtx;
            synchronized (this) {
                checkMultiplicity(handler);
    
                newCtx = newContext(group, filterName(name, handler), handler);
    
                addLast0(newCtx);
    
                if (!registered) {
                    newCtx.setAddPending();
                    callHandlerCallbackLater(newCtx, true);
                    return this;
                }
            ....
        }
    

    这个方法首先将childHandler放入了一个新建的AbstractChannelHandlerContext对象,然后将这个AbstractChannelHandlerContext对象又放入了DefaultChannelPipeline的链表中。
    然后根据registered这个布尔变量,判断是否进入if条件,registered这个布尔变量表示的是这个DefaultChannelPipeline对应的channel是否已经注册到NioEventLoopGroup,由于接收到一个新连接调用channelRead方法时先将childHandler放入DefaultChannelPipeline,然后才将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,所以此时registered布尔变量为false,进入if条件。进入if条件后,看下 callHandlerCallbackLater这个方法。

        private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
            assert !registered;
    
            PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
            PendingHandlerCallback pending = pendingHandlerCallbackHead;
            if (pending == null) {
                pendingHandlerCallbackHead = task;
            } else {
                // Find the tail of the linked-list.
                while (pending.next != null) {
                    pending = pending.next;
                }
                pending.next = task;
            }
        }
    

    这个方法中将刚才新建的AbstractChannelHandlerContext对象放入了PendingHandlerAddedTask这个线程类中,看下这个线程类。

        private final class PendingHandlerAddedTask extends PendingHandlerCallback {
            @Override
            public void run() {
                callHandlerAdded0(ctx);
            }
    
            @Override
            void execute() {
                EventExecutor executor = ctx.executor();
                if (executor.inEventLoop()) {
                    callHandlerAdded0(ctx);
                } else {
                    try {
                        executor.execute(this);
                    } catch (RejectedExecutionException e) {
                        if (logger.isWarnEnabled()) {
                            logger.warn(
                                    "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                    executor, ctx.name(), e);
                        }
                        remove0(ctx);
                        ctx.setRemoved();
                    }
                }
            }
        }
    

    这个线程类中一个实现了Runnable的run方法,一个实现了execute方法,这两个方法中调用了callHandlerAdded0这个方法,我们继续跟下这个方法的实现。

        private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            try {
                ctx.setAddComplete();
                ctx.handler().handlerAdded(ctx);
            } catch (Throwable t) {
                ...
                } catch (Throwable t2) {
                ....
        }
    

    这个方法中调用了 ctx.handler().handlerAdded(ctx)方法,其中 ctx.handler()返回的是实现ChannelInitializer类initChannel方法的一个实现类,说明这里调用的是ChannelInitializer类中的handlerAdded方法,进入handlerAdded方法看下具体实现。

        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isRegistered()) {
                initChannel(ctx);
            }
        }
    
        private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
            if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
                try {
                    initChannel((C) ctx.channel());
                } catch (Throwable cause) {          
                    exceptionCaught(ctx, cause);
                } finally {
                    remove(ctx);
                }
                return true;
            }
            return false;
        }
    

    在handlerAdded方法中调用initChannel(ChannelHandlerContext ctx)方法,我们在这个方法中发现调用了initChannel((C) ctx.channel())这个方法,这个方法是个抽象方法,具体实现就是在demo程序中。我们在demo中这个方法的具体实现如下:

    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandler1());
                            ch.pipeline().addLast(new InBoundHandler2());
                            ch.pipeline().addLast(new InBoundHandler3());
                        }
                    });
    

    通过调用ch.pipeline().addLast这个方法将InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler添加DefaultChannelPipeline的链表中,这步操作完成后,DefaultChannelPipeline的链表结构如下所示:


    pipline init.png

    我们继续看下initChannel(ChannelHandlerContext ctx)方法,我们发现在将这三个handler放入链表中,最后调用了 remove(ctx)方法,这个方法是将实现ChannelInitializer类initChannel方法的一个实现类对应的AbstractChannelHandlerContext从链表中删除,这样操作后,DefaultChannelPipeline的链表结构就变成了下面这样。


    pipline finish.png

    经过上面的分析,我们就把前面两个问题给解决了,明白了channelHandler是怎么绑定到pipline中的以及pipline的数据结构是怎样的。

    第三个问题关于消息是怎么由一个channelHandler传递到另一个channelHandler的,放到下一篇服务端消息处理来具体讲。

    相关文章

      网友评论

          本文标题:Netty源码4--pipline

          本文链接:https://www.haomeiwen.com/subject/cvcfxqtx.html