美文网首页
netty源码分析(五) - pipeline

netty源码分析(五) - pipeline

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-12 17:29 被阅读0次

    概述

    pipeline结构

    DefaultChannelPipeline.png
    • ChannelInboundInvoker:主要是触发Pipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法
    • ChannelOutboundInvoker:主要是触发Pipeline上的下一个Outbound通道处理器ChannelOutboundHandler的相关方法
    • DefaultChannelPipeline:pipeline默认实现,主要有两方面的功能,一是:操作由AbstractChannelHandlerContext作为结点的双向链表;二是:实现Inbound/Outbound接口方法作为pipeline处理的入口方法;
    • HeadContext:pipeline内部类,pipeline默认提供的头节点
    • TailContext:pipeline内部类,pipeline默认提供的尾节点

    context/handler结构

    Context.png
    • AbstractChannelHandlerContext:context基础类,由成员变量volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;构成双向链表结构;
    • ChannelHandler:context持有handler,具体业务逻辑委托给handler进行处理
    • 默认HeadContext/TailContext都实现了handler接口(即也是一个handler),因此其内部不需要handler的成员变量;用户自定义的handler最终都会被包装成DefaultChannelHandlerContext,会持有private final ChannelHandler handler;

    下面开始详细讲解pipeline,主要分为一下三部分:

    1. pipeline初始化
    2. 添加/删除ChannelHannel
    3. 事件的传播

    1. pipeline初始化

    之前在分析NioServerSocketChannel和NioSocketChannel创建时都看到过pipeline初始化的代码pipeline = newChannelPipeline();

    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
        tail = new TailContext(this);
        head = new HeadContext(this);
        head.next = tail;
        tail.prev = head;
    }
    
    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
        //name命名:DefaultChannelPipeline$TailContext或HeadContext#0
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.executionMask = mask(handlerClass)
        //true
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }
    
    • HeadContext:主要处理两部分,inbound相关(例如:read/write/flush等)会委托给unsafe进行操作;outbound会传递给下一个节点处理;需要注意的是channelActive/channelReadComplete会执行readIfIsAutoRead
    • TailContext:只实现了ChannelInboundHandler,基本都是空实现,主要做一些收尾工作,例如ReferenceCountUtil.release(cause)释放内存

    2. 添加/删除ChannelHandler

    DefaultChannelPipeline # addLast

    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1. 重复性校验
            checkMultiplicity(handler);
            //2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
            newCtx = newContext(group, filterName(name, handler), handler);
            //3. 把新创建的newCtx添加到双向链表中
            addLast0(newCtx);
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                //4. 回调添加完成事件
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        //4. 回调添加完成事件
        callHandlerAdded0(newCtx);
        return this;
    }
    
    1. checkMultiplicity重复性校验
    2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
    3. 把新创建的newCtx添加到双向链表中
    4. 回调添加完成事件

    DefaultChannelPipeline # checkMultiplicity

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //如果没有isSharable注解,并且已经添加过,则报错
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
    
    • 如果没有isSharable注解,并且已经添加过,则报错

    AbstractChannelHandlerContext # callHandlerAdded 回调添加完成事件

    final void callHandlerAdded() throws Exception {
        //cas设置状态为addComplete
        if (setAddComplete()) {
            //执行用户handler(ChannelInitializer)的handlerAdded方法
            handler().handlerAdded(this);
        }
    }
    
    • cas设置状态为addComplete后执行用户handler(ChannelInitializer)的handlerAdded方法
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            if (initChannel(ctx)) {
                removeState(ctx);
            }
        }
    }
    
    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.add(ctx)) {
            try {
                //执行用户ChannelInitializer中重写的initChannel方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
                ChannelPipeline pipeline = ctx.pipeline();
                if (pipeline.context(this) != null) {
                    //执行后remove保证只会在新连接(channel)初始化时执行一次
                    pipeline.remove(this);
                }
            }
            return true;
        }
        return false;
    }
    
    • 执行用户ChannelInitializer中重写的initChannel方法(通常是添加child handler)
    • 执行后remove掉ChannelInitializer保证只会在新连接(channel)初始化时执行一次
      删除ChannelHandler逻辑类似,这里就不做分析

    3. 事件的传播

    handler和adapter结构

    ChannelInboundHandlerAdapter.png
    • ChannelHandler:3个回调方法handlerAdded/handlerRemoved/exceptionCaught
    • ChannelHandlerAdapter: ChannelHandler接口的默认实现
    • ChannelInboundHandlerAdapter:提供ChannelInboundHandler的默认实现,调用context中对应方法,用户自定义inboundHandler一般继承改类
    • ChannelOutboundHandlerAdapter:提供ChannelOutboundHandler的默认实现,调用context中对应方法,用户自定义outboundHandler一般继承改类
    • SimpleChannelInboundHandler:主要重写了ChannelInboundHandlerAdapte#channelRead方法,提供自动释放byteBuf功能

    事件和异常传播方向

    pipeline (2).png
    • ChannelDuplexHandler:双向事件
    • Inbound事件顺序为:Head > 1 > 2 > 5 > 6 > Tail,跟add顺序一致,且跳过Outbound事件
    • Outbound事件顺序为:Tail > 6 > 5 > 4 > 3 > Head,跟add顺序相反,且跳过Inbound事件
    • 异常事件顺序为:Head > 1 > 2 > 3 > 4 > 5 > 6 > Tail,跟add顺序一致,且经过所有事件

    以如下EchoServer为例
    read事件:

    EchoServer:
    
    .childHandler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             p.addLast(new LoggingHandler(LogLevel.INFO), serverHandler);
         }
    });
    
    @Sharable
    public class EchoServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            System.out.println("----EchoServerHandler.channelReads-------");
            ctx.pipeline().write(msg);
        }
    
        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
            System.out.println("----EchoServerHandler.write-------");
            ctx.write(msg, promise);
        }
    }
    

    channelRead调用栈如下:

    channelRead调用栈.png
    1. read事件首先在NioEventLoop的run方法中被从selector轮询到;
    2. 交给processSelectedKeys处理;
    3. 在AbstractNioByteChannel中调用pipeline.read;
    4. 在pipeline中执行inbound事件处理流程:HeadContext > LoggingHandler > EchoServerHandler
    • 注意:由于EchoServerHandler # channelRead中没有继续往下传播,所以调用栈中没有显示TailContext

    write事件:

    write调用栈.png
    1. EchoServerHandler # channelRead中调用执行ctx.pipeline().write(msg);作为write入口
    2. DefaultChannelPipeline中调用tail.write
    3. 在pipeline中执行outbound事件处理流程:TailContext > EchoServerHandler > LoggingHandler > HeadContext
    4. 最后在HeadContext中通过NioSocketChannelUnsafe # write出去
    • 注意:ctx.fireChannelRead等事件是从当前结点开始传播;ctx.channel().pipeline().fireChannelRead()等事件是从Head或Tail节点开始传播
      至此pipeline流程分析完毕

    相关文章

      网友评论

          本文标题:netty源码分析(五) - pipeline

          本文链接:https://www.haomeiwen.com/subject/drnspktx.html