流泪撒种的,必欢呼收割。---诗 126:5
最近完成一个功能,当手机板温到达一定值时,触发系统弹出一个警示框,提示用户手机温度过高,系统自动采取降温措施,可能会引起系统流畅性降低,屏幕变暗。由于底层没有提供板温自动上报功能,因此我采取的是从上层主动间隔一段时间去查询板温节点,进而判断是否需要弹框。这里用到了线程池。
本篇以此为契机,总结下线程池的工作原理。
ThreadPoolExecutor是什么
ThreadPoolExecutor是线程池的根基,先上一张围绕ThreadPoolExecutor的类图,主要部分在红色框中。
这里写图片描述
线程池的核心类是ThreadPoolExecutor,我们平时用的最多的是通过工厂类Executors创建出需要的ThreadPoolExecutor。比如
Executors.newCachedThreadPool
Executors.newFixedThreadPool(int n)
Notes:
抽象类不一定都有抽象方法,抽象类不能被实例化,即使是一个没有抽象方法的抽象类,也同样不能被实例化,见AbstractExecutorService类
我们去Executors看对应的方法,会发现其实都是在创建ThreadPoolExecutor对象,只是给ThreadPoolExecutor传入的构造参数不同罢了。
//创建可复用thread的线程池,适用大量短周期的异步任务
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
--------------------------------------------------------------------------
//创建至多包含nThreads个可复用thread的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
是时候来看看ThreadPoolExecutor的参数了。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数 | 意义 |
---|---|
corePoolSize | 线程池中保留的核心线程数,除非设置了allowCoreThreadTimeOut,否则即使核心线程空闲,其依然不会死亡,当新任务需要执行时,即使核心线程有空闲的,只要数量小于corePoolSize,依然会创建新的线程,直到核心线程数等于corePoolSize。 |
maximumPoolSize | 线程池中所能容纳的最大线程数 |
keepAliveTime | 当池中线程数大于corePoolSize时,其它非核心线程,当没有工作时,最长存活keepAliveTime的时长 |
unit | keepAliveTime的时间单位 |
workQueue | 存放待执行任务的阻塞队列,BlockingQueue是一个接口,可以根据需要选取实现BlockingQueue的阻塞队列,比如SynchronousQueue/LinkedBlockingQueue等 |
threadFactory | 线程池通过threadFactory创建线程 |
handler | 拒绝执行策略,当线程池跟workQueue都饱和的情况下,又有新的任务到来,就会触发拒绝策略。RejectedExecutionHandler是一个接口,ThreadPoolExecutor内部有现成实现。 |
线程池的好处
- 降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想以下如果我们有n多个子任务需要执行,如果我们为每一个待执行对象都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。
- 降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。
- 提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。
线程池何时退出?核心线程常驻,如果不退出岂不是一直有空线程在占资源
如果我们没有手动关闭线程池,线程池一直就会存在,池中的核心线程就会常驻其中,即使没有任何任务需要执行,核心线程也不会销毁。如此肯定会占有一定的系统资源,但是这种情况大多数是值得的,维护一个空转的线程一直等待新任务的到来,比新任务到来后在创建线程更划算。并且线程池也提供了关闭线程池的方法,必要情况下我们可以手动关闭。
来看看下面这段代码
package com.azhengye.testthreadpool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import android.app.Activity;
import android.os.Bundle;
import android.util.Log;
public class MainActivity extends Activity {
private ExecutorService executor;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
// 2个核心线程,最多创建8个线程;非核心线程没有任务可执行时,5s后自动销毁;任务队列最多容纳2个任务;任务饱和情况下直接丢弃新任务
executor = new ThreadPoolExecutor(2, 8, 5, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(2), new ThreadPoolExecutor.DiscardPolicy());
// 执行15个任务
for (int i = 1; i <= 15; i++) {
executor.execute(new TestRunable(i * 100));
}
// 60s后关闭线程池
/*try {
TimeUnit.SECONDS.sleep(60);
executor.shutdown();
Log.d("azhengye_test", "ThreadPoolExecutor exit==" + executor.isShutdown());
} catch (InterruptedException e) {
e.printStackTrace();
}*/
}
class TestRunable implements Runnable {
private int flag;
public TestRunable(int flag) {
this.flag = flag;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Log.d("azhengye_test", "thread id==" + Thread.currentThread().getId() + " flag=="
+ flag);
}
}
}
上述代码注释掉了关闭线程池的方法,那么我们待执行的15个任务都执行完毕后,线程池中的还有多少线程?
我们用如下命令来观察应用中线程分布情况
$ adb shell top -d 1 -H|grep azhengye|grep pool-1-thread
最开始的输出片段如下,可以看到池中有8个线程
9975 10007 u0_a187 20 0 0% S 986380K 39092K fg pool-1-thread-5 com.azhengye.testthreadpool
9975 10008 u0_a187 20 0 0% S 986380K 39092K fg pool-1-thread-6 com.azhengye.testthreadpool
9975 10009 u0_a187 20 0 0% S 986380K 39092K fg pool-1-thread-7 com.azhengye.testthreadpool
9975 10013 u0_a187 20 0 0% S 986380K 39092K fg pool-1-thread-1 com.azhengye.testthreadpool
9975 10022 u0_a187 20 0 0% S 986380K 39092K fg pool-1-thread-2 com.azhengye.testthreadpool
11125 11155 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-1 com.azhengye.testthreadpool
11125 11156 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-2 com.azhengye.testthreadpool
11125 11157 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-3 com.azhengye.testthreadpool
11125 11158 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-4 com.azhengye.testthreadpool
11125 11159 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-5 com.azhengye.testthreadpool
11125 11160 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-6 com.azhengye.testthreadpool
11125 11161 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-7 com.azhengye.testthreadpool
11125 11162 u0_a187 20 0 0% S 990476K 38056K fg pool-1-thread-8 com.azhengye.testthreadpool
任务结束几分钟以后输出片段如下,池中仅剩余2个线程:
11125 11155 u0_a187 20 0 0% S 983060K 38228K fg pool-1-thread-1 com.azhengye.testthreadpool
11125 11162 u0_a187 20 0 0% S 983060K 38228K fg pool-1-thread-8 com.azhengye.testthreadpool
11125 11155 u0_a187 20 0 0% S 983060K 38228K fg pool-1-thread-1 com.azhengye.testthreadpool
11125 11162 u0_a187 20 0 0% S 983060K 38228K fg pool-1-thread-8 com.azhengye.testthreadpool
将注释掉的代码打开后,待任务结束几分钟后,我们将观察不到pool-1-thread-x的输出,说明我们主动关闭线程池后,池中一个线程都没有了,即核心线程也被销毁掉。
通过以上实验,我们可以明确看到不关闭线程池,就算很长时间也没有可执行的任务,核心线程还是会常驻。
开始接触线程池时,对keepAliveTime参数存疑,不知道它会不会影响到核心线程,以上也可以释疑了。
这里写图片描述
线程池如何保持核心线程常驻
我们自己创建的单个线程在执行完任务后就自己销毁了,并且也没有调用销毁线程的方法。那么问题来了,线程池中的线程执行完任务后,为什么没有被销毁,它怎么就能常驻线程池中呢?
探究这个问题之前,我们设想下可能的实现方式。肯定在线程的run方法里有一个死循环,循环不退出,线程就不会结束。因为一个线程不可能会start两次,一旦运行结束,线程就会退出销毁掉。这点可以从线程的start方法注释中看出。
* It is never legal to start a thread more than once.
* In particular, a thread may not be restarted once it has completed
* execution.
public synchronized void start() {
//...
}
任务的执行入口是execute方法,我们从该处作为分析的切入点
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果池中线程数<核心线程数,尝试新建一个线程执行当前任务,尝试成功就直接返回
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //A
return;
c = ctl.get();
}
//线程池正常运行,并且成功将当前任务放入workQueue
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//任务成功入队后,可能线程池状态发生了变化,因此二次检查,二次检查结果异常则调用reject策略
if (! isRunning(recheck) && remove(command))
reject(command);
//任务成功入队后,检查发现池中线程数量为0,则调用addWorker。
else if (workerCountOf(recheck) == 0)
addWorker(null, false); //B
}
//如果待执行任务无法加入到workQueue,那么调用addWorker,如果失败,则调用reject策略
else if (!addWorker(command, false)) //C
reject(command);
}
以上过程体现了线程池执行任务的流程。我们将其转化成下面的流程图。
这里写图片描述
上述代码片段中标记的A/B/C三个地方都调用了addWorker方法来处理任务,并且三处传入的参数都不同,继续探究addWorker方法。
addWorker方法
// firstTask参数: 新的任务需要通过创建新线程来执行。如果为空则会尝试去workQueue中取任务,然后复用池中已有的线程
// core参数: 为true创建核心线程执行新任务,否则创建非核心线程执行
private boolean addWorker(Runnable firstTask, boolean core) {
retry: //retry是一个语法标记,类似goto
for (;;) {//这种写法,相对与while(true),在某些未做过优化的编译器上效率更高。
int c = ctl.get();
int rs = runStateOf(c); //获取线程池的状态,这里用到了位操作,后续在展开说明
// 条件1(rs >= SHUTDOWN): 线程池不是RUNNING状态
// 条件2(! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
// 条件2.1(rs == SHUTDOWN): 线程池处在SHUTDOWN状态
// 条件2.2(firstTask == null): 任务为空,表明要去workQueue中取任务
// 条件2.3(! workQueue.isEmpty()): workQueue不为空
// 合起来看条件2表明是: 排除线程池在SHUTDOWN状态,但workQueue里还有待执行任务的情况
// 加上条件1整个判断语句表明的是: 线程池不是RUNNING状态,并且如果线程池处在SHUTDOWN状态,除非workQueue里还有待执行的任务
// 否则addWorker失败,待执行的任务不会得到执行。
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); //池中的线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //线程池已经饱和,待执行任务无法执行
return false;
if (compareAndIncrementWorkerCount(c)) //可以执行任务,池中线程数+1。用到了CAS操作,后续展开说明
break retry;// 跳出了两重死循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)// 重新check 线程池状态,在从最外层循环开始
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);//创建Worker对象,一个Worker就对应一个池中的线程,注意firstTask可以为null
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
//线程池未关闭或者(线程池处在SHUTDOWN状态并且firstTask为null.)
//后一种情况说明SHUTDOWN状态下,线程池依然要去执行workQueue中的任务
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//添加当前worker到workers集合中,workers集合作为整个线程池的一个属性
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;// 表明worker已经成功添加到池中
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//worker开始工作了
workerStarted = true;
}
}
} finally {
if (! workerStarted)//如果worker启动失败,那么执行addWorkerFailed,该方法会1.将当前work从workers集合删除2.池中线程数量-1
addWorkerFailed(w);
}
return workerStarted;
}
上述代码分析完成,我们有一个大概的认知: 新来的任务会被分配给一个Worker对象,Worker对象生成的同时,会自我创建一个线程,任务就会在该线程中得到执行。
可能会有疑问,上述代码看来似乎一个任务会创建一个Worker,而Worker又会创建线程,那么会不会造成多少个任务就多少个线程呢?
当然不会咯,注意addWorker调用的条件,就能消除该疑问,事实上大部分任务都被放入了workQueue中,而不是来一个任务就创建一个Worker。
Worker对象
我们继续跟进Worker对象寻找答案。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ...
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker. */
public void run() {
runWorker(this);
}
// ...
}
上一节代码中Worker启动调用的是
final Thread t = w.thread;
//...
t.start();//worker开始工作了
//...
Worker继承自AbstractQueuedSynchronizer,重要的是实现了Runnable接口,因此t.start()
方法实际上调用的是Worker的run方法。这个模式跟下面的简化版代码本质上一样。
package com.azhengye.test;
public class Test {
public static void main(String[] args) {
Worker worker = new Worker("azhengye-thread");
worker.thread.start();
}
static class Worker implements Runnable {
private Thread thread;
public Worker(String name) {
thread = new Thread(this, name);
}
public void run() {
System.out.print("name===" + Thread.currentThread().getName());
}
}
}
所以我们需要追查runWorker方法.
runWorker方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;// 从worker中取出task,这个就是外部丢给线程池处理的任务,注意前面提过firstTask可能为空
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//task为空,或者用getTask方法从workQueue能取出非空的任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//真正开始执行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
看到这我们最开始的疑问,核心线程如何常驻的马上就要揭晓答案了。runWorker在线程池中的某个线程中运行,如果这里面的while循环一直不退出,那么线程也就不会退出,这不就是常驻的效果嘛。while不退出我们需要查看getTask方法
getTask方法
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
//允许核心线程超时或者池中线程数大于核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
//允许超时的情况下,采用poll方式取任务,其取法特点是从workQueue首位取,最长等待keepAliveTime的时长,取不到返回null
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
//未设置核心线程超时,并且池中线程数量<=核心线程数,采用take方式取任务,
//其取法特点是从workQueue首位取,若workQueue为null,则进入阻塞状态,直到能取出对象为止
//没有任务执行,为什么核心线程能常驻池中的疑问揭晓了,核心线程被阻塞在该处了
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结核心线程能常驻线程池的原因是由于,核心线程在取任务的时候采用了阻塞队列中的take方式,它会一直等待任务到来,而非核心线程采用的是阻塞队列中的poll方式,等待一段时间后,取出的任务为空,runWorker就会退出死循环,进而也就标志着线程的run方法执行完毕,最终等待它的只有销毁。
在探究问题的同时,也发现之前不主动关闭线程池核心线程就不会退出的说法不够严谨,因为线程池有allowCoreThreadTimeOut的参数,可以通过设置它,让核心线程也能超时退出。
线程池基础原理
通过以上分析,基本弄清了线程池的工作原理。我理解的线程池原理就是将待执行任务跟执行线程分开看待,任务放在队列中,而执行线程循环从队列中取任务执行。
线程池的整体实现还是比较复杂的,下面的例子将线程池需要考虑的各种复杂情况剔除,回归到本质。希望能帮助到你对其原理的理解。
package com.azhengye.test;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class Test {
public static void main(String[] args) {
// 模拟线程池工作队列
BlockingQueue<Runnable> workqueue = new LinkedBlockingQueue<Runnable>();
for (int i = 0; i <= 10; i++) {
//模拟待执行任务,让其打印随机数
Task task = new Task();
workqueue.offer(task);//任务入队
}
// 模拟池中的线程,所有的任务都可以用它来执行,并且它只创建一次,做到了复用
Worker worker = new Worker("azhengye-thread", workqueue);
worker.thread.start();
}
static class Task implements Runnable {
public void run() {
String threadName = Thread.currentThread().getName();
System.out.print("task-" + Math.random() + " 在线程" + threadName + "中执行\n");
}
}
static class Worker implements Runnable {
private Thread thread;
private BlockingQueue<Runnable> queue;
public Worker(String name, BlockingQueue<Runnable> queue) {
this.thread = new Thread(this, name);
this.queue = queue;
}
public void run() {
System.out.print("threadname===" + Thread.currentThread().getName() + "\n");
for (;;) {
try {
queue.take().run();// take 方法,任务执行完成后,该线程还是不会退出
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
未完待续部分
线程池在concurrent包下,这个下面到处都是Doug Lea大神的身影。我只能说,TMD,为何这么吊。
本文某些地方并没有分析到位,比如
- BlockingQueue有哪些?各自的适用场景是什么?
- Executors创建的线程池都有哪些?各自的适用场景是什么?
- 线程池标记状态跟统计数量非常巧妙的利用了位操作
- 线程池出了常规的execute方法,还有submit方法,这又会涉及到能返回线程执行结果的Callable
后续一一补充完善。追赶大神的脚步,争取把concurrent包过一遍。
网友评论