美文网首页
netty源码分析(五) - pipeline

netty源码分析(五) - pipeline

作者: 进击的蚂蚁zzzliu | 来源:发表于2020-10-12 17:29 被阅读0次

概述

pipeline结构

DefaultChannelPipeline.png
  • ChannelInboundInvoker:主要是触发Pipeline上的下一个Inbound通道处理器ChannelInboundHandler的相关方法
  • ChannelOutboundInvoker:主要是触发Pipeline上的下一个Outbound通道处理器ChannelOutboundHandler的相关方法
  • DefaultChannelPipeline:pipeline默认实现,主要有两方面的功能,一是:操作由AbstractChannelHandlerContext作为结点的双向链表;二是:实现Inbound/Outbound接口方法作为pipeline处理的入口方法;
  • HeadContext:pipeline内部类,pipeline默认提供的头节点
  • TailContext:pipeline内部类,pipeline默认提供的尾节点

context/handler结构

Context.png
  • AbstractChannelHandlerContext:context基础类,由成员变量volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;构成双向链表结构;
  • ChannelHandler:context持有handler,具体业务逻辑委托给handler进行处理
  • 默认HeadContext/TailContext都实现了handler接口(即也是一个handler),因此其内部不需要handler的成员变量;用户自定义的handler最终都会被包装成DefaultChannelHandlerContext,会持有private final ChannelHandler handler;

下面开始详细讲解pipeline,主要分为一下三部分:

  1. pipeline初始化
  2. 添加/删除ChannelHannel
  3. 事件的传播

1. pipeline初始化

之前在分析NioServerSocketChannel和NioSocketChannel创建时都看到过pipeline初始化的代码pipeline = newChannelPipeline();

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);
    tail = new TailContext(this);
    head = new HeadContext(this);
    head.next = tail;
    tail.prev = head;
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
    //name命名:DefaultChannelPipeline$TailContext或HeadContext#0
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass)
    //true
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}
  • HeadContext:主要处理两部分,inbound相关(例如:read/write/flush等)会委托给unsafe进行操作;outbound会传递给下一个节点处理;需要注意的是channelActive/channelReadComplete会执行readIfIsAutoRead
  • TailContext:只实现了ChannelInboundHandler,基本都是空实现,主要做一些收尾工作,例如ReferenceCountUtil.release(cause)释放内存

2. 添加/删除ChannelHandler

DefaultChannelPipeline # addLast

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        //1. 重复性校验
        checkMultiplicity(handler);
        //2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
        newCtx = newContext(group, filterName(name, handler), handler);
        //3. 把新创建的newCtx添加到双向链表中
        addLast0(newCtx);
        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            //4. 回调添加完成事件
            callHandlerAddedInEventLoop(newCtx, executor);
            return this;
        }
    }
    //4. 回调添加完成事件
    callHandlerAdded0(newCtx);
    return this;
}
  1. checkMultiplicity重复性校验
  2. 创建节点并添加至链表, filterName: name为空会重新生成(handler类名 + $x + #x), name不为空会检查name是否重复(从Head节点遍历比对name是否重复)
  3. 把新创建的newCtx添加到双向链表中
  4. 回调添加完成事件

DefaultChannelPipeline # checkMultiplicity

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        //如果没有isSharable注解,并且已经添加过,则报错
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}
  • 如果没有isSharable注解,并且已经添加过,则报错

AbstractChannelHandlerContext # callHandlerAdded 回调添加完成事件

final void callHandlerAdded() throws Exception {
    //cas设置状态为addComplete
    if (setAddComplete()) {
        //执行用户handler(ChannelInitializer)的handlerAdded方法
        handler().handlerAdded(this);
    }
}
  • cas设置状态为addComplete后执行用户handler(ChannelInitializer)的handlerAdded方法
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
            removeState(ctx);
        }
    }
}

private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) {
        try {
            //执行用户ChannelInitializer中重写的initChannel方法
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                //执行后remove保证只会在新连接(channel)初始化时执行一次
                pipeline.remove(this);
            }
        }
        return true;
    }
    return false;
}
  • 执行用户ChannelInitializer中重写的initChannel方法(通常是添加child handler)
  • 执行后remove掉ChannelInitializer保证只会在新连接(channel)初始化时执行一次
    删除ChannelHandler逻辑类似,这里就不做分析

3. 事件的传播

handler和adapter结构

ChannelInboundHandlerAdapter.png
  • ChannelHandler:3个回调方法handlerAdded/handlerRemoved/exceptionCaught
  • ChannelHandlerAdapter: ChannelHandler接口的默认实现
  • ChannelInboundHandlerAdapter:提供ChannelInboundHandler的默认实现,调用context中对应方法,用户自定义inboundHandler一般继承改类
  • ChannelOutboundHandlerAdapter:提供ChannelOutboundHandler的默认实现,调用context中对应方法,用户自定义outboundHandler一般继承改类
  • SimpleChannelInboundHandler:主要重写了ChannelInboundHandlerAdapte#channelRead方法,提供自动释放byteBuf功能

事件和异常传播方向

pipeline (2).png
  • ChannelDuplexHandler:双向事件
  • Inbound事件顺序为:Head > 1 > 2 > 5 > 6 > Tail,跟add顺序一致,且跳过Outbound事件
  • Outbound事件顺序为:Tail > 6 > 5 > 4 > 3 > Head,跟add顺序相反,且跳过Inbound事件
  • 异常事件顺序为:Head > 1 > 2 > 3 > 4 > 5 > 6 > Tail,跟add顺序一致,且经过所有事件

以如下EchoServer为例
read事件:

EchoServer:

.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new LoggingHandler(LogLevel.INFO), serverHandler);
     }
});
@Sharable
public class EchoServerHandler extends ChannelDuplexHandler {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("----EchoServerHandler.channelReads-------");
        ctx.pipeline().write(msg);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        System.out.println("----EchoServerHandler.write-------");
        ctx.write(msg, promise);
    }
}

channelRead调用栈如下:

channelRead调用栈.png
  1. read事件首先在NioEventLoop的run方法中被从selector轮询到;
  2. 交给processSelectedKeys处理;
  3. 在AbstractNioByteChannel中调用pipeline.read;
  4. 在pipeline中执行inbound事件处理流程:HeadContext > LoggingHandler > EchoServerHandler
  • 注意:由于EchoServerHandler # channelRead中没有继续往下传播,所以调用栈中没有显示TailContext

write事件:

write调用栈.png
  1. EchoServerHandler # channelRead中调用执行ctx.pipeline().write(msg);作为write入口
  2. DefaultChannelPipeline中调用tail.write
  3. 在pipeline中执行outbound事件处理流程:TailContext > EchoServerHandler > LoggingHandler > HeadContext
  4. 最后在HeadContext中通过NioSocketChannelUnsafe # write出去
  • 注意:ctx.fireChannelRead等事件是从当前结点开始传播;ctx.channel().pipeline().fireChannelRead()等事件是从Head或Tail节点开始传播
    至此pipeline流程分析完毕

相关文章

网友评论

      本文标题:netty源码分析(五) - pipeline

      本文链接:https://www.haomeiwen.com/subject/drnspktx.html