首先来张网上盛传的netty框架参考图,以供跟踪代码参考:
netty框架参考图.jpg
一段标准的Netty服务端启动代码如下:
public NettyTcpServer() {
bossGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup(4);
bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 5)
.childOption(ChannelOption.TCP_NODELAY, true);
}
public void bind(String ip, int port) {
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("decoder", new ProtoDecoder(upLimit))
.addLast("server-handler", new ServerHandler())
.addLast("encoder", new ProtoEncoder(downLimit));
}
});
InetSocketAddress address = new InetSocketAddress(ip, port);
try {
bootstrap.bind(address).sync();
} catch (InterruptedException e) {
log.error("bind "+ip+":"+port+" failed", e);
shutdown();
}
}
Netty的服务端,一般会启动两个NioEventLoopGroup线程组(个人感觉用组比用池更准确,这里组指数组),一个为bossGroup线程组,处理客户端的连接请求;一个workerGroup线程组,用来处理IO事件。很多人都知道Netty服务端就是做这事的,今天我们就用源码来揭示这是如何实现的。
先从NioEventLoopGroup的构造方法着手:
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
一路查看实现,会来到这里:
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
可见,实例化NioEventLoopGroup时,如果在这里没有设置参数,也没有在JVM参数里设置“-Dio.netty.eventLoopThreads=x”,那么这个线程组的默认线程数为CPUx2,否则为设置的参数值,最后,可以看到NioEventLoopGroup的具体实现为:
if (executor == null) {
//后续Netty对各种IO事件的处理就是通过此executor创建线程处理的
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//创建NIOEventLoop数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//实例化每个NIOEventLoop,每个NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
//......
}
}
//选择器,选择由哪个NIOEventLoop处理,注意这里以children作为传参
chooser = chooserFactory.newChooser(children);
在实例化NIOEventLoopGroup时,首先创建了一个Executor,而Executor的作用就是通常被用来代替显示地创建线程的,Executor对象可以用来执行Runnable任务,该接口将“任务提交”从任务的运行机制中解耦出来,包括线程使用、调度等细节。我们看Netty中的它的实现如下,由它的execute方法中threadFactory.newThread(command).start();一句,证实了我们所说的Executor的作用,netty很有可能就是通过这句来创建它的那些IO线程的,我们不妨先猜猜,它是在什么时候执行这句的呢?(彩蛋)
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();//创建NettyIO线程,重点代码,将断点打于此处,可查看整个Netty线程启动步骤
}
再回溯到上面的NIOEventLoopGroup代码段,由children = new EventExecutor[nThreads];一句,NIOEventLoopGroup实则创建了一个new NioEventLoopGroup(4)中参数数量的NIOEventLoop数组,NIOEventLoop的实例就是通过newChild(executor, args)方法添加的,由下面的几段代码可知,每个NIOEventLoop公用Executor、SelectorProvider、EventExecutorChooserFactory、RejectedExecutionHandlers。而newChild(executor, args)方法就是做NIOEventLoop实例的初始化工作,一路跟踪我们还可发现,NIOEventLoop的任务队列用的是LinkedBlockingQueue,大小为Integer.MAX_VALUE,如果没设置JVM参数“-Dio.netty.eventLoop.maxPendingTasks=x”的话:
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
NIOEventLoop构造为:
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
selector = openSelector();
selectStrategy = strategy;
}
再看该构造中的super方法为:
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);
this.executor = ObjectUtil.checkNotNull(executor, "executor");
taskQueue = newTaskQueue(this.maxPendingTasks);
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
由以上代码我们大概知道了NIOEventLoopGroup和NIOEventLoop的实现,NIOEventLoopGroup其实是一个NIOEventLoop的数组,每个NIOEventLoop都公用了一个Executor,后续创建线程的事都由Executor来创建,由threadFactory.newThread(command).start();一句可知。
但是我们还有一个疑问,那就是Netty什么时候开始启动这些线程的呢?上述线程的启动貌似和new NioEventLoopGroup(4)中的参数也没有任何关系,那它怎么知道只创建new NioEventLoopGroup(4)中的参数个数的线程呢?我们只知道这个参数赋值给了children = new EventExecutor[nThreads];但在哪里限制了此参数的线程数量呢?即Netty怎么知道最大要启动这么多个线程呢?线程的启动数量和这个children数组有什么关系呢?
带着这些疑问,我们不妨全局搜索这个children这个变量
children全局引用.png
发现它大多数时候用于关闭和终止判断,这些看不出和启动线程数有什么关系;只有在NioEventLoopGroup的实现中这里将children传参给了一个字面意思叫选择器的类,如下:
//选择器,选择由哪个NIOEventLoop处理,注意这里children的传参
chooser = chooserFactory.newChooser(children);
启动的线程数是否和这个选择器有关呢?我们再跟进去看看
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTowEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static boolean isPowerOfTwo(int val) {
return (val & -val) == val;
}
private static final class PowerOfTowEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTowEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
发现,netty把这个NIOEventLoop数组赋值给了它的两个内部类Chooser之一,它们的唯一提供的使用方法是next()方法,看这方法实现:executors[Math.abs(idx.getAndIncrement() % executors.length)]貌似这里告诉了netty当前应使用哪个NIOEventLoop,看来可以怀疑启动的线程数量和这个next()方法有关,那只要全局搜下这个next()方法,就应该知道了
next()全局引用.png
有这么多地方调用,要是第一次看Netty源码,从next()反推过去看代码直至看到跟ServerBootstrap或NioEventLoopGroup相关的next()调用,太难找了(我第一次看就是- -),而Netty服务端启动代码中间还有那么多的源码没看,究竟是在哪里限制了线程的启动数量呢?
既然不知道在哪里限制线程数量的,也不知道何时启动线程的,但是线程的创建地方我们是找到了的,即executor的初始化地方,如下
if (executor == null) {
//后续Netty对各种IO事件的处理就是通过此executor创建线程处理的
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
点进ThreadPerTaskExecutor,根据Executor的作用“就是通常被用来代替显示地创建线程的”,我们怀疑是在这里创建线程的,那么我们何不在此打上断点呢?
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();//创建NettyIO线程,重点代码,将断点打于此处,可查看整个Netty线程启动步骤
}
打上断点后,启动ServerBootstrap,发现确实调到这里来了,由此证实了我们猜想的正确性,通过查看调用堆栈,如下图所示:
服务端启动启动boss线程.png
一下就可以发现,原来Netty第一个线程的启动是在绑定地址和端口开始的,再看里面的NioEventLoopGroup(MultithreadEventLoopGroup).register(Channel) 行: 85 这句,
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel); //先选一个NIOEventLoop出来,然后再执行它的register(channel)方法
}
它就是我们在查找next()全局引用图中(见上一图next()全局引用)的最小红框标记的MultithreadEventLoopGroup类下的register(Channel)方法,我们开始不知道netty启动线程是如何限制数量的,只怀疑这数量限制跟next()方法有关,现在前后呼应起来了,知道是这么个流程了,即Netty是在绑定服务器地址和ip时,先启动一个线程去接受客户端连接的,这个线程的启动过程就如上图所示(客户端的启动线程是在connect方法中开始的)。
再回到Netty的服务端启动代码,现在我们换种方式看源码,之前我们只看了NioEventLoopGroup实现,其余的还都没看,现在我们跳过中间那些源码,从bootstrap.bind(address).sync();中bind方法开始。
一直跟踪方法堆栈,暂且先别管其他源码,我们要先弄懂Netty是如何启动boss线程组,去接受客户端的连接,如何启动worker线程组,去处理各类IO事件。点进看register实现:
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
...
}
}
}
跟踪源码时可知NioEventLoop继承自SingleThreadEventLoop,而SingleThreadEventLoop继承自SingleThreadEventExecutor,SingleThreadEventExecutor最终又继承自Executor,所以在这里最终会进入我们刚打的断点,再看SingleThreadEventExecutor中的execute方法:
@Override
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
}
进入startThread()最终来到这里:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {//这里就是每个NIOEventLoop中的executor,它会执行NIOEventLoop中的executor中的threadFactory.newThread(command).start();这句
@Override
public void run() { //注意这里是线程启动后需执行的代码
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
}
}
});
}
这便创建并启动了netty的处理客户端连接的线程。线程启动后需要做什么事呢?根据我们的了解,Netty的bossGroup线程组和workerGroup线程组启动后,要分别时刻处理客户端的连接和IO事件,那么这些线程应该具备某种功能,需要时时刻刻知道是否有客户端连接或IO事件,根据以往经验,这个实现往往是用无限for循环实现的,我们只要找到哪里有for死循环的地方或类似功能的地方即可。再点进SingleThreadEventExecutor.this.run(),来到NioEventLoop的run方法,我们要找到哪里具备如此功能:
@Override
protected void run() {
for (;;) {
try {
//可以在此处打上断点,验证启动后是否会进这里无限循环
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
default:
// fallthrough
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} catch (Throwable t) {
}
}
}
一进来就发现是无限循环了,根据以往对无限循环的认知,在无限循环里往往是可以时时刻刻做某种事的,再点进processSelectedKeys()方法,查看哪里在做跟连接和处理IO相关的事了,最终找到这里:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
由SelectionKey.OP_CONNECT、SelectionKey.OP_ACCEPT、SelectionKey.OP_READ、SelectionKey.OP_WRITE字面猜想,很可能就是这里处理连接和IO事件的,我们先在上面run方法里的switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) 这句打上断点,然后再分别在这些if分支里再打上断点,验证我们的猜想,重新启动netty服务端:
启动netty服务端进入run方法无限循环.png
果然进入了此断点,验证了我们猜想的正确性,但这里只是启动了处理客户端连接的线程,这里还无法进入后面打的那些if分支的,因为还没有客户端请求连接和发送数据操作,因此,先放开run里的断点,我们再启动一个netty客户端(netty服务端和netty客户端分别用的是《使用Netty+Protobuf实现游戏TCP通信》的源码):
netty客户端在connect方法中启动了自己的线程.png
可见,客户端在自己的connect()方法中启动了自己的线程,请求后,客户端进入了SelectionKey.OP_CONNECT分支,即发起请求连接
客户端进入SelectionKey.OP_CONNECT分支发起请求连接.png
而服务端,则进入了SelectionKey.OP_ACCEPT分支,此时readyOps=16,接受连接请求
服务端进入SelectionKey.OP_ACCEPT分支接受连接.png
若我们提前在threadFactory.newThread(command).start()打上断点的话,如下:
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
会发现,服务端会再次(第二次)进入此断点,第一次服务端进入此断点,是启动boss线程,第二次进入是启动worker线程,若客户端还有数据发给服务端,则服务端还会再次进入SelectionKey.OP_READ分支,此时readyOps=1:
服务端再次进入SelectionKey.OP_READ读取IO数据.png
最终来个简单流程回顾:
1)首先服务端启动netty -> 服务端netty会启动boss线程;
2)客户端启动netty -> 客户端netty会启动自己请求连接线程,客户端进入SelectionKey.OP_CONNECT分支;
3)服务端进入SelectionKey.OP_ACCEPT分支;
4)服务端netty -> 启动worker线程,接受客户端的连接;
如果客户端再发送数据给服务端:
5)服务端进入SelectionKey.OP_READ,读取客户端发送的数据;
如果服务端也发送数据给客户端:
6)客户端进入SelectionKey.OP_READ,读取服务端发送的数据;
我们现在已经大致知道了是如何启动boss线程的了,那么worker线程又是如何启动的呢?超过worker线程上限,netty又如何知道不需要再启动线程了呢?
限于篇幅,将在下篇讲解,敬请期待...
网友评论