美文网首页
4-netty源码分析之Pipeline

4-netty源码分析之Pipeline

作者: 致虑 | 来源:发表于2018-09-25 19:29 被阅读0次

    4-netty源码分析之Pipeline

    先用一张图来描叙下netty的piepline


    image.png

    由图可以看得出来,netty的Pipeline其实是由ChannelHandlerContext组成的一个双向链表,ChannelHandlerContext这个又是什么呢,由名字就可以知道是ChannelHandler的context,也就可以理解为channel handler执行的上下文,提供给handler执行时所需要的资源。

    那么进一步理解,其实netty 的 Pipeline其实是一个由Handler组成的双向链表。那么我们一步步分析下这个Pipeline。


    1.Pipeline初始化

    其实这里很简单,Pipeline是什么时候初始话的呢?初始化的时候又是个什么样子呢?
    其实在介绍server时就讲到了,先看一段代码[AbstractChannel]:

    protected AbstractChannel(Channel parent) {
        this.parent = parent;
    
        /** AbstractNioByteChannel.NioByteUnsafe 内部类,newUnsafe()具体的子类实现*/
        unsafe = newUnsafe();
    
        /** Each channel has its own pipeline and it is created automatically when a new channel is created. */
        pipeline = newChannelPipeline();
    }
    

    这个就是在server启动的时初始化channel所要执行的代码。其中就初始化了ChannelPipeline

    pipeline = newChannelPipeline();
    

    注释也很清楚

    Each channel has its own pipeline and it is created automatically when a new channel is created.

    我们继续:

    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
    
        /** 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表 */
    
        tail = new TailContext(this);
        head = new HeadContext(this);
    
        head.next = tail;
        tail.prev = head;
    }
    

    可以看到这里创建的是DefaultChannelPipeline,也看到了我们想要看到的双向链表的“指针”。
    DefaultChannelPipeline 实现于 ChannelPipeline,可以看到ChannelPipeline的注释中画的很清楚:

     *                                                 I/O Request
     *                                            via {@link Channel} or
     *                                        {@link ChannelHandlerContext}
     *                                                      |
     *  +---------------------------------------------------+---------------+
     *  |                           ChannelPipeline         |               |
     *  |                                                  \|/              |
     *  |    +---------------------+            +-----------+----------+    |
     *  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  .               |
     *  |               .                                   .               |
     *  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
     *  |        [ method call]                       [method call]         |
     *  |               .                                   .               |
     *  |               .                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  |               |                                  \|/              |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
     *  |    +----------+----------+            +-----------+----------+    |
     *  |              /|\                                  |               |
     *  +---------------+-----------------------------------+---------------+
     *                  |                                  \|/
     *  +---------------+-----------------------------------+---------------+
     *  |               |                                   |               |
     *  |       [ Socket.read() ]                    [ Socket.write() ]     |
     *  |                                                                   |
     *  |  Netty Internal I/O Threads (Transport Implementation)            |
     *  +-------------------------------------------------------------------+
    

    这里InBound和OutBound前面有讲过,代表pipeline中handler的流向,基本分为从socket中读数据 到 ByteBuf 与 将ByteBuf中的数据写入Socket。

    我们继续回到上面的构造器:

    • 1.先将channel赋值
    • 2.创造tail节点
    • 3.创建head节点
    • 4.将头尾节点先后串起来形成双向链表

    那么刚刚初始化好的Pipeline是这个样子的:


    image.png

    那么可以猜想,链表既然已经形成,我们的业务有需要很多各种Handler,那么这里是哪里取添加的呢?这个问题后面再解释,但大概已经知道添加无非就是形成一个HandlerContext,然后在head与tail之间改变prev与next指针的指向了。

    那么我们在细看下HeadContext与TailContext


    image.png

    从图中就首先关注下Unsafe成员,前面讲过,netty底层的所有相关操作都是由Unsafe去完成的,那么自然可以猜想,当pipeline中处理器流向head时最终会调用Unsafe去处理底层相关的操作,那么究竟是不是这样的呢?

    我们从业务角度解释一下这个猜想:
    Pipeline是所有Handler的链路汇总,业务在写自己的Handler时一般只与业务相关,比如编解码、序列化等,那么数据最终会向下一个handler传递,最终会落到head或者tail 节点,那么这里自然会将数据写出或者读入了,因此这里的Unsafe存在自然就很合适了。

    那么head作为第一个节点,数据首次流入自然第一个经过head了,我们跟着上篇的NioEventLoop进行debug一下,还记得处理事件的入口:

    processSelectedKeys();
    
    /** boos reactor处理新的连接   或者 worker reactor 处理 已存在的连接有数据可读 */
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
    
        /** AbstractNioByteChannel中实现,重点 */
        unsafe.read();
    }
    

    前面讲过NioMessageUnsafe处理连接相关事件,NioByteUnsafe处理后续的读写事件,那么我们跟踪数据读取:

    @Override
    public final void read() {
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }
    
        final ChannelPipeline pipeline = pipeline();
    
        /** 创建ByteBuf分配器 */
        final ByteBufAllocator allocator = config.getAllocator();
        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
        if (allocHandle == null) {
            this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
        }
    
        ByteBuf byteBuf = null;
        int messages = 0;
        boolean close = false;
        try {
            int totalReadAmount = 0;
            boolean readPendingReset = false;
            do {
    
                /** 分配一个ByteBuf */
                byteBuf = allocHandle.allocate(allocator);
                int writable = byteBuf.writableBytes();
    
                /** 委托到外部类NioSocketChannel读, 将数据读取到分配的ByteBuf中去 */
                int localReadAmount = doReadBytes(byteBuf);
                if (localReadAmount <= 0) {
                    // not was read release the buffer
                    byteBuf.release();
                    byteBuf = null;
                    close = localReadAmount < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        setReadPending(false);
                    }
                    break;
                }
                if (!readPendingReset) {
                    readPendingReset = true;
                    setReadPending(false);
                }
    
                /**
                 * pipeline.fireChannelRead 正好 ChannelPipeline 中的 inbound 事件起点.
                 * 当调用了 pipeline.fireIN_EVT() 后, 那么就产生了一个 inbound 事件, 此事件会以 head -> customContext -> tail 的方向依次流经 ChannelPipeline 中的各个 handler.
                 * 调用了 pipeline.fireChannelRead 后, 就是 ChannelPipeline 中所需要做的工作了
                 */
                pipeline.fireChannelRead(byteBuf);
                byteBuf = null;
    
                if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
                    // Avoid overflow.
                    totalReadAmount = Integer.MAX_VALUE;
                    break;
                }
    
                totalReadAmount += localReadAmount;
    
                // stop reading
                if (!config.isAutoRead()) {
                    break;
                }
    
                if (localReadAmount < writable) {
                    break;
                }
            } while (++ messages < maxMessagesPerRead);
    
            /**
             * @see DefaultChannelPipeline#fireChannelRead
             */
            pipeline.fireChannelReadComplete();
            allocHandle.record(totalReadAmount);
    
            if (close) {
                closeOnRead(pipeline);
                close = false;
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close);
        } finally {
            ...
        }
    }
    

    划重点:

    pipeline.fireChannelRead(byteBuf);
    

    pipeline传递读事件开始

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
    

    可以看到事件第一流转的head节点。看看head做了什么?

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }
    

    此时head也就是简单的传递下读事件,没做其他事情。
    继续回到主代码中:

    pipeline.fireChannelReadComplete();
    

    经过一些列的传递之后:

    /** 继续向reactor线程注册读事件 */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }
    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }
    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }
    
        return this;
    }
    @Override
    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }
    

    上面代码以调用链形式按顺序贴出,可以看到read开始从tail节点传递,传递完事件之后,进行了读注册,也就代表当前channel对读事件感兴趣,紧接着开启自动读取模式的,然后channel只要是活着,就可以连续读数据了。可以看到事件最后传递到head节点,最终会通过UnSafe#beginRead进行实际的读操作。

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        if (inputShutdown) {
            return;
        }
    
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }
    
        readPending = true;
    
        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }
    

    可以看到这里doBeginRead也就是将readInterestOp在有必要的时候加上。
    head介绍到这里,其实tail也类似。这里比较简单,至于如何传播的细节,我们后面会记录


    2.pipeline中handler的添加或移除

    其实明白了pipeline的结构,就应该清楚添加或者移除节点的内部原理,那么何时触发?
    我们以启动server为例看一下:

    ServerBootstrap b = new ServerBootstrap();
    b.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
     .option(ChannelOption.SO_BACKLOG, 100)
     .handler(new LoggingHandler(LogLevel.INFO))
     .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc()));
             }
             p.addLast(new EchoServerHandler());
         }
     });
    

    这段demo启动时,在initChannel中有个

    p.addLast(new EchoServerHandler());
    

    这段逻辑触发时间前面有讲过,就是连接进入的时候,启动workGroup时机会进入这段逻辑。
    我们重点跟踪下本次的核心,添加节点。

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
    
    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
    
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
    
        return this;
    }
    

    找到核心代码:

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
    
            /** 检查此 handler 是否有重复的名字 */
            checkMultiplicity(handler);
    
            /** 为这个 Handler 创建一个对应的 DefaultChannelHandlerContext 实例, 并与之关联起来 */
            newCtx = newContext(group, filterName(name, handler), handler);
    
            addLast0(newCtx);
    
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
    
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
    
        /** 开始回调用户代码  */
        callHandlerAdded0(newCtx);
        return this;
    }
    

    顺序分解:

    • 1.检查是否重复添加
    • 2.检查是否重名,并生成一个名字
    • 2.new 一个context,将本次handler给context
    • 3.将context add 进piepline
    • 4.回调用户扩展代码

    逻辑再简单不过了,详细如下:

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
    
            /**
             * 如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加
             * 一个Handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个Handler被共用,只需要加一个@Sharable标注即可,注解见:
             * @see ChannelHandlerAdapter#isSharable
             */
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    

    如果想重复添加,只需要标识改handler是共享的,直接@Sharable即可。

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
    
        /**
         * 如果用户代码在添加Handler的时候指定了一个name,那么要做到事仅仅为检查一下是否有重复
         */
        checkDuplicateName(name);
        return name;
    }
    private String generateName(ChannelHandler handler) {
        Map<Class<?>, String> cache = nameCaches.get();
        Class<?> handlerType = handler.getClass();
        String name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
    
        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (context0(name) != null) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
    
                /**
                 * 检查name是否和已有的name有冲突,调用context0(),查找pipeline里面有没有对应的context
                 * 如果有,则一直往上生成name,比如:"类名#1","类名#2",...
                 */
                if (context0(newName) == null) {
                    name = newName;
                    break;
                }
            }
        }
        return name;
    }
    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }
    

    在生成context之前,先检查重名,若没有传入名字直接生成一个名字,eg;类名#0,
    但如果冲突,,就会一直往下生成,eg:类名#1...
    上面context方法也简单,就是遍历双向链表,看看是否名字有冲突。
    如果有指定名字,那就直接检查一下是否名字冲突就OK。

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
    

    最后add进链表,跟之前讲的一样,无非是该表prev跟next的指向罢了。此时的piepiline就是这样


    image.png

    移除原理类似,这里不多记录。

    可以看到add跟remove是可以动态调用的,也就是这个Piepline是可以动态编排的,这一点即简单又强大。


    3.pipeline事件传播机制

    记录完了piepline的初始化跟后期维护,那么接下来就是piepline事件传播的原理了。

    依然从下面的这行代码出发:

    pipeline.fireChannelReadComplete();
    

    其实在netty整个个代码里面,fireXXX之类的方法基本就属于触发事件的传递了,我们简单跟踪下逻辑。

    @Override
    public final ChannelPipeline fireChannelReadComplete() {
        AbstractChannelHandlerContext.invokeChannelReadComplete(head);
        return this;
    }
    
    static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelReadComplete();
        } else {
            Runnable task = next.invokeChannelReadCompleteTask;
            if (task == null) {
                next.invokeChannelReadCompleteTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeChannelReadComplete();
                    }
                };
            }
            executor.execute(task);
        }
    }
    
    private void invokeChannelReadComplete() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelReadComplete(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelReadComplete();
        }
    }
    

    可以看到上叙逻辑第一个就找到了head节点,然后判断是否是EventLoop线程,如果是直接调用head的invokeChannelReadComplete,否则建立一个任务,扔进任务队列,等待EventLoop去执行处理。

    那直接到head中看看执行的逻辑:

    /** 继续向reactor线程注册读事件 */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead();
    }
    

    可以看到这里就是简单的传递事件,也就是说此时head不做任何处理,仅仅是传递Pipeline事件的开始,那么核心逻辑就在AbstractChannelHandlerContext里了,我们看看这里怎么去找下一个节点的。

    @Override
    public ChannelHandlerContext fireChannelReadComplete() {
        invokeChannelReadComplete(findContextInbound());
        return this;
    }
    

    主体逻辑递归回来了,这里只要记得递归处理就好,那么我们看看findContextInbound()找到的是哪个节点:

    /**
     * 从 head 开始遍历 Pipeline 的双向链表, 然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例 --> ChannelInitializer
     */
    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
    

    此处逻辑就是循环遍历双向链表,找到第一个inbound节点,找到之后继续执行如下逻辑:

     next.invokeChannelReadComplete();
    

    如此反复递归,找到知道实际处理的节点,执行真正的读写操作。

    因此事件传播在netty中还是比较容易理解的。

    其实异常的传递也是其中一个重点,道理类似,可以线下详细跟踪一番。

    相关文章

      网友评论

          本文标题:4-netty源码分析之Pipeline

          本文链接:https://www.haomeiwen.com/subject/ypaloftx.html