美文网首页
Netty源码笔记(二)ChannelPipeline的处理机制

Netty源码笔记(二)ChannelPipeline的处理机制

作者: 李亚林1990 | 来源:发表于2019-01-18 11:30 被阅读66次

上篇分析Netty服务端的处理流程,本篇我们来看看Netty是如何通过ChannelPipeline来处理SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件接收的消息体。
一、先看看几个典型执行器的类继承图


image.png
image.png
image.png

包含两类处理器,分别继承自ChannelInboundHandler和ChannelOutboundHandler。
用户自定义处理器一般继承ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter,避免用户直接继承接口需要实现每一个抽象方法。
二、NioServerSocketChannel对应的pipeline中处理器链

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });

NioServerSocketChannel初始化完毕后处理器链为:
head ==》ServerBootstrapAcceptor==》tail
找到触发的起始代码:

                int size = readBuf.size();
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    pipeline.fireChannelRead(readBuf.get(i));
                }
                readBuf.clear();
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

其中readBuf为List<NioSocketChannel>集合,pipeline.fireChannelRead方法逐个处理每一个新建立的连接。
接下来我们按函数调用顺序看执行逻辑:

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    //第一步:AbstractChannelHandlerContext.invokeChannelRead为静态方法
    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
    //此处就是我们找到的关键逻辑,该方法为head<HeadContext>继承自AbstractChannelHeadlerContext
    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        //找到下一个处理器,回到第一步
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }
    //查找下一个inbound处理器,从这里也可以看出inbound处理器的执行顺序为从head到tail,即addLast添加顺序
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }

在此顺便延伸一下outbound的执行顺序,可以看出与inbound相反,代码如下:

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

三、NioSocketChannel对应的pipeline中处理器链
在ServerBootstrapAcceptor中对每一个新建立的连接进行初始化

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);
            。。。
            childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
         } 

这里的childHandler就是最开始我们在启动引导类配置的bootstrap.childHandler(new ChannelInitializer)。
NioSocketChannel初始化完毕后处理器链为:
head ==》ChannelInitializer==》tail
我们跟踪register方法,最终定位到AbstractChannel.register0函数:

      private void register0(ChannelPromise promise) {
            try {
                。。。
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                pipeline.invokeHandlerAddedIfNeeded();

                。。。
            } catch (Throwable t) {
               。。。
            }
        }

这里我们只关注pipeline.invokeHandlerAddedIfNeeded();从注释中我们看到最终应该是调用的handlerAdded(...),
我们继续跟踪代码

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            // We are now registered to the EventLoop. It's time to call the callbacks for the ChannelHandlers,
            // that were added before the registration was done.
            callHandlerAddedForAllHandlers();
        }
    }

    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
            assert !registered;

            // This Channel itself was registered.
            registered = true;

            pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
            // Null out so it can be GC'ed.
            this.pendingHandlerCallbackHead = null;
        }

        // This must happen outside of the synchronized(...) block as otherwise handlerAdded(...) may be called while
        // holding the lock and so produce a deadlock if handlerAdded(...) will try to add another handler from outside
        // the EventLoop.
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

接下来看看this.pendingHandlerCallbackHead;的实现逻辑

private final class PendingHandlerAddedTask {
        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }
}

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                remove0(ctx);
                try {
                    ctx.handler().handlerRemoved(ctx);
                } finally {
                    ctx.setRemoved();
                }
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }
        }
}

@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            // This should always be true with our current DefaultChannelPipeline implementation.
            // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
            // suprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
            // will be added in the expected order.
            initChannel(ctx);
        }
}

initChannel就是执行的就是我们在引导类覆盖实现的逻辑:

                bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast("decode", new MessageDecoder());
                pipeline.addLast("encode", new ClientMessageEncoder());
            }
        });

这个时候NioSocketChannel的处理器链为:
head ==》ChannelInitializer==》MessageDecoder==》ClientMessageEncoder==》tail
会发现ChannelInitializer的作用只是用来添加处理器链,并不涉及消息体的处理,应该移除。
这就是remove0的处理,移除自身,代码如下:

    private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

最终NioSocketChannel的处理器链为:
head ==》MessageDecoder==》ClientMessageEncoder==》tail
至此我们基本理清了ChannelPipeline的处理机制。

相关文章

网友评论

      本文标题:Netty源码笔记(二)ChannelPipeline的处理机制

      本文链接:https://www.haomeiwen.com/subject/clpbdqtx.html