美文网首页
JavaNIO(七)ChannelPipeline和Channe

JavaNIO(七)ChannelPipeline和Channe

作者: 清雨季 | 来源:发表于2019-07-28 22:23 被阅读0次

    一 ChannelHandlerContext和ChannelHandler

    在ChannelPipeline这一层,ChannelPipeline会把收到的事件交给ChannelHandler处理。

    在外部,我们可以向ChannelPipeline中添加ChannelHandler处理器,但是添加到Pipeline中后,会被包装成一个ChannelHandlerContext对象:

        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
                newCtx = newContext(group, filterName(name, handler), handler);
                addLast0(newCtx);
                //...
        }
    

    ChannelHandlerContext主要是做了下面两件事:

    • 封装了ChannelPipeline,Unsafe,Channel,等对象信息,方便处理事件时使用
    • 对ChannelHandler的调用做了异步处理,可以接受一个Executor对象,然后对应ChannelHandler的所有事件都会被这一个Executor对象执行。

    在ChannelPipeline中,把所有的ChannelHandlerContext放在一个链表中,本身记录了链表的头尾节点:

        final AbstractChannelHandlerContext head;
        final AbstractChannelHandlerContext tail;
    

    对于ChannelHandler中每一个事件相关的方法,ChannelHandlerContext中都有一个对应的fireXXX方法,这个方法用于向下一个ChannelHandler传递事件

    二 inbound事件和outbound事件

    Pipeline层把所有的事件分为两种:

    • 建立连接,读数据等客户端向服务端发送的事件称为inbound事件,这类事件会由链头开始向链尾传递处理。
    • 发送数据等由服务端向客户端进行操作的事件,称为outbound事件,这类事件会由链尾开始向链头传递处理。
     *                                                 I/O Request
     *                                            via {@link Channel} or
     *                                        {@link ChannelHandlerContext}
     *                                                      |
     *  +---------------------------------------------------+---------------+
     *  |                           ChannelPipeline         |               |
     *  |                                                  \|/              |
     *  |    +---------------------+            +-----------+----------+    |
     *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  .               |
     *  |               .                                   .               |
     *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
     *  |        [ method call]                       [method call]         |
     *  |               .                                   .               |
     *  |               .                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  +---------------+-----------------------------------+---------------+
     *                  |                                  \|/
     *  +---------------+-----------------------------------+---------------+
     *  |               |                                   |               |
     *  |       [ Socket.read() ]                    [ Socket.write() ]     |
     *  |                                                                   |
     *  |  Netty Internal I/O Threads (Transport Implementation)            |
     *  +-------------------------------------------------------------------+
    

    二 ChannelPipeline的初始化过程

    ChannelPipeline使用双向链表维护了ChannlHandler的信息,所以初始化中最关键的,是初始化了双向队列的头尾节点:

        protected DefaultChannelPipeline(Channel channel) {
            //....
            tail = new TailContext(this);
            head = new HeadContext(this);
    
            head.next = tail;
            tail.prev = head;
        }
    

    这里TailContext和HeadContext都是ChannelHandler的实现类
    tail节点基本是空实现,或者仅仅打印一些日志:

        protected void onUnhandledInboundMessage(Object msg) {
            try {
                logger.debug(
                        "Discarded inbound message {} that reached at the tail of the pipeline. " +
                                "Please check your pipeline configuration.", msg);
            } finally {
                ReferenceCountUtil.release(msg);
            }
        }
    

    而这个head节点,也就是HeadContext对象,就做了很多事了。

    例如,上一节说的,当向Channel中写数据时,数据会先被ChannelPipeline处理,然后再交给Unsafe,把数据写出去。 其实就是在HeadContext中,把数据交给Unsafe写出去的:

            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
                unsafe.write(msg, promise);
            }
    

    由于写数据是outbound事件,所以会由tail节点开始处理,数据到达head节点时已经处理完了。

    三 处理一个事件的流程

    以channelRead事件为例:

    在ChannelPipeline#fireChannelRead方法中,首先调用head节点的channelRead方法进行处理:

        public final ChannelPipeline fireChannelRead(Object msg) {
            AbstractChannelHandlerContext.invokeChannelRead(head, msg);
            return this;
        }
    

    AbstractChannelHandlerContext.invokeChannelRead方法的第一个参数是ChannelHandler类型的,方法内部会执行这个ChannelHandler对象的channelRead方法:

        static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
            final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                next.invokeChannelRead(m);
            } else {
                executor.execute(() -> next.invokeChannelRead(m));
            }
        }
    
        private void invokeChannelRead(Object msg) {
            if (invokeHandler()) {
                try {
                    ((ChannelInboundHandler) handler()).channelRead(this, msg);
                } catch (Throwable t) {
                    notifyHandlerException(t);
                }
            } else {
                fireChannelRead(msg);
            }
        }
    

    这段代码的逻辑就是判断是否配置了专门的线程池,如果配置了,就用专门的线程池去调用。

    上面说过,head节点其实就是一个HeadContext对象,因此具体执行到的就是HeadContext#channelRead方法:

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg);
            }
    

    方法内只是简单的调用了ctx.fireChannelRead(msg); 这行代码的作用就是向下一个ChannelHanderl传递事件。因此,如果在想在某个ChannelHandler中拦截事件,那么只需要不加上这一行代码即可。

    然后这个ctx.fireChannelRead方法中,会找到下一个ChannelHandler:

        public ChannelHandlerContext fireChannelRead(final Object msg) {
            invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
            return this;
        }
    

    找到后又调用invokeChannelRead触发下一个ChannelHandler的channelRead方法。这就回到了最开始的地方。

    一直这样循环处理,直到所有的ChannelHandler处理完毕。

    相关文章

      网友评论

          本文标题:JavaNIO(七)ChannelPipeline和Channe

          本文链接:https://www.haomeiwen.com/subject/mtverctx.html