美文网首页
Netty源码笔记(二)ChannelPipeline的处理机制

Netty源码笔记(二)ChannelPipeline的处理机制

作者: 李亚林1990 | 来源:发表于2019-01-18 11:30 被阅读66次

    上篇分析Netty服务端的处理流程,本篇我们来看看Netty是如何通过ChannelPipeline来处理SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件接收的消息体。
    一、先看看几个典型执行器的类继承图


    image.png
    image.png
    image.png

    包含两类处理器,分别继承自ChannelInboundHandler和ChannelOutboundHandler。
    用户自定义处理器一般继承ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,避免用户直接继承接口需要实现每一个抽象方法。
    二、NioServerSocketChannel对应的pipeline中处理器链

                    ch.eventLoop().execute(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.addLast(new ServerBootstrapAcceptor(
                                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                        }
                    });
    

    NioServerSocketChannel初始化完毕后处理器链为:
    head ==》ServerBootstrapAcceptor==》tail
    找到触发的起始代码:

                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();
    

    其中readBuf为List<NioSocketChannel>集合,pipeline.fireChannelRead方法逐个处理每一个新建立的连接。
    接下来我们按函数调用顺序看执行逻辑:

        @Override
        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
        //第一步:AbstractChannelHandlerContext.invokeChannelRead为静态方法
        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelRead(m);
                    }
                });
            }
        }
    
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ctx.fireChannelRead(msg);
            }
        //此处就是我们找到的关键逻辑,该方法为head<HeadContext>继承自AbstractChannelHeadlerContext
        @Override
        public ChannelHandlerContext fireChannelRead(final Object msg) {
            //找到下一个处理器,回到第一步
            invokeChannelRead(findContextInbound(), msg);
            return this;
        }
        //查找下一个inbound处理器,从这里也可以看出inbound处理器的执行顺序为从head到tail,即addLast添加顺序
        private AbstractChannelHandlerContext findContextInbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.next;
            } while (!ctx.inbound);
            return ctx;
        }
    

    在此顺便延伸一下outbound的执行顺序,可以看出与inbound相反,代码如下:

        private AbstractChannelHandlerContext findContextOutbound() {
            AbstractChannelHandlerContext ctx = this;
            do {
                ctx = ctx.prev;
            } while (!ctx.outbound);
            return ctx;
        }
    

    三、NioSocketChannel对应的pipeline中处理器链
    在ServerBootstrapAcceptor中对每一个新建立的连接进行初始化

            @Override
            @SuppressWarnings("unchecked")
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                final Channel child = (Channel) msg;
    
                child.pipeline().addLast(childHandler);
                。。。
                childGroup.register(child).addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (!future.isSuccess()) {
                                forceClose(child, future.cause());
                            }
                        }
                    });
             } 
    

    这里的childHandler就是最开始我们在启动引导类配置的bootstrap.childHandler(new ChannelInitializer)。
    NioSocketChannel初始化完毕后处理器链为:
    head ==》ChannelInitializer==》tail
    我们跟踪register方法,最终定位到AbstractChannel.register0函数:

          private void register0(ChannelPromise promise) {
                try {
                    。。。
                    boolean firstRegistration = neverRegistered;
                    doRegister();
                    neverRegistered = false;
                    registered = true;
    
                    // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                    // user may already fire events through the pipeline in the ChannelFutureListener.
                    pipeline.invokeHandlerAddedIfNeeded();
    
                    。。。
                } catch (Throwable t) {
                   。。。
                }
            }
    

    这里我们只关注pipeline.invokeHandlerAddedIfNeeded();从注释中我们看到最终应该是调用的handlerAdded(...),
    我们继续跟踪代码

        final void invokeHandlerAddedIfNeeded() {
            assert channel.eventLoop().inEventLoop();
            if (firstRegistration) {
                firstRegistration = false;
                // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
                // that were added before the registration was done.
                callHandlerAddedForAllHandlers();
            }
        }
    
        private void callHandlerAddedForAllHandlers() {
            final PendingHandlerCallback pendingHandlerCallbackHead;
            synchronized (this) {
                assert !registered;
    
                // This Channel itself was registered.
                registered = true;
    
                pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
                // Null out so it can be GC'ed.
                this.pendingHandlerCallbackHead = null;
            }
    
            // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
            // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
            // the EventLoop.
            PendingHandlerCallback task = pendingHandlerCallbackHead;
            while (task != null) {
                task.execute();
                task = task.next;
            }
        }
    

    接下来看看this.pendingHandlerCallbackHead;的实现逻辑

    private final class PendingHandlerAddedTask {
            @Override
            void execute() {
                EventExecutor executor = ctx.executor();
                if (executor.inEventLoop()) {
                    callHandlerAdded0(ctx);
                } else {
                    try {
                        executor.execute(this);
                    } catch (RejectedExecutionException e) {
                        remove0(ctx);
                        ctx.setRemoved();
                    }
                }
            }
    }
    
    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
            try {
                ctx.handler().handlerAdded(ctx);
                ctx.setAddComplete();
            } catch (Throwable t) {
                boolean removed = false;
                try {
                    remove0(ctx);
                    try {
                        ctx.handler().handlerRemoved(ctx);
                    } finally {
                        ctx.setRemoved();
                    }
                    removed = true;
                } catch (Throwable t2) {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                    }
                }
            }
    }
    
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            if (ctx.channel().isRegistered()) {
                // This should always be true with our current DefaultChannelPipeline implementation.
                // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
                // suprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
                // will be added in the expected order.
                initChannel(ctx);
            }
    }
    

    initChannel就是执行的就是我们在引导类覆盖实现的逻辑:

                    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                                    pipeline.addLast("decode", new MessageDecoder());
                    pipeline.addLast("encode", new ClientMessageEncoder());
                }
            });
    

    这个时候NioSocketChannel的处理器链为:
    head ==》ChannelInitializer==》MessageDecoder==》ClientMessageEncoder==》tail
    会发现ChannelInitializer的作用只是用来添加处理器链,并不涉及消息体的处理,应该移除。
    这就是remove0的处理,移除自身,代码如下:

        private static void remove0(AbstractChannelHandlerContext ctx) {
            AbstractChannelHandlerContext prev = ctx.prev;
            AbstractChannelHandlerContext next = ctx.next;
            prev.next = next;
            next.prev = prev;
        }
    

    最终NioSocketChannel的处理器链为:
    head ==》MessageDecoder==》ClientMessageEncoder==》tail
    至此我们基本理清了ChannelPipeline的处理机制。

    相关文章

      网友评论

          本文标题:Netty源码笔记(二)ChannelPipeline的处理机制

          本文链接:https://www.haomeiwen.com/subject/clpbdqtx.html