美文网首页Magic Netty
Netty中的ChannelPipeline

Netty中的ChannelPipeline

作者: 逅弈 | 来源:发表于2018-05-15 09:40 被阅读2次

数据在网络中从端到端进行传输时,就是一次IO的过程,在这个过程中一般会有一个Channel。所有的IO操作例如bind、connect、read、write都会在这个Channel中进行。在Netty中每当一个Channel被创建时,系统会自动为其创建一个ChannelPipeline,整个IO的过程会在ChannelPipeline中贯穿。

什么是ChannelPipeline

ChannelPipeline是一个在 {@link Channel} 上处理或者拦截入站事件/出站操作的 {@link ChannelHandler}s 的集合
它为用户提供了全部的能力来控制事件的处理,并且保证了在pipeline中 {@link ChannelHandler}s 之间是怎样相互联系的

ChannelPipeline和Channel是紧密相连的,在Channel的生命周期中ChannelPipeline是唯一的。

除此以外,ChannelPipeline中还有Channel赖以处理各种IO事件的ChannelHandler。并且ChannelHandler是有方向的,包括InboundChannelHandlerOutboundChannelHandler,他们共同组成了ChannelPipeline中处理IO事件的基础。

ChannelPipeline中的ChannelHandler

ChannelPipeline中ChannelHandler之间的关系如下图所示:


img1.png

如上图所示,当入站事件到来的时候,Pipeline中的ChannelHandler的处理顺序是InboundHandler1-->InboundHandler2-->InboundOutboundHandler5;而出站事件的处理顺序与入站是相反的,顺序为:InboundOutboundHandler5-->OutboundHandler4-->OutboundHandler3

ChannelPipeline就像一个管道一样,数据ByteBuf就像管道中的,而处理数据的ChannelHandler就像管道中各种各样的过滤器。可是当一个过滤器处理过之后他是如何知道该传给其他哪个过滤器的呢?

上面我们已经知道了在ChannelPipeline中ChannelHandler是有方向的,当处理Inbound事件时是会跳过Outbound事件的,同理处理Outbound事件时也一样。但是仅仅根据ChannelHandler的类型是不够的,在ChannelPipeline中还有另外一个非常重要的类:ChannelHandlerContext

ChannelPipeline中的ChannelHandlerContext

正如皮管可以将一个个的水龙头连接起来,形成一条长长的水管一样。ChannelHandlerContext所做的就是将散落在ChannelPipeline中的ChannelHandler连接起来,使得每个ChannelHandler都不再孤立无援。

ChannelHandlerContext在ChannelPipeline中的结构可以用下图表示:


img2.png

首先每个ChannelPipeline在初始化的时候会构造两个特殊的ChannelHandlerContext:HeadContext,TailContext,在上图中分别用headtail表示。为什么说他们是特殊的两个ChannelHandlerContext呢?看一下他们的类构造:

final class HeadContext extends AbstractChannelHandlerContext 
          implements ChannelOutboundHandler, ChannelInboundHandler {
  private final Unsafe unsafe;
  HeadContext(DefaultChannelPipeline pipeline) {
      // inbound=false, outbound=true
      super(pipeline, null, HEAD_NAME, false, true);
      unsafe = pipeline.channel().unsafe();
      setAddComplete();
  }
}

final class TailContext extends AbstractChannelHandlerContext 
        implements ChannelInboundHandler {
    TailContext(DefaultChannelPipeline pipeline) {
        // inbound=true, outbound=false
        super(pipeline, null, TAIL_NAME, true, false);
        setAddComplete();
    }
}

可以看到HeadContext和TailContext都继承了AbstractChannelHandlerContext,HeadContext实现了ChannelOutboundHandler和ChannelInboundHandler,TailContext实现了ChannelInboundHandler。并且他们在调用父类的构造方法时,传入的参数TailContext为:inbound=true, outbound=false,HeadContext为:inbound=false, outbound=true,由此可见HeadContext是一个outbound类型的ChannelHandler,TailContext是一个inbound类型的ChannelHandler。

再看ChannelPipeline的默认实现类DefaultChannelPipeline的构造方法:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

由构造函数可见,head指向了tail,tail指向了head,这样就构成了一个双向的链表,如下图所示:

img3.png

另外需要注意的是,这两个Context是Netty默认添加到Pipeline中的,当我们不往Pipeline中添加任何ChannelHandler的时候,Pipeline仍然能保证一个完整的链表。当我们需要往Pipeline中添加一些ChannelHandler的时候,此时调用的其实是这个方法:

@Override
public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
    return addFirst(null, name, handler);
}

最终调用的是以下的方法:

@Override
public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        name = filterName(name, handler);
        // 将ChannelHandler包装为一个ChannelHandlerContext
        newCtx = newContext(group, name, handler);
        // 将新的Context插入到链表中
        addFirst0(newCtx);
        // 以下省略部分代码
    }
    callHandlerAdded0(newCtx);
    return this;
}

private void addFirst0(AbstractChannelHandlerContext newCtx) {
    // 获得原本链表中表头的下一个节点
    AbstractChannelHandlerContext nextCtx = head.next;
    // 将新节点插入到表头后面
    newCtx.prev = head;
    // 将原来表头的下一个节点插入到新节点的后面
    newCtx.next = nextCtx;
    // 表头的next指向新节点
    head.next = newCtx;
    // 原表头的下一个节点的prev指向新节点
    nextCtx.prev = newCtx;
}

可以看到调用了addFirst方法后,会将ChannelHandler包装成一个ChannelHandlerContext,然后添加到链表中head节点的后面。添加完之后整个ChannelPipeline变成下图这样:

img4.png

相应的调用addLast方法,会把ChannelHandler包装成一个ChannelHandlerContext,然后把它添加到链表中tail节点的前面。

数据在ChannelPipeline中的传递

知道了ChannelPipeline中ChannelHandlerContext和ChannelHandler的关系以及结构之后,我们就很容量理解数据在ChannelPipeline中是如何进行传递的了。
我们以channelRead方法为例,在ChannelInboundHandlerAdapter中channelRead方法,如下所示:

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 
      implements ChannelInboundHandler {

  @Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
      // 触发ctx的fireChannelRead方法
      ctx.fireChannelRead(msg);
  }
}

可以看到实际调用的是ChannelHandlerContext的fireChannelRead方法,我们看一下这个方法:

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    // 获取下一个inboundContext并向后传递msg
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        // 获取pipeline中下一个inbound的ctx
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    // touch一个msg
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    // 获取ctx的执行器,并执行
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        // 调用ctx的invokeChannelRead(Object msg)方法
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}

private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            // 获取ChannelHandler之后,调用handler的channelRead方法
            // 至此就把数据从一个ChannelHandler传递到了下一个ChannelHandler
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        fireChannelRead(msg);
    }
}

简单点说,就是InboundChannelHandlerA想把数据msg传递给他的下一个InboundChannelHandlerB,那么整个流程就如下图所示:

img5.png

我是逅弈,如果文章对您有帮助,欢迎您点赞加关注,并欢迎您关注我的公众号:

欢迎关注微信公众号

相关文章

网友评论

    本文标题:Netty中的ChannelPipeline

    本文链接:https://www.haomeiwen.com/subject/cmtnrftx.html