上篇介绍accept连接的过程中已经介绍服务端在收到一个客户端请求后,会将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听,由于Netty在接收到消息的处理流程大概如下:
netty时序图.png
(1)客户端发送消息到服务端。
(2)服务端监听对应channel上的io事件,监听到读事件后读取消息。
(3)将读取到的字节数据放入ByteBuf中,然后调用pipline的fireChannelRead方法,将这个ByteBuf中保存的数据传递给pipline,pipline中保存了一个链表,这个链表中每个节点里都存储一个channelHandler,在channelHandler中可以对ByteBuf中的数据进行校验、解码等操作。
这是一个非常粗的消息交互过程,主要是先清楚一下消息的大概处理流程,从上面的三个步骤来看,pipline是一个非常重要的对象,如果想要搞清楚服务器消息处理的流程,有必要研究下这个对象的源码实现。对于pipline这个对象,我想要搞清楚的主要是下面三个问题:
(1)channelHandler是怎么绑定到pipline中的?
(2) pipline的数据结构是怎样的?
(3)消息是怎么由一个channelHandler传递到另一个channelHandler的?
首先分析第一个问题,为了便于理解,先给出demo程序。
public void start() {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandler1());
ch.pipeline().addLast(new InBoundHandler2());
ch.pipeline().addLast(new InBoundHandler3());
}
});
....
}
在上面server的启动程序中,.childHandler的作用就是将用户自定义的channelHandler添加到pipeline中。我们主要看下是在什么时候用户自定义的channelHandler添加到pipeline中的。
.childHandler的作用主要是初始化了ServerBootstrap这个对象的childHandler对象,将这个对象的引用指向ChannelInitializer的实现类。看下这个childHandler对象在哪使用用的,通过追溯代码发现是在ServerBootstrap的init方法中使用的。
@Override
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
final ChannelHandler currentChildHandler = childHandler;
....
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
....
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
从上面代码中可以看到,在new ServerBootstrapAcceptor时将这个childHandler对象传递给了ServerBootstrapAcceptor类中的childHandler成员变量。继续看下ServerBootstrapAcceptor类中的childHandler在哪使用的,发现是在ServerBootstrapAcceptor类中的channelRead方法中使用的。
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
......
}
前面的文章已经介绍这个方法是在接收到一个新连接时,在这个方法中将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,由NioEventLoop完成这个channel上的事件监听。本文关心的是pipline和channelHandler的关系,因此我们主要看下child.pipeline().addLast(childHandler)这句代码,跟下这句代码的实现。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
....
}
具体实现在DefaultChannelPipeline的addLast方法中,主要看下面这两句代码:
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
首先创建一个DefaultChannelHandlerContext对象,childHandler的引用传给了这个对象,那么这个对象就持有了对childHandler的引用,然后调用addLast0方法。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
从这个方法就可以很清楚的看到DefaultChannelPipeline保存了一个双向链表的数据结构,用于记录channelHandler的处理顺序。下图说明了DefaultChannelPipeline初始化时的数据结构。
DefaultChannelPipeline初始化链表结构.png
从上图可以看出,DefaultChannelPipeline初始化时有一个head节点和tail节点,每个节点对应的都是一个AbstractChannelHandlerContext类,如果对双向链表很清楚的话,AbstractChannelHandlerContext类中应该会有两个指针,一个指向前一个节点,一个指向后一个节点。
上图是初始化时的数据结构,那么执行addLast0方法之后的数据结构如下图所示:
DefaultChannelPipeline add last.png
这里为了直观一些,插入的节点用了channelHandler表示,其实这个节点也是一个AbstractChannelHandlerContext对象,这个对象持有了对childHandler的引用。我们知道这个childHandler指向的只是在demo程序中实现ChannelInitializer类initChannel方法的一个实现类,demo程序中InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler还没有添加到这个链表里,那这几个handler是在哪添加的呢,我们继续回到addLast这个方法的实现。
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
....
}
这个方法首先将childHandler放入了一个新建的AbstractChannelHandlerContext对象,然后将这个AbstractChannelHandlerContext对象又放入了DefaultChannelPipeline的链表中。
然后根据registered这个布尔变量,判断是否进入if条件,registered这个布尔变量表示的是这个DefaultChannelPipeline对应的channel是否已经注册到NioEventLoopGroup,由于接收到一个新连接调用channelRead方法时先将childHandler放入DefaultChannelPipeline,然后才将客户端和服务端建立的NioSocketChannel注册到NioEventLoopGroup,所以此时registered布尔变量为false,进入if条件。进入if条件后,看下 callHandlerCallbackLater这个方法。
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
// Find the tail of the linked-list.
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
这个方法中将刚才新建的AbstractChannelHandlerContext对象放入了PendingHandlerAddedTask这个线程类中,看下这个线程类。
private final class PendingHandlerAddedTask extends PendingHandlerCallback {
@Override
public void run() {
callHandlerAdded0(ctx);
}
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
}
}
这个线程类中一个实现了Runnable的run方法,一个实现了execute方法,这两个方法中调用了callHandlerAdded0这个方法,我们继续跟下这个方法的实现。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.setAddComplete();
ctx.handler().handlerAdded(ctx);
} catch (Throwable t) {
...
} catch (Throwable t2) {
....
}
这个方法中调用了 ctx.handler().handlerAdded(ctx)方法,其中 ctx.handler()返回的是实现ChannelInitializer类initChannel方法的一个实现类,说明这里调用的是ChannelInitializer类中的handlerAdded方法,进入handlerAdded方法看下具体实现。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
在handlerAdded方法中调用initChannel(ChannelHandlerContext ctx)方法,我们在这个方法中发现调用了initChannel((C) ctx.channel())这个方法,这个方法是个抽象方法,具体实现就是在demo程序中。我们在demo中这个方法的具体实现如下:
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel ch) {
ch.pipeline().addLast(new InBoundHandler1());
ch.pipeline().addLast(new InBoundHandler2());
ch.pipeline().addLast(new InBoundHandler3());
}
});
通过调用ch.pipeline().addLast这个方法将InBoundHandler1、InBoundHandler2、InBoundHandler3这三个具体的channelHandler添加DefaultChannelPipeline的链表中,这步操作完成后,DefaultChannelPipeline的链表结构如下所示:
pipline init.png
我们继续看下initChannel(ChannelHandlerContext ctx)方法,我们发现在将这三个handler放入链表中,最后调用了 remove(ctx)方法,这个方法是将实现ChannelInitializer类initChannel方法的一个实现类对应的AbstractChannelHandlerContext从链表中删除,这样操作后,DefaultChannelPipeline的链表结构就变成了下面这样。
pipline finish.png
经过上面的分析,我们就把前面两个问题给解决了,明白了channelHandler是怎么绑定到pipline中的以及pipline的数据结构是怎样的。
第三个问题关于消息是怎么由一个channelHandler传递到另一个channelHandler的,放到下一篇服务端消息处理来具体讲。
网友评论