今天看到别人写的@Async
注解的文章,发现自己对java线程池的工作原理有点记不太清了,再翻出源码记录一下吧。
jdk版本:1.8.0_191
核心代码在ThreadPoolExecutor#execute()
方法中。
先上此方法的注释:
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
这个注释把方法入参command的工作流程写的非常清楚,看完再读代码就非常轻松了。简单的翻译一下。
task 通过以下三步被处理:
1. 如果线程池中当前线程数量小于 corePoolSize 的值,那么尝试开启一个新的线程,并将入参command作为它的firt task。调用addWorker方法时,会自动检查runState以及workerCount, 当它不能新增线程的时候,就返回false来避免错误。
2. 如果一个任务可以成功的进入等待队列,那么我们仍然需要再次检查到底是需要新增一条线程(因为存在现存的线程挂掉的情况)还是说自从接收这个任务后线程池关闭了。所以再次检查state,如果线程池关闭了,就回滚入队操作,拒绝任务;如果线程数为0,就新建一条线程。
3. 如果task并不能成功入队,那么就尝试新建一条线程。如果这个操作失败了,那么线程池就是处于关闭或者饱和的状态,所以就拒绝掉这个task。
核心代码如下:
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 对应step1
if (addWorker(command, true)) // 二参为true,以corePoolSize为边界
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 对应step2
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false); // 注意这里firstTask是null。因为走到这里时,task已经在queue里
}
else if (!addWorker(command, false)) // 对应step3。二参为false,以maximumPoolSize为边界
reject(command);
可以简单的总结下流程:
当提交一个新的task时,
- 当前线程数小于corePoolSize时,尝试addWorker;
- 大于或者addWorker失败,且线程池没关闭就尝试入队
入队成功,二次检查:
2.1 线程池已关闭,回滚入队操作并拒绝task;
2.2 线程池中没有线程就新增一条线程到线程池 - 入队失败则尝试新建一个线程接纳任务,还是不行就拒绝任务。
关键方法:addWorker(Runnable firstTask, boolean core)
分成两部分来看:
- retry 部分
- 成功增加线程数后的操作
前半部分
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false; // 简单理解为线程池已关闭吧。还要再去看看线程池的生命周期
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
注释也挺清楚的,也可以分为内外两个循环来看。
外层循环主要是获取线程池的生命周期状态并判断是否直接返回false;
内层循环主要是cas来设置线程数,因为要+1嘛。如果cas成功,跳出retry执行后半部分的代码;如果cas失败,则判断下线程池的状态是否发生过变化,没变就继续内层循环cas,变化了就去外层循环判断线程池状态是否应该退出。
后半部分
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
这部分就是在线程数加完1后,做的后续操作。封装Worker,启动线程。注意这部分为了保证多线程下的正确性,使用了reentrantLock进行同步。有一点要说一下,按理来说workerCount+1的操作和创建worker并加入workerSet理论上应该做成原子性的过程,但这里是分开的。但是,这里通过finally中的addWorkerFailed方法进行回滚,保证了在后半部分产生异常时的正确性。
另外我想这里这么搞,主要是为了减小锁的粒度,尽可能多的把不需要放在同步块里的代码剥离出来,提高性能。毕竟jdk并不仅仅为了能用或者正确就行。
网友评论