一 ChannelHandlerContext和ChannelHandler
在ChannelPipeline这一层,ChannelPipeline会把收到的事件交给ChannelHandler处理。
在外部,我们可以向ChannelPipeline中添加ChannelHandler处理器,但是添加到Pipeline中后,会被包装成一个ChannelHandlerContext对象:
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
//...
}
ChannelHandlerContext主要是做了下面两件事:
- 封装了ChannelPipeline,Unsafe,Channel,等对象信息,方便处理事件时使用
- 对ChannelHandler的调用做了异步处理,可以接受一个Executor对象,然后对应ChannelHandler的所有事件都会被这一个Executor对象执行。
在ChannelPipeline中,把所有的ChannelHandlerContext放在一个链表中,本身记录了链表的头尾节点:
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
对于ChannelHandler中每一个事件相关的方法,ChannelHandlerContext中都有一个对应的fireXXX方法,这个方法用于向下一个ChannelHandler传递事件
二 inbound事件和outbound事件
Pipeline层把所有的事件分为两种:
- 建立连接,读数据等客户端向服务端发送的事件称为inbound事件,这类事件会由链头开始向链尾传递处理。
- 发送数据等由服务端向客户端进行操作的事件,称为outbound事件,这类事件会由链尾开始向链头传递处理。
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
二 ChannelPipeline的初始化过程
ChannelPipeline使用双向链表维护了ChannlHandler的信息,所以初始化中最关键的,是初始化了双向队列的头尾节点:
protected DefaultChannelPipeline(Channel channel) {
//....
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
这里TailContext和HeadContext都是ChannelHandler的实现类
tail节点基本是空实现,或者仅仅打印一些日志:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
而这个head节点,也就是HeadContext对象,就做了很多事了。
例如,上一节说的,当向Channel中写数据时,数据会先被ChannelPipeline处理,然后再交给Unsafe,把数据写出去。 其实就是在HeadContext中,把数据交给Unsafe写出去的:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise);
}
由于写数据是outbound事件,所以会由tail节点开始处理,数据到达head节点时已经处理完了。
三 处理一个事件的流程
以channelRead事件为例:
在ChannelPipeline#fireChannelRead方法中,首先调用head节点的channelRead方法进行处理:
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
AbstractChannelHandlerContext.invokeChannelRead方法的第一个参数是ChannelHandler类型的,方法内部会执行这个ChannelHandler对象的channelRead方法:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(() -> next.invokeChannelRead(m));
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
这段代码的逻辑就是判断是否配置了专门的线程池,如果配置了,就用专门的线程池去调用。
上面说过,head节点其实就是一个HeadContext对象,因此具体执行到的就是HeadContext#channelRead方法:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
方法内只是简单的调用了ctx.fireChannelRead(msg); 这行代码的作用就是向下一个ChannelHanderl传递事件。因此,如果在想在某个ChannelHandler中拦截事件,那么只需要不加上这一行代码即可。
然后这个ctx.fireChannelRead方法中,会找到下一个ChannelHandler:
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
找到后又调用invokeChannelRead触发下一个ChannelHandler的channelRead方法。这就回到了最开始的地方。
一直这样循环处理,直到所有的ChannelHandler处理完毕。
网友评论