美文网首页
ChannelPipeline

ChannelPipeline

作者: 水欣 | 来源:发表于2017-11-26 17:19 被阅读0次
  1. Channel 与 ChannelPipeline
    在Netty中每个Channel都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:
    5.png
    通过上图我们可以看到,一个Channel包含了一个ChannelPipeline,而ChannelPipeline中又维护了一个由ChannelHandlerContext组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext中又关联着一个ChannelHandler
    前面已经知道了一个Channel的初始化的基本过程,下面再回顾一下
    下面的代码是AbstractChannel构造器
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

AbstractChannel有一个pipeline字段,在构造器中会初始化它为DefaultChannelPipeline的实例,这里的代码就印证了一点:每个Channel都有一个ChannelPipeline
接着我们跟踪一下 DefaultChannelPipeline的初始化过程。
首先进入到DefaultChannelPipeline构造器中:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

DefaultChannelPipeline构造器中,首先将与之关联的Channel保存到字段channel中,然后实例化两个ChannelHandlerContext,一个是HeadContext实例head,一个是TailContext实例tail。接着将head和tail互相指向,构成一个双向链表。
特别注意到,我们在开始的示意图中,head和tail并没有包含ChannelHandler,这是因为HeadContextTailContext继承于AbstractChannelHandlerContext的同时也实现了ChannelHandler接口了,因此它们有Context和Handler的双重属性。

  1. ChannelPipeline的初始化再探
    在最开始介绍客户端时,我们已经对ChannelPipeline的初始化有了一个大致的了解,不过当时重点毕竟不在ChannelPipeline这里,因此没有深入地分析它的初始化过程。那么下面我们就来看一下具体的ChannelPipeline的初始化都做了那些工作吧
  2. ChannelPipeline 实例化过程
    我们再来回顾一下,在实例化一个Channel时,会伴随着一个ChannelPipeline的实例化,并且此Channel会与这个ChannelPipeline相互关联,这一点可以通过NioSocketChannel的父类AbstractChannel的构造器予以佐证:
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

当实例化一个Channel(这里以 EchoClient为例,那么Channel就是NioSocketChannel),其pipeline字段就是我们新创建的DefaultChannelPipeline对象,那么我们就来看一下DefaultChannelPipeline的构造方法吧。

public DefaultChannelPipeline(AbstractChannel channel) {
    if (channel == null) {
        throw new NullPointerException("channel");
    }
    this.channel = channel;

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

可以看到,在 DefaultChannelPipeline的构造方法中,将传入的channel复制给字段this.channel,接着又实例化了两个特殊的字段,tailhead。这两个字段是一个双向链表的头和尾。其实在DefaultChannelPipeline中,维护了一个以AbstractChannelHandlerContext为节点的双向链表,这个链表是Netty实现Pipeline机制的关键。
再回顾一下head和tail的类层次结构:

6.png
7.png

从类层次结构图中可以很清楚地看到,head实现了ChannelInboundHandler,而tail实现了ChannelInboundHandler接口,并且他们都实现了ChannelHandlerContext接口,因此可以说head和tail既是一个ChannelHandler,又是一个ChannelHandlerContext
接着看一下headContext的构造器

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, false, true);
    unsafe = pipeline.channel().unsafe();
}

它调用了父类AbstractChannelHandlerContext的构造器,并传入参数inbound=false,outbound=true。
TailContext的构造器与HeadContext的相反,它调用了父类AbstractChannelHandlerContext的构造器,并传入参数Inbound=true,outbound=false。
即header是一个outboundHandler,而tail是一个`inboundHandler,关于这一点,大家要特别注意,因为在后面的分析中,我们会反复用到inbound和outbound这两个属性。

  1. ChannelInitializer的添加
    上面一小节中,我们已经分析了Channel的组成,其中我们了解到,最开始的时候ChannelPipeline中含有两个ChannelHandlerContext(同时也是ChannelHandler),但是这个Pipeline并不能实现什么特殊的功能,因为我们还没有给它添加自定义的ChannelHandler.
    通常来说,我们在初始号Bootstrap,会添加我们自定义的ChannelHandler,就以我们熟悉的EchoClient来举例吧
Bootstrap b = new Bootstrap();
b.group(group)
 .channel(NioSocketChannel.class)
 .option(ChannelOption.TCP_NODELAY, true)
 .handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

上面代码的初始化过程,相信大家都不陌生。在调用handler时,传入了ChannelInitializer对象,它提供了一个initChannel方法供我们初始化ChannelHandler,那么这个初始化过程是怎么样的呢?
ChannelInitializer实现了ChannelHandler,那么它是在什么时候添加到ChannelPipeline中的呢?进行了一番搜索后,发现它是在Bootstrap.init()中添加到ChannelPipeline中的。
其代码如下:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(handler());
    ...
}

上面的代码将handler()返回的ChannelHandler添加到Pipeline中,而handler()返回的是handler其实就是我们在初始化Bootstrap调用handler设置的ChannelInitializer实例,因此这里就是将ChannelInitializer插入到了Pipeline的末端,此时Pipeline的结构如下图所示

8.png
有朋友可能有疑惑了,我明明插入的是一个ChannelInitializer实例,为什么在ChannelPipeline中的双向链表中的元素确实一个ChannelHandlerContext?为了解答这个问题,我们继续在代码中寻找答案吧。
我们刚刚才提到,在Bootstrap.init中会调用p.addLast()方法,将ChannelInitializer插入到链表末端:
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name); // 检查此 handler 是否有重复的名字

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

addLast有很多重载的方法,我们关注这个比较重要的方法就可以了。
上面的addLast方法中,首先检查这个ChannelHandler的名字是否是重复的,如果不重复的话,则为这个handler创建一个对应的DefaultChannelHandlerContext实例,并与之关联起来(Context中有一个handler属性保存着对应的Handler实例)。判断此Handler是否重名的方法很简单:Netty中有一个name2ctx Map字段,key是handler的名字,而value则是handler本身。因此通过如下代码就可以判断一个handler是否重名了:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

为了添加一个handler到pipeline中,必须把此handler包装成ChannelHandlerContext,因此在上面的代码中我们可以看到新实例化了一个newCtx,并将handler作为参数传递到构造方法中,那么我们来看一下实例化的DefaultChannelHandlerContext到底有什么玄机吧。
首先看它的构造器

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
    super(pipeline, group, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

DefaultChannelHandlerContext的构造器中,调用了两个很有意思的方法:isInbound 与 isOutbound,这两个方法是做什么的呢?

private static boolean isInbound(ChannelHandler handler) {
    return handler instance ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instance ChannelOutboundHandler;
}

从源码中可以看出,当一个handler实现了ChannelInboundHandler接口,则isInbound返回真;相似地,当一个handler实现了ChannelOutboundHandler接口,则isOutbound就返回真。
而这两个boolean变量会传递到父类AbstractChannelHandlerContext中,并初始化父类的两个字段:inbound与outbound。
那么这里的ChannelInitializer所对应的DefaultChannelHandlerContext的inbound与outbound字段分别是什么呢?那就看一下ChannelInitializer到底实现了哪个接口不就行了?如下是ChannelInitializer的类层次结构图:

1.png
可以清楚地看到。ChannelInitializer仅仅实现了ChannelInboundHandler接口,因此这里实例化的DefaultChannelHandlerContext的inbound=true,outbound=false。
不就是inbound和outbound两个字段吗,为什么需要这么大费周章地分析一番?其实这两个字段关系到pipeline的时间的流向与分类,因此是十分关键的。在这里我们只需要记住,ChannelInitializer所对应的DefaultChannelHandlerContext的inbound=true,outbound=false即可。
当创建好Context后,就将这个Context插入到Pipeline的双向链表中:
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
    checkMultiplicity(newCtx);

    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;

    name2ctx.put(name, newCtx);

    callHandlerAdded(newCtx);
}

显然,这个胆码就是典型的双向链表的插入操作了。当调用了addLast()后,Netty就会将此handler添加到双向链表中tail元素之前的位置。

  1. 自定义ChannelHandler的添加过程
    在上一小节中,我们已经分析了一个ChannelInitializer如果插入到Pipeline中的,接下来就来探讨一下ChannelInitializer在哪里被调用,ChannelInitializer的作用,以及我们自定义的ChannelHandler是如何插入到Pipeline中的。
    在客户端一章的channel注册过程小节中,我们已经分析过Channel的注册过程了,这里我们再简单的复习一下:
  • 首先在AbstractBootstrap.initAndRegister中,通过group().register(channel),调用MultithreadEventLoopGroup.register()
  • MultithreadEventLoopGroup.register中,通过next()获取一个可用的SingleThreadEventLoop,然后调用它的register
  • AbstractUnsafe.register()中,调用register()注册Channel
  • AbstractUnsafe.register0中,调用AbstractNioChannel#doRegister()
  • AbstractNioChannel.doRegister()通过javaChannel.register(eventLoop.selector(),0,this)将Channel对应的Java NIO SocketChannel注册到一个eventLoop的Selector中,并且将当前的Channel作为attachment.
    而我们自定义的ChannelHandler的添加过程,发生在AbstractUnsafe.register0()中,在这个方法中调用了pipeline.fireChannelRegistered()方法,其实现如下
@Override
public ChannelPipeline fireChannelRegistered() {
   head.fireChannelRegistered();
   return this;
}

上面的代码很简单,就是调用了head.fireChannelRegistered()而已

关于上面代码的 `head.fireXXX`的调用形式,是Netty中Pipeline传递事件的常用方式,我们以后会经常看到

还记得head的类层次结构图不,head是一个AbstractChannelHandlerContext实例,并且它没有重写fireChannelRegister(),因此head.fireChannelRegistered()其实是调用AbstractChannelHandlerContext.fireChannelRegistered():

@Override
public ChannelHandlerContext fireChannelRegistered() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
    } else {
        executor.execute(new OneTimeTask() {
            @Override
            public void run() {
                next.invokeChannelRegistered();
            }
        });
    }
    return this;
}

这个方法的第一句是调用findContextInbound获取一个Context,那么它返回的Context到底是什么呢?

private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return cox;
}

很显 然,这个代码会从head开始遍历Pipeline的双向链表,然后找到第一个属相inbound 为 true的ChannelHandlerContext实例。我们在前面分析ChannelInitializer时,花了很大的笔墨来分析了inbound 和 outbound的属相,在这里用上了,回想一下,ChannelInitializer实现了ChannelInboundHandler,因此它所对应的ChannelHandlerContext的inbound属相就是true,因此这里返回就是ChannelInitializer实例所对应的ChannelHandlerContext,即 ![2.png](https://img.haomeiwen.com/i6734122/d22406bfdb315a7b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 当获取到inbound的Context后,就调用它的invokeChannelRegister`方法:

private void invokeChannelRegistered() {
    try {
        ((ChannelInboundHandler) handler()).channelRegistered(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

我们已经强调过了,每个ChannelHandler都与一个ChannelHandlerContext关联,我们可以通过ChannelHandlerContext获取对应的ChannelHandler,因此很明显,这里handler()返回的,其实就是一开始我们实例化的ChannelInitializer对象,并接着调用了ChannelInitializer.channelRegistered()。看到这里,读者朋友是否觉得有点眼熟呢?ChannelInitializer.channelRegister这个方法我们在第一章的时候已经大量接触了,但是并没有深入地分析这个方法的调用过程
这个方法中又有什么玄机呢?

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    initChannel((C) ctx.channel());
    ctx.pipeline().remove(this);
    ctx.fireChannelRegistered();
}

initChannel()这个方法我们很熟悉了吧,它就是我们在初始化Bootstrap时,调用handler方法传入的匿名内部类所实现的方法

.handler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new EchoClientHandler());
     }
 });

因此当调用了这个方法后,我们自定义的ChannelHandler就插入到Pipeline了,此时的Pipeline如下图所示

3.png
当添加了自定义的ChannelHandler后,会删除ChannelInitializer这个ChannelHandler,即 cox.pipeline().remove(this),incident最后的Pipeline如下
4.png
  1. ChannelHandler的名字
    我们注意到, pipeline.addXXX都有一个重载的方法,例如addLast,它有一个重载的版本是:
ChannelPipeline addLast(String name, ChannelHandler handler);

第一参数指定了所添加的handler的名字(更准确地说是ChannelHandlerContext的名字,不过我们通常是以handler作为叙述,因此说成handler的名字便于理解),那么handler的名字有什么用呢?如果我们不设置name,那么handler会有怎样的名字,为了解答这些疑惑,还是从源码中找到答案
以addLast()为例

@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
    return addLast(null, name, handler);
}

这个方法会调用重载的addLast方法

@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
    synchronized (this) {
        checkDuplicateName(name);

        AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
        addLast0(name, newCtx);
    }

    return this;
}

第一个参数被设置为null ,我们不关心它,第二个参数就是这个handler的名字,看代码可知,在添加handler之前,需要调用checkDouplicateName方法来确定此handler的名字是否和已添加的handler的名字重复,而这个checkDuplicationName方法在前面已经有提到,这里再回顾一下:

private void checkDuplicateName(String name) {
    if (name2ctx.containsKey(name)) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

Netty判断一个handler的名字是否重复的依据很简答:DefaultChannelPipeline中有一个类型为Map<String,AbstractChannelHandlerContext> 的name2ctx字段,它的key是一个handler的名字,而value则是这个handler所对应的ChannelHandlerContext,每当新添加一个handler时,就会put到name2ctx中。因此检查name2ctx中是否包含这个name即可,当没有重命名的handler时,就为这个handler生成一个关联的DefaultChannelHandlerContext对象,然后就将name和newCtx作为key-value对放到name2Ctx中。

  1. 自动生成handler的名字
    如果我们调动的是如下的addLast()
ChannelPipeline addLast(ChannelHandler... handlers);

那么Netty会调用generateName为我们的handler自动生成一个名字:

private String generateName(ChannelHandler handler) {
    WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
    Class<?> handlerType = handler.getClass();
    String name;
    synchronized (cache) {
        name = cache.get(handlerType);
        if (name == null) {
            name = generateName0(handlerType);
            cache.put(handlerType, name);
        }
    }

    synchronized (this) {
        // It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
        // any name conflicts.  Note that we don't cache the names generated here.
        if (name2ctx.containsKey(name)) {
            String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
            for (int i = 1;; i ++) {
                String newName = baseName + i;
                if (!name2ctx.containsKey(newName)) {
                    name = newName;
                    break;
                }
            }
        }
    }

    return name;
}

而generateName会接着调用generateName0来实际产生一个handler的名字

private static String generateName0(Class<?> handlerType) {
    return StringUtil.simpleClassName(handlerType) + "#0";
}

自动生成的名字的规则很简单,就是handler的简单类名加上"#0",因此我们的EchoClientHandler的名字就是"EchoClientHandler#0"。

  1. 关于Pipeline的时间传输机制
    前面章节中,我们知道AbstractChannelHandlerContext中有inbound 和outbound两个boolean变量,分别用于标识Context所对应的handler的类型,即:
  • inbound为真时,表示对应的ChannelHandler实现了ChannelInboundHandler
  • outbound为真时,表示对应的Channelhandler实现了ChannelOutboundHandler
    读者朋友肯定很疑惑吧,那究竟这两个字段有什么作用呢?其实这还要从ChannelPipeline的传输的事件类型说起.
    Netty的事件可以分为Inbound和outbound事件
                                             I/O Request
                                        via Channel or
                                    ChannelHandlerContext
                                                  |
+---------------------------------------------------+---------------+
|                           ChannelPipeline         |               |
|                                                  \|/              |
|    +---------------------+            +-----------+----------+    |
|    | Inbound Handler  N  |            | Outbound Handler  1  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  .               |
|               .                                   .               |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
|        [ method call]                       [method call]         |
|               .                                   .               |
|               .                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
|               |                                  \|/              |
|    +----------+----------+            +-----------+----------+    |
|    | Inbound Handler  1  |            | Outbound Handler  M  |    |
|    +----------+----------+            +-----------+----------+    |
|              /|\                                  |               |
+---------------+-----------------------------------+---------------+
              |                                  \|/
+---------------+-----------------------------------+---------------+
|               |                                   |               |
|       [ Socket.read() ]                    [ Socket.write() ]     |
|                                                                   |
|  Netty Internal I/O Threads (Transport Implementation)            |
+-------------------------------------------------------------------+

从上图可以看到,inbound事件和outbound事件的流向是不一样的,inbound事件的流向是从下至上,而outbound刚好相反,是从上至下。并且inbound的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法,而outbound方法的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。例如ChannelHandlerCountext.firChannelRegistered()调用会发送一个ChannelRegistered的inbound给下一个ChannelHandlerContext,而ChannelHandlerContext.bind调用会发送一个bind的outbound事件给下一个ChannelHandlerContext
inbound事件传播方法有:

ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()

Outbound事件传输方法有:

ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)

注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用Context相应的传播方法
例如:

public class MyInboundHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("Connected!");
        ctx.fireChannelActive();
    }
}

public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
        System.out.println("Closing ..");
        ctx.close(promise);
    }
}

上面的例子中,MyInboundHandler收到了一个channelActive事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()

  1. Outbound操作
    Outbound事件都是请求时间,即请求某件事情的发生,然后通过Outbound事件进行通知。
    Outbound事件的传播方法是tail -> customContext -> head.
    我们接下来以connect事件为例,分析一下Outbound事件的传播机制
    首先,当用户调用了 Bootstrap.connect()时,就会触发一个Connect请求事件,此调用会触发如下调用链
Bootstrap.connect -> Boostracp.doConnect  -> Bootstrap.doConnect0 -> AbstractChannel.connect

继续跟踪的话,我们就发现,AbstractChannel.connect其实由调用了DefaultChennelPipeline.connect方法

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

而pipeline.connect的实现如下

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

可以看到,当outbound事件(这里是connect事件)传递到Pipeline后,它其实是以tail为起点开始传播的,而tail.connect其实调用的就是AbstractChannelHandlerContext.connect()

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return pipeline.connect(remoteAddress, promise);
}

而pipeline.connect()的实现如下:

@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

可以看到,当outbound事件(这里是connect事件)传递到Pipeline后,它其实是以tail为起点开始传播的。
而tail.connect其实调用的是AbstractChannelHandlerCounxt.connect

@Override
public ChannelFuture connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    final AbstractChannelHandlerContext next = findContextOutbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeConnect(remoteAddress, localAddress, promise);
    ...
    return promise;
}

findContextOutbound()顾名思义,它的作用是以当前Context为起点,向Pipeline中的Context双向链表的前端寻找第一个outbound属相为真的Context(即关联着ChannelOutboundHandler的Context),然后返回。
它的实现如下

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return cox;
}

当我们找到一个outbound的Context后,就调用它的invokeConnect(),这个方法中调用Context所关联着的ChannelHandler的connect方法:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

如果用户没有重写ChannelHandler的connect方法,那么会调用ChannelOutboundHandlerAdapter所实现的方法:

@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
        SocketAddress localAddress, ChannelPromise promise) throws Exception {
    ctx.connect(remoteAddress, localAddress, promise);
}

我们看到,ChannelOutboundHandlerAdapter.connect仅仅调用了cox.connect,而这个调用又回到了

Context.connect -> Connect.findContextOutbound -> next.invokeChannel -> handler.connect -> Context.connect

这样的循环找那个,直到connect时间传递到DefaultChennelPipeline的双向链表的头结点,即head中。为什么会传递到head中呢?回想一下,head实现了 ChannelOutboundHandler,因此它的outbound属性为true.
因为head本身即是一个ChannelHandlerContext,又实现了ChannelOutboundHandler接口,因此当connect消息传递到head后,会将消息传递到对应的ChannelHandler中处理,而恰好,head的header()返回的就是head本身:

@Override
public ChannelHandler handler() {
    return this;
}

因此最终connect事件是在head中处理的。head的connect事件处理方法如下:

@Override
public void connect(
        ChannelHandlerContext cox,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

到这里,整个Connect请求事件就结束了,下面一幅图来描述整个Connect请求的处理过程:


5.png

我们仅仅以Connect请求事件为例,分析 了Outbound时间的传播过程,但是其实所有的outbound的事件传播都遵循着一样的传播规律。

  1. inbound事件
    Inbound事件和Outbound事件的处理过程有点类似。
    Inbound事件是一个通知时间,即某件事已经发生了,然后通过Inbound事件进行通知。Inbound通常发生在Channel的状态的改变或IO事件就绪。
    Inbound的特点是它传播方向是head -> customContext -> tail。
    既然上面我们分析了Connect这个Outbound事件,那么接着分析Connect事件后会发生什么Inbound事件,并最终找到OutboundInbound事件之间的联系。
    Connect这个Outbound传播到unsafe后,其实是在AbstractNioUnsafe.connect方法中进行处理的。
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    ...
    if (doConnect(remoteAddress, localAddress)) {
        fulfillConnectPromise(promise, wasActive);
    } else {
        ...
    }
    ...
}

AbstractNioUnsafe.connect中,首先调用doConnect方法进行实际上的Socket连接,当连接上后,会调动fulfillConnectPromise():

private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
    ...
    // Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
    // because what happened is what happened.
    if (!wasActive && isActive()) {
        pipeline().fireChannelActive();
    }
    ...
}

我们看到,在fulfillConnectPromise中,会通过调用pipeline().fireChannelActive()将通道激活的消息(即Socket连接成功)发送出去。而这里,当调用pipeline.fireXXX后,就是 Inbound事件的起点。
因此当调用了pipeline().fireChannelActive()后,就产生了一个ChannelActive Inbound事件,我们就从这里开始看看这个Inbound事件是怎么传播的吧。

public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

果然,在fireChannelActive()中,调用的是head.fireChannelActive,因此可以证明了,Inbound事件在Pipeline中传输的起点是head.
那么,在head.fireChannelActive()中又做了什么呢?

@Override
public ChannelHandlerContext fireChannelActive() {
    final AbstractChannelHandlerContext next = findContextInbound();
    EventExecutor executor = next.executor();
    ...
    next.invokeChannelActive();
    ...
    return this;
}

回想一下在Outbound事件(例如Connect事件)的传输过程中时,我们也有类似的操作

  • 首先调用findContextInbound,从 Pipeline的双向链表中找到第一个属性inbound为true的 Context,然后返回
  • 调用这个ContextinvokeChannelActive
    invokeChannelActive方法如下
private void invokeChannelActive() {
    try {
        ((ChannelInboundHandler) handler()).channelActive(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

这个方法和Outbound的对应方法(例如invokeConnect)如出一辙。同Outbound一样,如果用户没有重写channelActive(),那么调用ChannelInboundHandlerAdapterchannelActive方法

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();
}

同样地,在ChannelInboundHandlerAdapter.channelActive中,仅仅调用了cox.fireChannelActive方法,因此就会有如下循环

context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive

这样的循环中。同理,tail本身即实现了ChannelInboundHander接口,又实现了ChannelHandlerContext接口,因此当channelActive消息传递到tail后,会将消息传递到对应的ChannelHandler中处理,而恰好,tail的handler()返回的就是tail本身

@Override
public ChannelHandler handler() {
    return this;
}

因此, channelActive inbound时间最终是在tail中处理的,我们看一下它的处理方法:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }

TailContext.channelActive方法是空的。入股读者自定查看TailContext的inbound处理方法时,会发现,它们的实现都是空的。课件,如果是inbound,当用户没有实现自定义的处理器时,那么默认是不处理的
用一幅图来总结以下Inbound的传输过程吧

6.png
  1. 总结
    对于Outbound事件:
  • Outbound事件是请求事件(由Context发起一个请求,并最终由 Unsafe处理这个请求)
  • Outbound事件的发起者是Channel
  • Outbound事件的处理者是Unsafe
  • Outbound事件在 Pipeline中的传输方向是tail -> head.
  • ChannelHandler中处理事件时,如果这个Handler不是最后一个Handler,则需要调用ctx.XXX(例如ctx.connect)将此时间继续传播下去,如果不这么做,那么此事件的传播会提前终止
  • Outbound事件流:Context.OUT_EVT -> Connect.findCountextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
    关于inbound事件:
  • Inbound事件是通知事件,当某件事情已经就绪后,通知上层。
  • Inbound事件发起者是unsafe
  • Inbound事件的处理者是 Channel,如果用户没有实现自定义的处理方法,那么Inbound事件默认的处理者是TailContext,并且其实现方法是空实现
  • Inbound事件在Pipeline中传输方法是 head ->tail
  • ChannelHandler处理事件中,如果这个 Handler不是最后一个Handler,则需要调用ctx.fireIN_EVT(例如cox.fireChennalActive)将此事件继续传播下去,如果不这样做,那么此事件的传播会提前终止
  • Outbound事件流 : Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext,fireIN_EVT

相关文章

网友评论

      本文标题:ChannelPipeline

      本文链接:https://www.haomeiwen.com/subject/ckqkbxtx.html