(*文章基于Netty4.1.22版本)
Netty的线程模式网上很多文章都有介绍了,很多文章写得也好,加上我的表达能力不太好,这块线程模型的就不分析了,这篇文章主要讲一下Netty线程模型底层实现的细节。
线程线程,肯定就是有线程去处理的,但是Netty的线程不是简简单单用一个Thread或者ThreadPool是去实现那样的一个线程模型,其核心是一个叫做EventLoop的东西,这个可以看成是一个Thread的封装、抽象,以NIO为例,用到的就是NioEventLoop了。
上面说了EventLoop是一个Thread,在一般的应用程序中,使用Thread不会说就直接使用的,而是通过ThreadPool去使用Thread,那么EventLoop也有一个类似的东西,叫做EventLoopGroup,看名字就可以知道其是多个EventLoop的集合,也就是多个Thread的集合,也就是类似ThreadPool的概念。
源码分析
在Netty的demo中,开头需要初始化两个EventLoopGroup
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
这里分成了两个线程组,一个叫boss线程组,一个叫worker线程组(构造方法参数为空,代表创建cpu核心数*2个线程)。
有三种线程模型,单线程模型,多线程模型,主从多线程模型,Netty中代表如下:
- 单线程模型:bossGroup和workerGroup共用,设置一个线程
- 多线程模型:bossGroup设置一个线程,workerGroup设置多个线程
- 主从多线程模型:bossGroup和workerGroup均使用多个线程
注意:在Netty中,服务启动的时候,调用bind方法,会将Channel注册到一个EventLoop上,所以一般我们调用bind一次,只会创建一个线程去接收请求,即实际是使用第二种模型。
NioEventLoopGroup
看下构造方法
public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, Executor executor, final SelectorProvider selectorProvider) {
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,
final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
这里将线程数设置为0,其他的都是默认值,然后调用父类的构造方法,其父类为MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
当没有设置的话,就设置为CPU核心是*2,然后继续调用父类的构造方法
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
// ....
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {// 初始化nThreads个线程
boolean success = false;
try {
children[i] = newChild(executor, args);// 初始化一个EventLoop
success = true;
} catch (Exception e) {
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
//....
}
}
}
// 创建一个选择器,用于从数组中选择一个EventLoop进行使用
chooser = chooserFactory.newChooser(children);
//....
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
newChild方法在NioEventLoopGroup中实现
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
EventLoopGroup在初始化的时候,会初始化多个EeventLoop,作为一个数组存在,数量是指定的或者默认cpu核心数*2
代码中chooser这个东西是干嘛的,回顾一下服务启动中有句代码,调用EventLoopGroup去注册一个Channel的时候
final ChannelFuture initAndRegister() {
// ....
ChannelFuture regFuture = config().group().register(channel);
//....
return regFuture;
}
//MultithreadEventLoopGroup.java
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
首先会调用next方法,该方法返回一个EventLoop,实现如下:
public EventExecutor next() {
return chooser.next();
}
即如何从数组中获取EeventLooph是有策略的,这个策略就是选择器chooser,再看下chooserFactory.newChooser和如何创建一个新的选择器的,由于EventExecutorChooserFactory只有一个实现DefaultEventExecutorChooserFactory,直接看下这个类的方法
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {// 数量是2的n次方
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
根据数量是否是2的n次方有两种策略,那么看下两种策略的next方法有什么不同
// ....
private final AtomicInteger idx = new AtomicInteger();
// ....
// PowerOfTwoEventExecutorChooser
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
//GenericEventExecutorChooser
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
两种算法的结果是一样的,从0开始轮询选择,但是如果是2的n次方,会选择位运算,因为这种方式性能更好,在Netty的内存分配也运用了大量的位运算,而这的前提就是在一开始已经限制了内容分配的大小为2的n次方
NioEventLoop
分析完NioEventLoopGroup后,知道NioEventLoopGroup有多个NioEventLoop,NioEventLoopGroup只是负责分配和保存NioEventLoop,实际处理事情的是NioEventLoop,看下其构造方法
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
//....
provider = selectorProvider;
// 打开一个Selector并封装成SelectorTuple
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
NioEventLoop构造方法主要是初始化selector,NIO中将Channel注册到selector上,对于Netty来说就是将Channel注册到NioEventLoop上
看下其父类SingleThreadEventLoop的构造方法
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
// 这个队列后面会和另外几个队列一起分析
tailTasks = newTaskQueue(maxPendingTasks);
}
父类SingleThreadEventLoop的构造方法
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory,
boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
this(parent, new ThreadPerTaskExecutor(threadFactory), addTaskWakesUp, maxPendingTasks, rejectedHandler);
}
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, int maxPendingTasks,
RejectedExecutionHandler rejectedHandler) {
super(parent);// 保存EventLoopGroup
// 如果为false,那么在有新任务的时候会唤醒Selector
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = Math.max(16, maxPendingTasks);//队列数量
this.executor = ObjectUtil.checkNotNull(executor, "executor");//executor负责创建线程执行任务
taskQueue = newTaskQueue(this.maxPendingTasks);//任务队列
rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
总结一下,NioEventLoop初始化的时候做了几件事:
- 初始化Selector
- 初始化任务队列
- 保存线程相关的一些属性
一开始在介绍的时候说过EventLoop是类似Thread的概念,那么EventLoop是如何与线程关联的呢?
在NioEventLoop的构造方法中,有个叫Executor的东西,这个在NioEventLoopGroup的父类MultithreadEventExecutorGroup构造方法中初始化
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// ....
protected ThreadFactory newDefaultThreadFactory() {
return new DefaultThreadFactory(getClass());
}
看下ThreadPerTaskExecutor
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
这个东西就是类似ThreadPoolExecutor,调用execute方法的时候,会创建一个线程去执行任务。
- 想一下,线程池中,会限制线程数量,Netty如何去限制线程数量呢?
我们已经知道,NioEventLoopGroup有限制了NioEventLoop的数量,那么这样的话,只需要NioEventLoop对应一定数量的Thread,Netty就相当于实现了线程池的作用(当然机制不一样)
NioEventLoop对应几个Thread,要看一个NioEventLoop中的ThreadPerTaskExecutor的execute方法执行了几遍,找到其使用到的地方
//io.netty.util.concurrent.SingleThreadEventExecutor.doStartThread()
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
//....
}
});
}
doStartThread调用了一次execute方法,找到doStartThread调用的地方(注意execute执行后会将初始化Thread属性,设置其为当前线程,从这里也可以看出一个EventLoop对应一个Thread)
//io.netty.util.concurrent.SingleThreadEventExecutor.execute(Runnable)
public void execute(Runnable task) {
//....
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
//....
} else {
startThread();// 这里调用了doStartThread
//....
}
//....
}
当inEventLoop为false调用startThread一次,Netty有两个地方限制了executor.execute只会调用一次,一个是doStartThread里第一句代码,当thread为空才能继续执行,另外一个是startThread方法
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
//....
}
}
}
}
可以看到,这里在state为ST_NOT_STARTED的时候才会调用doStartThread方法,且这之前还会使用cas将状态变化
总结一下:
- NioEventLoop对应一个线程,其父类中也有一个Thread类型的变量
- NioEventLoopGroup包含多个NioEventLoop,起到线程池的作用
- 调用NioEventLoop的execute方法执行一个任务的时候,会委托给Executor,即ThreadPerTaskExecutor,而ThreadPerTaskExecutor执行的时候会通过DefaultThreadFactory创建一个Thread去执行这个任务
- Netty通过thread属性是否为空和一个状态位控制ThreadPerTaskExecutor的execute只会创建一次,即只会创建一个线程
- 第一次调用的时候会创建一个线程并保存到thread属性,后面调用的时候将任务放到队列中等待第一次创建的线程去执行
关系图:
![](https://img.haomeiwen.com/i10667181/41f47140b86cc694.png)
网友评论