在上篇文章《Netty的启动过程一》中,我们讲述了Netty服务端boss线程的启动过程,但是worker线程是如何启动的还是未知的。我们知道了boss线程是在ServerBootstrap的bind方法中启动的,再回到上篇文章中Netty的启动代码段,在NioEventLoopGroup的初始化方法和ServerBootstrap的bind方法中间还隔了很多代码,这些源码都还没看的,我们现在来看看这些源码。
继NioEventLoopGroup初始化后,服务端便创建了一个ServerBootstrap实例,这个类是服务端Netty特有的启动类,客户端的为Bootstrap;接下来便把boss线程组和worker线程组分别赋给了ServerBootstrap的group和childGroup变量,注意worker线程组是赋给了childGroup;接下来便是设置一些参数,比如channel,option,childOption,handler,childHandler,注意带child的和没带child的区别:带child的基本是设置 ServerChannel 的子 channel 的选项,即没带child的基本都是对boss线程而言的,而带child的基本都是对worker线程而言的。
这里需要注意channel(NioServerSocketChannel.class)一句,它是指设置boss线程channel类型。
接下来要了解下Netty的ChannelPipeline和ChannelHandler的关系了,这里引用《游戏之网络进阶》的一幅图:
数据在ChannelPipeline中流程.png
pipeline 是一个负责处理网络事件的职责链,负责管理和执行 ChannelHandler,即负责消息入站和出站的流程。
在上篇文章中,我们知道了在启动boss线程后,虽然boss线程在for循环中无限循环,但是是没有进入到后面的if分支的SelectionKey.OP_ACCEPT中的,只有先进了这里,才会启动服务端的worker线程:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
...
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
因此,我们再把断点打在threadFactory.newThread(command).start()中,然后启动客户端去连接服务端,看下它的调用堆栈是怎样的:
客户端连接启动worker线程.png
从上篇可知,当每次有客户端连接时,此时readyOps=16,继而启动worker线程;每次读取客户端数据时,此时readyOps=1,继而worker线程读取数据;很明显,Netty是以readyOps的值区分连接和读写数据的,那么readyOps又是如何设置的呢?看代码,readyOps取自于SelectionKey,而SelectionKey取自于SelectionKey[]数组,而SelectionKey[]
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
来自selectedKeys.flip(),flip()实现如下:
SelectionKey[] flip() {
if (isA) {
isA = false;
keysA[keysASize] = null;
keysBSize = 0;
return keysA;
} else {
isA = true;
keysB[keysBSize] = null;
keysASize = 0;
return keysB;
}
}
即SelectionKey[]来自keysA或keysB地址,而上述processSelectedKeys方法处于NioEventLoop的无限循环中,即boss线程(worker线程)的无限循环中:
@Override
protected void run() {
for (;;) {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
...
}
processSelectedKeys();
}
}
也就是说,boss线程在无限循环SelectionKey[]即keysA或keysB的值,当读到SelectionKey不为空时,也就读到了readyOps值,根据readyOps值,就知道客户端是什么操作了,证据如下:
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
selectedKeys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
}
}
}
现在知道了worker线程启动和读写数据跟这个readyOps值有关,那这个值又是如何设置进去的呢?我们已知SelectionKey[]来自于keysA或keysB,那么我们全局搜索这两个变量看怎么用的,就知道它是如何设置值的了。
keysA全局引用.png
可见,keysA或keysB唯一设置值的地方是在add方法中,因此我们在add方法中打上断点,启动客户端去连接,就应该知道SelectionKey[]值是如何设置的了。
SelectionKey[]设置SelectionKey的readyOps为16.png
果然,当客户端请求连接服务端时,在boss线程中,进入了此断点,而且SelectionKey的readyOps设置成了16,后续在processSelectedKey方法中,boss线程就是根据此readyOps值再启动worker线程的。而且由调用堆栈可知,它正是在boss无限循环的run()方法中进入了select(wakenUp.getAndSet(false))方法,查询是否有就绪的IO事件(读写,连接等),有即设置keysA或keysB的SelectionKey值。而这些SelectionKey值是Netty监听到了这些IO事件,封装进SelectionKey的。根据操作系统的不同而封装过程不同。
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
摘自:《新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析》
同理,当客户端发数据给服务端时,也进入了此断点,而且SelectionKey的readyOps设置成了1,只是此时是在worker线程中了。
SelectionKey[]设置SelectionKey的readyOps为1.png
现在知道了worker线程启动的原因,但是过程是怎样的呢?
我们仍在threadFactory.newThread(command).start()处打上断点,由上篇可知,第一次进入此断点,Netty启动了boss线程,第二次进入此断点即启动了worker线程,现在我们来看下第二次进入此断点的情况(请查看上图->客户端连接启动worker线程.png):
由图片堆栈打印所知,在boss线程中,首先由readyOps=16,进入了NioMessageUnsafe.read()方法,如下:
@Override
public void read() {
...
do {
//读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
这里有两个重要方法,一为doReadMessages(readBuf),主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用:
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
return 0;
}
再看boss线程中NioSocketChannel继承关系:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
NioSocketChannel继承自AbstractNioByteChannel,注意在这里先定义了SelectionKey.OP_READ操作,以供worker线程监听此事件:
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);//为后续worker线程监听SelectionKey.OP_READ事件
}
另一为pipeline.fireChannelRead(readBuf.get(i))方法,在经历NioServerSocketChannel的pipeline中首尾handler的read方法,最终来到了ServerBootstrapAcceptor的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
}
由此,在childGroup.register(child)中,注册了此channel(NioSocketChannel),并设置了pipeline,参数等其他信息。
boss线程中的childGroup.png
此后,在后续的register方法中,由eventLoop.execute方法,启动了worker线程,也是由MultithreadEventLoopGroup中的register方法,以next()限制了worker线程数量。
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;//将channel和eventLoop关联起来,即将channel和worker线程关联起来
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
并在register0方法中,将netty的niochannel绑定到java原生的selectkey参数上,并告知worker线程pipeline各handler channel的注册和激活事件。
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();//将netty的niochannel绑定到java原生的selectkey参数上
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();//告知pipeline中各handler有channel注册
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();//告知pipeline中各handler有channel激活
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}
看doRegister()方法,在AbstractNioChannel下内部抽象类AbstractNioUnsafe的doRegister()方法中:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
}
}
}
将netty的channel绑定到java原生的selectkey参数上,如果触发了读事件的SelectKey,netty通过调用 SelectKey的attachment()方法就可以获取channel了(见processSelectedKeysOptimized方法k.attachment())。
现在,worker线程如何启动的也知道了,那么worker线程是如何读取数据的呢?
这次,我们把断点打在if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0)一句,然后启动客户端连接服务端并给服务端发数据,这时堆栈为:
服务端读取客户端数据时
把该图与上面“客户端连接启动worker线程.png”对比,启动worker线程前,readyOps=16,此时是在boss线程中,实际用的unsafe是NioMessageUnsafe.read();读取客户端数据时,readyOps=1,此时是在worker线程中,实际用的是NioByteUnsafe.read()。此后,经历worker线程的pipeline,将数据发至用户自定义的handler,这便完成了对客户端数据的读取。
那NioMessageUnsafe是如何来的呢?
其实NioMessageUnsafe来自ServerBootstrap的bind方法,跟下去,在AbstractBootstrap的initAndRegister()方法中,调用channelFactory.newChannel()方法用反射实例化了boss线程的NioServerSocketChannel。
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
...
}
ChannelFuture regFuture = config().group().register(channel);
...
return regFuture;
}
证据如下,在初始化ServerBootstrap时,有这样一句bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class),它是指设置boss线程channel类型。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
在上面设置了ServerBootstrap的channelFactory,反射类为NioServerSocketChannel,再以newChannel()方法实例化了NioServerSocketChannel,最终会来到这里:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
在这里设置了boss线程将监听SelectionKey.OP_ACCEPT事件,再看它的super方法,NioServerSocketChannel继承自AbstractNioMessageChannel,而AbstractNioMessageChannel也继承自AbstractNioChannel,AbstractNioChannel又继承自AbstractChannel,最终也会来到这里:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个AbstractNioMessageChannel类,因此此处实际调用的是AbstractNioMessageChannel的newUnsafe() 方法,该方法中new了一个内部类NioMessageUnsafe实例,该内部类继承了AbstractNioUnsafe。NioMessageUnsafe即来自于此。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
...
do {
//读取SocketChannel消息/事件,封装进readBuf,实际上是封装worker线程channel,供后续worker线程注册此channel
int localRead = doReadMessages(readBuf);
...
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
//处理readBuf事件,实际上是为worker线程添加新channel,初始化childHandler,pipeline及参数等信息
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
NioByteUnsafe又是如何来的呢?
其实NioByteUnsafe来自于NioMessageUnsafe.read()方法,该方法中有两个重要方法之一doReadMessages(readBuf),作用主要是封装NioSocketChannel,以供worker线程添加channel和监听SelectionKey.OP_READ事件用,我们在前面将它跟踪至了AbstractNioByteChannel,继续跟下去会发现AbstractNioByteChannel又继承自AbstractNioChannel:
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
AbstractNioChannel继承自AbstractChannel:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
在这里unsafe = newUnsafe(),调用本身抽象方法newUnsafe()实例化了本身Unsafe属性,从以上的继承关系链中有个NioSocketChannel类,因此此处实际调用的是NioSocketChannel的newUnsafe() 方法,该方法中new了一个内部类NioSocketChannelUnsafe实例,该内部类继承了NioByteUnsafe。NioByteUnsafe即来自于此。
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
}
这样,worker线程的启动过程也讲完了。
包括上篇文章《Netty的启动过程一》,大致讲解了Netty服务端是如何启动boss线程和worker线程的,如何读取数据的,但也仅是主要的枝干代码,细节之处还有很多没讲全,还有很多重要组件,它们的功能及实现都没讲的。这两篇文章的主要目的,是以一个Netty新手的角度讲解如何看Netty源码,那就是大胆去猜,去验证,去查资料,去看别人思路,还有就是多打断点去调试,不要想着一次全搞懂,而是多看多查多验证去弥补以前没看到的,没看懂的,并不断纠正以前错误认识的,所谓Netty之大,一锅炖不下,其余的只能在后续文章慢慢讲解了,这里先弄懂个大概即可。
网友评论