Pipeline
设计模式中有一种设计模式叫做责任链模式,netty pipeline就是责任链模式的一种实现,链上每个节点按照不同的添加方式和添加顺序排列在链上不同的位置,这条链是一条双向链,在netty中用户创建的handler的都会通过DefaultChannelHandlerContext包装成链上的节点。
DefaultChannelPipeline
netty默认创建的pipeline类型是DefaultChannelPipeline,DefaultChannelPipeline默认会创建一个head节点和一个tail节点,用户根据业务需求创建的业务处理handler都会被添加到这两个节点中间,我们知道java io事件包含两种类型:读,写
netty定义了ChannelInboundHandler和ChannelOutboundHandler作为这两种事件处理的抽象接口提供给开发者,实现ChannelInboundHandler的handler用来处理读事件,实现ChannelOutboundHandler的handler用来处理写事件。这里需要注意一点:在pipeline链上对读写事件的处理方向是不同的
- 对于读事件netty从pipeline的head开始向tail方向依次把读事件交给实现了ChannelInboundHandler的handler处理
- 对于写事件netty从pipeline的tail开始向head方向依次把写事件交给实现了ChannelOutboundHandler的handler处理
Handler
handler是pipeline上处理事件的基本单元,用户处理IO事件的业务逻辑都是写在自定义的handler中
ChannelHandlerContext
ChannelHandlerContext是handler上下文信息类,实际上pipeline链上节点的类型是ChannelHandlerContext,默认实现是DefaultChannelHandlerContext
我们看下DefaultChannelHandlerContext源代码
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
//DefaultChannelHandlerContext绑定了用户定义的handler
private final ChannelHandler handler;
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, handler.getClass());
this.handler = handler;
}
@Override
public ChannelHandler handler() {
return handler;
}
}
DefaultChannelHandlerContext作为pipeline链上的节点,它的父类AbstractChannelHandlerContext有pre和next两个属性分别指向前一个节点和后一个节点,可以看出pipeline是一个双向链表。
DefaultChannelHandlerContext和handler绑定
用户定义的handler是如何绑定到一个DefaultChannelHandlerContext的呢?
我们使用pipleline.addLast()来解析,当我使用pipeline.addLast去添加自定义的handler的时候,最终会调用下面所示的代码
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//newContex通过创建DefaultChannelHandlerContext的方式实现了context和handler的绑定
newCtx = newContext(group, filterName(name, handler), handler);
//把新建的DefaultChannelHandlerContext添加到pipeline的尾部
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
executionMask
在创建DefaultChannelHandlerContext的时候会初始化它的父类AbstractChannelHandlerContext,这个类的executionMask属性得我们去研究下。
pipeline上会触发很多类型的事件,每个handler处理的事件类型是不同的,executionMask的作用就是标识不同handler能响应的事件类型,下面是handler executionMask计算方式如下:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
可以看到executionMask的值是根据handle类实现的不同事件响应方法计算得到
事件是如何在pipeline上传播的
从上面我们知道pipeline是一条双向链表,那么事件是如何在pipeline的节点之间传播的呢?我们拿channelRegistered事件来分析
当一个channel在NioEventLoop上注册完成后会触发channelRegistered事件,触发事件的入口是DefaultPipeline.fireChannelRegistered()方法
public final ChannelPipeline fireChannelRegistered() {
//head是pipeline链表上的的第一节点,表示从链表的头部开始依次向尾部去处理registered事件
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
我们看下AbstractChannelHandlerContext.invokeChannelRegistered源代码
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
//触发pipeline上当前节点的invokeChannelRegistered方法
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
我继续分析AbstractChannelHandlerContext.invokeChannelRegistered源代码
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
//调用当前DefaultChannelHandlerContext绑定的handler的channelRegistered方法,
//用户根据自己业务逻辑重写的channelRegistered方法会被调用
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
用户自定义的handler在处理完事件之后如何继续触发下一个节点执行呢?一般用户自定义handler重写的channelRegistered方法会在处理完业务逻辑后调用AbstractChannelHandlerContext.fireChannelRegistered
我们看下 fireChannelRegistered源代码
public ChannelHandlerContext fireChannelRegistered() {
//findContextInbound在pipeline上从本节点开始向pipeline的尾部找到下一个能处理channel registered事件的handler,
invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
return this;
}
我们给出上面执行的逻辑图,XXX代表的是对应的事件,比如registered,add,YY代表的是事件的类型,YY有两种值:in 和 out
pipeline事件触发流程.png
ChannelInitializer
ChannelInitializer是netty内置的一个特殊的Inboundhandler,这个handler是netty提供给开发者实现向pipeline中添加自定义handler的工具类,它一般会首先被添加到pipeline中,开发者通过实现ChannelInitializer的initChannel方法去添加业务handler到pipeline中。那么initChannel方法是如何被调用的呢?开发者在向pipeline添加handler的时候会触发handlerAdded事件,handlerAdded事件会触发相应handler的handlerAdded方法,我们看下ChannelInitializer handlerAdded方法的源码
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//判断channel是不是已经注册了,如果在channel还没有注册的情况下向pipeline添加了handler,
//这个时候触发的handlerAdded事件会被保存在一个链表中,
//将来当channel注册完成的时候会去查看这个链表有没有pending的handlerAdded事件需要触发,
//如果有pending的handlerAdded事件,那么就调用相应handler的handlerAdded方法
if (ctx.channel().isRegistered()) {
// This should always be true with our current DefaultChannelPipeline implementation.
// The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
// surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
// will be added in the expected order.
//这里会执行用户重写的ChannelInitializer的initChannel方法,
//这样就实现了把用户定义的各种业务handler添加到pipeline中
//当成功执行用户重写的initChannel方法后,ChannelInitializer会把自己从pipeline中删除,
//这样pipeline中包含handlers除了head和tail剩下的都是用户自己定义的业务handler
if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now.
removeState(ctx);
}
}
}
上面就是对netty pipeline的解析
网友评论