- Channel 与 ChannelPipeline
在Netty中每个Channel都有且仅有一个ChannelPipeline 与之对应,它们的组成关系如下:
5.png
通过上图我们可以看到,一个Channel
包含了一个ChannelPipeline
,而ChannelPipeline
中又维护了一个由ChannelHandlerContext
组成的双向链表。这个链表的头是HeadContext
,链表的尾是TailContext
,并且每个ChannelHandlerContext
中又关联着一个ChannelHandler
。
前面已经知道了一个Channel
的初始化的基本过程,下面再回顾一下
下面的代码是AbstractChannel
构造器
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
AbstractChannel
有一个pipeline字段,在构造器中会初始化它为DefaultChannelPipeline
的实例,这里的代码就印证了一点:每个Channel
都有一个ChannelPipeline
接着我们跟踪一下 DefaultChannelPipeline
的初始化过程。
首先进入到DefaultChannelPipeline构造器中:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
在DefaultChannelPipeline
构造器中,首先将与之关联的Channel
保存到字段channel中,然后实例化两个ChannelHandlerContext
,一个是HeadContext
实例head,一个是TailContext
实例tail。接着将head和tail互相指向,构成一个双向链表。
特别注意到,我们在开始的示意图中,head和tail并没有包含ChannelHandler
,这是因为HeadContext
和TailContext
继承于AbstractChannelHandlerContext的同时也实现了ChannelHandler
接口了,因此它们有Context和Handler的双重属性。
- ChannelPipeline的初始化再探
在最开始介绍客户端时,我们已经对ChannelPipeline的初始化有了一个大致的了解,不过当时重点毕竟不在ChannelPipeline这里,因此没有深入地分析它的初始化过程。那么下面我们就来看一下具体的ChannelPipeline的初始化都做了那些工作吧 - ChannelPipeline 实例化过程
我们再来回顾一下,在实例化一个Channel
时,会伴随着一个ChannelPipeline
的实例化,并且此Channel
会与这个ChannelPipeline
相互关联,这一点可以通过NioSocketChannel
的父类AbstractChannel
的构造器予以佐证:
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = new DefaultChannelPipeline(this);
}
当实例化一个Channel
(这里以 EchoClient
为例,那么Channel
就是NioSocketChannel
),其pipeline字段就是我们新创建的DefaultChannelPipeline
对象,那么我们就来看一下DefaultChannelPipeline
的构造方法吧。
public DefaultChannelPipeline(AbstractChannel channel) {
if (channel == null) {
throw new NullPointerException("channel");
}
this.channel = channel;
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
可以看到,在 DefaultChannelPipeline
的构造方法中,将传入的channel
复制给字段this.channel
,接着又实例化了两个特殊的字段,tail
与head
。这两个字段是一个双向链表的头和尾。其实在DefaultChannelPipeline
中,维护了一个以AbstractChannelHandlerContext
为节点的双向链表,这个链表是Netty实现Pipeline机制的关键。
再回顾一下head和tail的类层次结构:
7.png
从类层次结构图中可以很清楚地看到,head实现了ChannelInboundHandler
,而tail实现了ChannelInboundHandler
接口,并且他们都实现了ChannelHandlerContext
接口,因此可以说head和tail既是一个ChannelHandler
,又是一个ChannelHandlerContext
。
接着看一下headContext的构造器
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
}
它调用了父类AbstractChannelHandlerContext
的构造器,并传入参数inbound=false,outbound=true。
TailContext
的构造器与HeadContext
的相反,它调用了父类AbstractChannelHandlerContext
的构造器,并传入参数Inbound=true,outbound=false。
即header是一个outboundHandler
,而tail是一个`inboundHandler,关于这一点,大家要特别注意,因为在后面的分析中,我们会反复用到inbound和outbound这两个属性。
- ChannelInitializer的添加
上面一小节中,我们已经分析了Channel
的组成,其中我们了解到,最开始的时候ChannelPipeline
中含有两个ChannelHandlerContext
(同时也是ChannelHandler
),但是这个Pipeline并不能实现什么特殊的功能,因为我们还没有给它添加自定义的ChannelHandler
.
通常来说,我们在初始号Bootstrap
,会添加我们自定义的ChannelHandler
,就以我们熟悉的EchoClient
来举例吧
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
上面代码的初始化过程,相信大家都不陌生。在调用handler时,传入了ChannelInitializer
对象,它提供了一个initChannel
方法供我们初始化ChannelHandler
,那么这个初始化过程是怎么样的呢?
ChannelInitializer
实现了ChannelHandler,那么它是在什么时候添加到ChannelPipeline
中的呢?进行了一番搜索后,发现它是在Bootstrap.init()
中添加到ChannelPipeline
中的。
其代码如下:
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(handler());
...
}
上面的代码将handler()返回的ChannelHandler
添加到Pipeline中,而handler()返回的是handler其实就是我们在初始化Bootstrap
调用handler设置的ChannelInitializer
实例,因此这里就是将ChannelInitializer
插入到了Pipeline的末端,此时Pipeline的结构如下图所示
有朋友可能有疑惑了,我明明插入的是一个
ChannelInitializer
实例,为什么在ChannelPipeline
中的双向链表中的元素确实一个ChannelHandlerContext
?为了解答这个问题,我们继续在代码中寻找答案吧。我们刚刚才提到,在Bootstrap.init中会调用p.addLast()方法,将ChannelInitializer插入到链表末端:
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name); // 检查此 handler 是否有重复的名字
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
addLast有很多重载的方法,我们关注这个比较重要的方法就可以了。
上面的addLast方法中,首先检查这个ChannelHandler
的名字是否是重复的,如果不重复的话,则为这个handler创建一个对应的DefaultChannelHandlerContext
实例,并与之关联起来(Context中有一个handler属性保存着对应的Handler实例)。判断此Handler是否重名的方法很简单:Netty中有一个name2ctx Map字段,key是handler的名字,而value则是handler本身。因此通过如下代码就可以判断一个handler是否重名了:
private void checkDuplicateName(String name) {
if (name2ctx.containsKey(name)) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
为了添加一个handler到pipeline中,必须把此handler包装成ChannelHandlerContext
,因此在上面的代码中我们可以看到新实例化了一个newCtx,并将handler作为参数传递到构造方法中,那么我们来看一下实例化的DefaultChannelHandlerContext
到底有什么玄机吧。
首先看它的构造器
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutorGroup group, String name, ChannelHandler handler) {
super(pipeline, group, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
DefaultChannelHandlerContext
的构造器中,调用了两个很有意思的方法:isInbound 与 isOutbound,这两个方法是做什么的呢?
private static boolean isInbound(ChannelHandler handler) {
return handler instance ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instance ChannelOutboundHandler;
}
从源码中可以看出,当一个handler实现了ChannelInboundHandler
接口,则isInbound返回真;相似地,当一个handler实现了ChannelOutboundHandler
接口,则isOutbound就返回真。
而这两个boolean变量会传递到父类AbstractChannelHandlerContext
中,并初始化父类的两个字段:inbound与outbound。
那么这里的ChannelInitializer
所对应的DefaultChannelHandlerContext
的inbound与outbound字段分别是什么呢?那就看一下ChannelInitializer
到底实现了哪个接口不就行了?如下是ChannelInitializer
的类层次结构图:
可以清楚地看到。ChannelInitializer仅仅实现了
ChannelInboundHandler
接口,因此这里实例化的DefaultChannelHandlerContext
的inbound=true,outbound=false。不就是inbound和outbound两个字段吗,为什么需要这么大费周章地分析一番?其实这两个字段关系到pipeline的时间的流向与分类,因此是十分关键的。在这里我们只需要记住,
ChannelInitializer
所对应的DefaultChannelHandlerContext
的inbound=true,outbound=false即可。当创建好
Context
后,就将这个Context
插入到Pipeline
的双向链表中:
private void addLast0(final String name, AbstractChannelHandlerContext newCtx) {
checkMultiplicity(newCtx);
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
name2ctx.put(name, newCtx);
callHandlerAdded(newCtx);
}
显然,这个胆码就是典型的双向链表的插入操作了。当调用了addLast()后,Netty就会将此handler添加到双向链表中tail元素之前的位置。
- 自定义ChannelHandler的添加过程
在上一小节中,我们已经分析了一个ChannelInitializer
如果插入到Pipeline
中的,接下来就来探讨一下ChannelInitializer
在哪里被调用,ChannelInitializer
的作用,以及我们自定义的ChannelHandler
是如何插入到Pipeline
中的。
在客户端一章的channel注册过程小节中,我们已经分析过Channel
的注册过程了,这里我们再简单的复习一下:
- 首先在
AbstractBootstrap.initAndRegister
中,通过group().register(channel),调用MultithreadEventLoopGroup.register()
- 在
MultithreadEventLoopGroup.register
中,通过next()获取一个可用的SingleThreadEventLoop
,然后调用它的register - 在
AbstractUnsafe.register()
中,调用register()注册Channel - 在
AbstractUnsafe.register0
中,调用AbstractNioChannel#doRegister()
-
AbstractNioChannel.doRegister()
通过javaChannel.register(eventLoop.selector(),0,this)
将Channel对应的Java NIO SocketChannel注册到一个eventLoop的Selector中,并且将当前的Channel作为attachment.
而我们自定义的ChannelHandler
的添加过程,发生在AbstractUnsafe.register0()
中,在这个方法中调用了pipeline.fireChannelRegistered()
方法,其实现如下
@Override
public ChannelPipeline fireChannelRegistered() {
head.fireChannelRegistered();
return this;
}
上面的代码很简单,就是调用了head.fireChannelRegistered()而已
关于上面代码的 `head.fireXXX`的调用形式,是Netty中Pipeline传递事件的常用方式,我们以后会经常看到
还记得head的类层次结构图不,head是一个AbstractChannelHandlerContext
实例,并且它没有重写fireChannelRegister()
,因此head.fireChannelRegistered()
其实是调用AbstractChannelHandlerContext.fireChannelRegistered()
:
@Override
public ChannelHandlerContext fireChannelRegistered() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
return this;
}
这个方法的第一句是调用findContextInbound
获取一个Context
,那么它返回的Context
到底是什么呢?
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return cox;
}
很显 然,这个代码会从head开始遍历Pipeline
的双向链表,然后找到第一个属相inbound 为 true的ChannelHandlerContext
实例。我们在前面分析ChannelInitializer
时,花了很大的笔墨来分析了inbound 和 outbound的属相,在这里用上了,回想一下,ChannelInitializer
实现了ChannelInboundHandler,因此它所对应的
ChannelHandlerContext的inbound属相就是true,因此这里返回就是
ChannelInitializer实例所对应的
ChannelHandlerContext,即 ![2.png](https://img.haomeiwen.com/i6734122/d22406bfdb315a7b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240) 当获取到inbound的Context后,就调用它的
invokeChannelRegister`方法:
private void invokeChannelRegistered() {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
我们已经强调过了,每个ChannelHandler
都与一个ChannelHandlerContext
关联,我们可以通过ChannelHandlerContext
获取对应的ChannelHandler
,因此很明显,这里handler()返回的,其实就是一开始我们实例化的ChannelInitializer
对象,并接着调用了ChannelInitializer.channelRegistered()
。看到这里,读者朋友是否觉得有点眼熟呢?ChannelInitializer.channelRegister
这个方法我们在第一章的时候已经大量接触了,但是并没有深入地分析这个方法的调用过程
这个方法中又有什么玄机呢?
@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
initChannel((C) ctx.channel());
ctx.pipeline().remove(this);
ctx.fireChannelRegistered();
}
initChannel()
这个方法我们很熟悉了吧,它就是我们在初始化Bootstrap
时,调用handler方法传入的匿名内部类所实现的方法
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new EchoClientHandler());
}
});
因此当调用了这个方法后,我们自定义的ChannelHandler
就插入到Pipeline
了,此时的Pipeline如下图所示
当添加了自定义的
ChannelHandler
后,会删除ChannelInitializer
这个ChannelHandler,即 cox.pipeline().remove(this)
,incident最后的Pipeline如下4.png
- ChannelHandler的名字
我们注意到,pipeline.addXXX
都有一个重载的方法,例如addLast,它有一个重载的版本是:
ChannelPipeline addLast(String name, ChannelHandler handler);
第一参数指定了所添加的handler的名字(更准确地说是ChannelHandlerContext
的名字,不过我们通常是以handler作为叙述,因此说成handler的名字便于理解),那么handler的名字有什么用呢?如果我们不设置name,那么handler会有怎样的名字,为了解答这些疑惑,还是从源码中找到答案
以addLast()为例
@Override
public ChannelPipeline addLast(String name, ChannelHandler handler) {
return addLast(null, name, handler);
}
这个方法会调用重载的addLast方法
@Override
public ChannelPipeline addLast(EventExecutorGroup group, final String name, ChannelHandler handler) {
synchronized (this) {
checkDuplicateName(name);
AbstractChannelHandlerContext newCtx = new DefaultChannelHandlerContext(this, group, name, handler);
addLast0(name, newCtx);
}
return this;
}
第一个参数被设置为null ,我们不关心它,第二个参数就是这个handler的名字,看代码可知,在添加handler之前,需要调用checkDouplicateName方法来确定此handler的名字是否和已添加的handler的名字重复,而这个checkDuplicationName方法在前面已经有提到,这里再回顾一下:
private void checkDuplicateName(String name) {
if (name2ctx.containsKey(name)) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
Netty判断一个handler的名字是否重复的依据很简答:DefaultChannelPipeline中有一个类型为Map<String,AbstractChannelHandlerContext> 的name2ctx字段,它的key是一个handler的名字,而value则是这个handler所对应的ChannelHandlerContext,每当新添加一个handler时,就会put到name2ctx中。因此检查name2ctx中是否包含这个name即可,当没有重命名的handler时,就为这个handler生成一个关联的DefaultChannelHandlerContext
对象,然后就将name和newCtx作为key-value对放到name2Ctx中。
- 自动生成handler的名字
如果我们调动的是如下的addLast()
ChannelPipeline addLast(ChannelHandler... handlers);
那么Netty会调用generateName为我们的handler自动生成一个名字:
private String generateName(ChannelHandler handler) {
WeakHashMap<Class<?>, String> cache = nameCaches[(int) (Thread.currentThread().getId() % nameCaches.length)];
Class<?> handlerType = handler.getClass();
String name;
synchronized (cache) {
name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
}
synchronized (this) {
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
// any name conflicts. Note that we don't cache the names generated here.
if (name2ctx.containsKey(name)) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
if (!name2ctx.containsKey(newName)) {
name = newName;
break;
}
}
}
}
return name;
}
而generateName会接着调用generateName0来实际产生一个handler的名字
private static String generateName0(Class<?> handlerType) {
return StringUtil.simpleClassName(handlerType) + "#0";
}
自动生成的名字的规则很简单,就是handler的简单类名加上"#0",因此我们的EchoClientHandler的名字就是"EchoClientHandler#0"。
- 关于Pipeline的时间传输机制
前面章节中,我们知道AbstractChannelHandlerContext
中有inbound 和outbound两个boolean变量,分别用于标识Context所对应的handler的类型,即:
- inbound为真时,表示对应的
ChannelHandler
实现了ChannelInboundHandler
- outbound为真时,表示对应的
Channelhandler
实现了ChannelOutboundHandler
读者朋友肯定很疑惑吧,那究竟这两个字段有什么作用呢?其实这还要从ChannelPipeline
的传输的事件类型说起.
Netty的事件可以分为Inbound和outbound事件
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
从上图可以看到,inbound事件和outbound事件的流向是不一样的,inbound事件的流向是从下至上,而outbound刚好相反,是从上至下。并且inbound的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT()
方法,而outbound方法的传递方式是通过调用ChannelHandlerContext.OUT_EVT()
方法。例如ChannelHandlerCountext.firChannelRegistered()
调用会发送一个ChannelRegistered的inbound给下一个ChannelHandlerContext,而ChannelHandlerContext.bind
调用会发送一个bind的outbound事件给下一个ChannelHandlerContext
inbound事件传播方法有:
ChannelHandlerContext.fireChannelRegistered()
ChannelHandlerContext.fireChannelActive()
ChannelHandlerContext.fireChannelRead(Object)
ChannelHandlerContext.fireChannelReadComplete()
ChannelHandlerContext.fireExceptionCaught(Throwable)
ChannelHandlerContext.fireUserEventTriggered(Object)
ChannelHandlerContext.fireChannelWritabilityChanged()
ChannelHandlerContext.fireChannelInactive()
ChannelHandlerContext.fireChannelUnregistered()
Outbound事件传输方法有:
ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
ChannelHandlerContext.connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext.write(Object, ChannelPromise)
ChannelHandlerContext.flush()
ChannelHandlerContext.read()
ChannelHandlerContext.disconnect(ChannelPromise)
ChannelHandlerContext.close(ChannelPromise)
注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用Context相应的传播方法
例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
上面的例子中,MyInboundHandler收到了一个channelActive
事件,它在处理后,如果希望将事件继续传播下去,那么需要接着调用ctx.fireChannelActive()
。
- Outbound操作
Outbound事件都是请求时间,即请求某件事情的发生,然后通过Outbound事件进行通知。
Outbound事件的传播方法是tail -> customContext -> head.
我们接下来以connect事件为例,分析一下Outbound事件的传播机制
首先,当用户调用了 Bootstrap.connect()时,就会触发一个Connect请求事件,此调用会触发如下调用链
Bootstrap.connect -> Boostracp.doConnect -> Bootstrap.doConnect0 -> AbstractChannel.connect
继续跟踪的话,我们就发现,AbstractChannel.connect其实由调用了DefaultChennelPipeline.connect
方法
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
而pipeline.connect的实现如下
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
可以看到,当outbound事件(这里是connect事件)传递到Pipeline后,它其实是以tail为起点开始传播的,而tail.connect其实调用的就是AbstractChannelHandlerContext.connect()
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
而pipeline.connect()的实现如下:
@Override
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
可以看到,当outbound事件(这里是connect事件)传递到Pipeline后,它其实是以tail为起点开始传播的。
而tail.connect其实调用的是AbstractChannelHandlerCounxt.connect
:
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
...
next.invokeConnect(remoteAddress, localAddress, promise);
...
return promise;
}
findContextOutbound()
顾名思义,它的作用是以当前Context为起点,向Pipeline中的Context双向链表的前端寻找第一个outbound属相为真的Context(即关联着ChannelOutboundHandler
的Context),然后返回。
它的实现如下
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return cox;
}
当我们找到一个outbound的Context后,就调用它的invokeConnect(),这个方法中调用Context所关联着的ChannelHandler
的connect方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
如果用户没有重写ChannelHandler
的connect方法,那么会调用ChannelOutboundHandlerAdapter
所实现的方法:
@Override
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
我们看到,ChannelOutboundHandlerAdapter.connect
仅仅调用了cox.connect,而这个调用又回到了
Context.connect -> Connect.findContextOutbound -> next.invokeChannel -> handler.connect -> Context.connect
这样的循环找那个,直到connect时间传递到DefaultChennelPipeline
的双向链表的头结点,即head中。为什么会传递到head中呢?回想一下,head实现了 ChannelOutboundHandler
,因此它的outbound属性为true.
因为head本身即是一个ChannelHandlerContext
,又实现了ChannelOutboundHandler
接口,因此当connect消息传递到head后,会将消息传递到对应的ChannelHandler中处理,而恰好,head的header()返回的就是head本身:
@Override
public ChannelHandler handler() {
return this;
}
因此最终connect事件是在head中处理的。head的connect事件处理方法如下:
@Override
public void connect(
ChannelHandlerContext cox,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
到这里,整个Connect请求事件就结束了,下面一幅图来描述整个Connect请求的处理过程:
5.png
我们仅仅以Connect请求事件为例,分析 了Outbound时间的传播过程,但是其实所有的outbound的事件传播都遵循着一样的传播规律。
- inbound事件
Inbound事件和Outbound事件的处理过程有点类似。
Inbound
事件是一个通知时间,即某件事已经发生了,然后通过Inbound
事件进行通知。Inbound
通常发生在Channel
的状态的改变或IO事件就绪。
Inbound
的特点是它传播方向是head -> customContext -> tail。
既然上面我们分析了Connect
这个Outbound
事件,那么接着分析Connect事件后会发生什么Inbound
事件,并最终找到Outbound
和Inbound
事件之间的联系。
当Connect
这个Outbound
传播到unsafe后,其实是在AbstractNioUnsafe.connect
方法中进行处理的。
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
...
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
...
}
...
}
在AbstractNioUnsafe.connect
中,首先调用doConnect方法进行实际上的Socket
连接,当连接上后,会调动fulfillConnectPromise()
:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
...
// Regardless if the connection attempt was cancelled, channelActive() event should be triggered,
// because what happened is what happened.
if (!wasActive && isActive()) {
pipeline().fireChannelActive();
}
...
}
我们看到,在fulfillConnectPromise
中,会通过调用pipeline().fireChannelActive()
将通道激活的消息(即Socket连接成功)发送出去。而这里,当调用pipeline.fireXXX
后,就是 Inbound
事件的起点。
因此当调用了pipeline().fireChannelActive()
后,就产生了一个ChannelActive Inbound事件,我们就从这里开始看看这个Inbound
事件是怎么传播的吧。
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
果然,在fireChannelActive()
中,调用的是head.fireChannelActive
,因此可以证明了,Inbound事件在Pipeline中传输的起点是head.
那么,在head.fireChannelActive()
中又做了什么呢?
@Override
public ChannelHandlerContext fireChannelActive() {
final AbstractChannelHandlerContext next = findContextInbound();
EventExecutor executor = next.executor();
...
next.invokeChannelActive();
...
return this;
}
回想一下在Outbound
事件(例如Connect
事件)的传输过程中时,我们也有类似的操作
- 首先调用
findContextInbound
,从Pipeline
的双向链表中找到第一个属性inbound为true的Context
,然后返回 - 调用这个
Context
的invokeChannelActive
invokeChannelActive
方法如下
private void invokeChannelActive() {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
这个方法和Outbound的对应方法(例如invokeConnect)如出一辙。同Outbound
一样,如果用户没有重写channelActive()
,那么调用ChannelInboundHandlerAdapter
的channelActive
方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
}
同样地,在ChannelInboundHandlerAdapter.channelActive
中,仅仅调用了cox.fireChannelActive
方法,因此就会有如下循环
context.fireChannelActive -> Connect.findContextInbound -> nextContext.invokeChannelActive -> nextHandler.channelActive -> nextContext.fireChannelActive
这样的循环中。同理,tail
本身即实现了ChannelInboundHander
接口,又实现了ChannelHandlerContext
接口,因此当channelActive
消息传递到tail
后,会将消息传递到对应的ChannelHandler
中处理,而恰好,tail
的handler()返回的就是tail
本身
@Override
public ChannelHandler handler() {
return this;
}
因此, channelActive
inbound时间最终是在tail
中处理的,我们看一下它的处理方法:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { }
TailContext.channelActive
方法是空的。入股读者自定查看TailContext
的inbound处理方法时,会发现,它们的实现都是空的。课件,如果是inbound,当用户没有实现自定义的处理器时,那么默认是不处理的
用一幅图来总结以下Inbound
的传输过程吧
- 总结
对于Outbound
事件:
-
Outbound
事件是请求事件(由Context
发起一个请求,并最终由Unsafe
处理这个请求) -
Outbound
事件的发起者是Channel
-
Outbound
事件的处理者是Unsafe
-
Outbound
事件在Pipeline
中的传输方向是tail -> head
. - 在
ChannelHandler
中处理事件时,如果这个Handler
不是最后一个Handler
,则需要调用ctx.XXX(例如ctx.connect)将此时间继续传播下去,如果不这么做,那么此事件的传播会提前终止 -
Outbound
事件流:Context.OUT_EVT -> Connect.findCountextOutbound -> nextContext.invokeOUT_EVT -> nextHandler.OUT_EVT -> nextContext.OUT_EVT
关于inbound事件: -
Inbound
事件是通知事件,当某件事情已经就绪后,通知上层。 -
Inbound
事件发起者是unsafe
-
Inbound
事件的处理者是Channel
,如果用户没有实现自定义的处理方法,那么Inbound
事件默认的处理者是TailContext
,并且其实现方法是空实现 -
Inbound
事件在Pipeline
中传输方法是 head ->tail - 在
ChannelHandler
处理事件中,如果这个Handler
不是最后一个Handler
,则需要调用ctx.fireIN_EVT
(例如cox.fireChennalActive
)将此事件继续传播下去,如果不这样做,那么此事件的传播会提前终止 -
Outbound
事件流 : Context.fireIN_EVT -> Connect.findContextInbound -> nextContext.invokeIN_EVT -> nextHandler.IN_EVT -> nextContext,fireIN_EVT
网友评论