一 线程池中的一些重要概念
Worker / workers
Worker类是ThreadPoolExecutor类的一个内部类,也是线程池管理操作线程的核心所在。每一个worker都对应着一个thread,所以在不混淆的情况下,可以把worker理解为工作线程。
ThreadPoolExecutor有一个名为workers的成员变量,它保存了这个线程池所有存活的worker对象。
workQueue
workQueue是线程池内部用来保存待执行任务的队列。它是一个BlockingQueue<Runnable>类型的变量,在没有任务的时候,它的poll()方法会阻塞。
在一个允许超时的worker执行完任务之后,会调用workQueue.poll()取出下一个任务执行。如果没有任务,则会在这里阻塞;当阻塞时间达到超时时间后,这个工作线程会退出并销毁。
一些简单的线程池使用方法,可以看看这篇文章Android 多线程之线程池(一)
二 源码分析
ctl
ThreadPoolExecutor通过一个原子整型ctl来保存线程池的两个重要字段,workerCount和runState。workerCount即线程池工作线程的数量,而runState代表了线程池当前的状态(如:运行中、关闭、终止)。通过位运算,可以从ctl得到workerCount和runState的值,反之也可以通过workerCount和runState组合得到ctl。
//ThreadPoolExecutor.java
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
execute(runnable)
//ThreadPoolExecutor.java
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
当线程池执行一个任务会调这个方法进来,回想下之前讲过线程池的工作流程是怎么样的Android 多线程之线程池(一),再来看看源码会加深印象。
分析:
1、先判断任务是否为空,如果为空直接抛异常。
2、如果当前运行的线程小于核心线程数,那么就去addWorker(command, true),这个方式返回一个Boolen值,意思是创建一个worker是否成功,第一个参数是任务,第二个参数是创建的worker的线程是否为核心线程。
3、如果当前线程池是运作的,并且这个任务workQueue.offer(command)添加进队列,那么继续往下走,如果添加进队列失败,则去addWorker(command, false)创建一个普通线程并把任务对给他执行,如果创建失败,则拒绝这个任务。
4、如果3的条件重新取到ctl,如果这个线程池不是运作的,并且这个任务已经在任务队列移除了,那么直接拒绝;如果上述条件不成立则判断workerCountOf(recheck) == 0 线程池没有工作线程的时候,则去创建普通线程的worker。
线程池工作原理.png
addWorker(Runnable firstTask, boolean core)
//ThreadPoolExecutor.java
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
先去判断线程池的状态if (rs >= SHUTDOWN &&!(rs == SHUTDOWN &&firstTask == null && ! workQueue.isEmpty()))return false;
直接返回false有以下3种情况:
1、线程池状态为STOP、TIDYING、TERMINATED
2、线程池状态不是running状态,并且firstTask不为空
3、线程池状态不是running状态,并且工作队列为空
当返回false时则进入拒绝策略
之后去判断
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
addWorker的core的值,如果是核心线程,当前的workerCountOf大于corePoolSize则返回false;如果是非核心线程,当前的workerCountOf大于maximumPoolSize则返回false,当返回false时则进入拒绝策略
如果上述条件满足,compareAndIncrementWorkerCount(c)表示workerCountOf+1
接下来就是去创建一个worker了,相当于创建一个工作线程,之后把work添加到队列里面workers.add(w);如果workers.size() > largestPoolSize 当前的工作线程(包含未启动)大于最大工作线程,那么最大工作线程设置为队列的工作线程
if (workerAdded) {
t.start();
workerStarted = true;
}
执行work对象里面的Thread
ThreadPoolExecutor.Worker(Runnable firstTask)、runWorker(Worker)
//ThreadPoolExecutor.java
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
刚才有提及到worker可以理解为工作线程,原因是worker.thread是工作线程,而worker.firstTask为该线程处理的任务
当Worker的线程开始运行之后,会调用其run()方法:runWorker(this)
// 省略一大部分
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
try {
task.run();
} catch (Exception x) {
}
}
}
// 省略一大部分
private Runnable getTask() {
for (;;) {
if (/*无法获取任务*/) {
return null;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) {
}
}
}
为了便于观看,只留了核心的几行,如果传入的task不为null,则执行相应的run方法;当传入的task为null,则会从getTask()方法中获取runnable对象
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
} catch (InterruptedException retry) { }
这里面通过timed变量判断是使用poll还是task
poll:如果有值,则立马返回。否则等待一段时间,该时间内阻塞,时间结束后,队列还是没有元素则返回null。
task:如果有值,则立马返回。否则一直阻塞。
什么时候返回null,不让worker继续存活了呢?
1、线程池被shutdown,并且任务队列空了;
2、线程池超容量;
3、超时;
网友评论