ThreadPollExecutor的源码中有一段
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
COUNT_BITS 的十进制值是29, 它是用来干什么的呢?
RUNNING用二进制值表示是 11100000 00000000 00000000 00000000
SHUTDOWN用二进制表示是 00000000 00000000 00000000 00000000
STOP用二进制表示是 00100000 00000000 00000000 00000000
TIDYING 用二进制表示 01000000 00000000 00000000 00000000
TERMINATED 二进制表示 01100000 00000000 00000000 00000000
可见这几个值就是用一个32位的数(即Integer) 的前三位表示,
private static int ctlOf(int rs, int wc) { return rs | wc; }
那么ctl是一个什么样的值呢
可以看出来ctl的初始值 RUNNING | 0 ,结果是RUNNING 也就是 11100000 00000000 00000000 00000000
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
ThreadPoolExecutor的execute
方法中, 先获取了ctl
然后执行workerCountOf(c),我们看看workerCountOf(c)
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static int workerCountOf(int c) { return c & CAPACITY; }
CAPACITY的二进制值 1左移29位减去1.也就是 00011111 11111111 11111111 11111111
表示的是后29位表示容量
ctl & CAPACITY 得到的就是 后29位表示的值,
由此可知,
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
这段代码的意思是, ctl这个int值 的后29位如果小于 corePoolSize
, 就执行addWorker
看看addWorker的代码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
//................为了方便查看,这里省略了一些代码
}
return workerStarted;
}
for循环里第二行 runStateOf
private static int runStateOf(int c) { return c & ~CAPACITY; }
~CAPACITY表示的当然就是 11100000 00000000 00000000 00000000
所以这个代码是取ctl的高三位, 也就是状态部分
下面的判断语句,
rs >= SHUTDOWN 也就是说 rs是 除了RUNNING以外的状态值, 因为RUNNING是负值,SHUTDOWN等都是正值
也就是说这里的if语句 如果当前ctl获得的状态不是RUNNING 并且 状态等于SHUTDOWN或者firstTask不为空, 或者workQueue为空 ,就直接return false
那么走下来的一定是, ctl获得的状态是RUNNING 或者 (状态不是SHUTDOWN, 且firstTask为空, 且workQueue不为空)
这种情况下走进下一个for循环
里面如果worker的数量大于最大容量CAPACITY 也就是2^29, 或者大于core/maximumPoolSize,直接返回false
否则执行CAI
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
这个方法的代码是把ctl的值原子的+1
所以结论
ctl是存放线程池状态和工作线程数量的变量
前三位存放线程池的数量.后29位存放wrokerCount, 通过移位运算和逻辑运算来获取具体的值
为什么要这么做呢?
实际上是为了在存储workCount和线程池状态的时候要保证线程安全,这样做的话就可以只用一个AtomicInteger保证了线程安全.减少了一定的资源消耗
网友评论