当我们需要频繁的创建多个线程进行耗时操作时,每次通过new Thread实现并不是一种好的方式,每次新建和销毁对象性能较差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争,可能占用过多系统资源导致死锁,并且缺乏定时执行、定期执行、线程中断等功能。在开发过程中,合理地使用线程池能够带来如下好处。
-
降低资源消耗;
通过重复利用已创建的线程降低线程创建和销毁造成的消耗。 -
提高响应速度;
当任务到达时,任务可以不需要等到线程创建就能立即执行。 -
提高线程的可管理性;
线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源, 还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。提供定时执行,定期执行,单线程,并发数控制等功能。
线程池原理
当向线程池提交一个任务之后,线程池 的主要处理流程如图所示。

执行流程说明:
1)线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作 线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2)线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这 个工作队列里。如果工作队列满了,则进入下个流程。
3)线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程 来执行任务。如果已经满了,则交给饱和策略来处理这个任务。
线程池使用
Executor框架的使用示意图如图所示:

- 主线程创建实现Runnable或者Callable接口的任务对象。工具类Executors可以把一个Runnable对象封装为一个Callable对象,使用方法:Executors.callable(Runnable task)或 Executors.callable(Runnable task,Object resule)。
- 可以把Runnable对象直接交给ExecutorService执行ExecutorService.execute(Runnable command);或者也可以把Runnable对象或Callable对象提交执行Executor- Service.submit(Runnable task)或ExecutorService.submit(Callable<T>task)。
- 主线程可以执行FutureTask.get()方法来等待任务执行完成。主线程也可以执行
FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。
线程池的创建
通常我们都不会直接通过new的形式来创建线程池,由于创建参数过程相对复杂一些,JDK给我们提供了Executors工厂类来简化这个过程。
详细创建见下文:ThreadPoolExecutor
向线程池提交任务
可以使用两个方法向线程池提交任务,分别为execute()和submit()方法。
execute()
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。如下所示:
threadsPool.execute(new Runnable() {
@Override
public void run() {
// TODO Auto-generated method stub
}
});
submit()
submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个 future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方 法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线 程一段时间后立即返回,这时候有可能任务没有执行完。
Future<Object> future = executor.submit(harReturnValuetask);
try {
Object s = future.get();
} catch (InterruptedException e) {
// 处理中断异常
} catch (ExecutionException e) {
// 处理无法执行任务异常
} finally {
// 关闭线程池
executor.shutdown();
}
关闭线程池
可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池。它们的原理是遍历线 程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务 可能永远无法终止。
shutdownNow
首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
shutdown
只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线 程。
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务 都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于应该调用哪 一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭 线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。
合理配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
-
任务的性质
1)CPU密集型任务
CPU密集型任务应配置尽可能小的 线程,如配置Ncpu+1个线程的线程池。2)IO密集型任务
IO密集型任务线程并不是一直在执行任务,则应配 置尽可能多的线程,如2*Ncpu。3)混合型任务。
混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务 和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量 将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。 -
任务的优先级:高、中和低
优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高 的任务先执行。 -
任务的执行时间:长、中和短
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让 执行时间短的任务先执行。 -
任务的依赖性:是否依赖其他系统资源,如数据库连接
依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越 长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。
线程池的监控
如果在系统中大量使用线程池,则有必要对线程池进行监控,方便在出现问题时,可以根 据线程池的使用状况快速定位问题。可以通过线程池提供的参数进行监控,在监控线程池的 时候可以使用以下属性。
属性 | 说明 |
---|---|
taskCount | 线程池需要执行的任务数量 |
completedTaskCount | 线程池在运行过程中已完成的任务数量,小于或等于taskCount |
largestPoolSize | 线程池里曾经创建过的最大线程数量。通过这个数据可以知道线程池是 否曾经满过。如该数值等于线程池的最大大小,则表示线程池曾经满过。 |
getPoolSize | 线程池的线程数量 |
getActiveCount | 获取活动的线程数 |
通过扩展线程池进行监控。可以通过继承线程池来自定义线程池,重写线程池的 beforeExecute、afterExecute和terminated方法,也可以在任务执行前、执行后和线程池关闭前执 行一些代码来进行监控。
Executor框架
Executor框架简介
Executor框架的两级调度模型
在HotSpot VM的线程模型中,Java线程(java.lang.Thread)被一对一映射为本地操作系统线 程。Java线程启动时会创建一个本地操作系统线程;当该Java线程终止时,这个操作系统线程 也会被回收。操作系统会调度所有线程并将它们分配给可用的CPU。两级调度模型如下所示:

上层,把应用分解为若干个任务,然后使用用户级的调度器(Executor)将这些任务映射到固定数量的线程;
下层,操作系统内核将这些线程映射到硬件处理器上。
Executor框架的结构
Executor框架包含的主要的类与接口如图所示:

Executor框架主要由3大部分组成如下:
(1)任务
被执行任务需要实现的接口:Runnable接口或Callable接口。
(2)任务的执行
任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。
(3)异步计算的结果
接口Future和实现Future接口的FutureTask类。
Runnable接口和Callable接口
Runnable接口和Callable接口的实现类,都可以被Executor执行。它们之间的区别是Runnable不会返回结果,而Callable可以返回结果。可以使用工厂类Executors来把Runnable包装成一个Callable。
- 把一个Runnable包装成一个Callable的API:
public static Callable<Object> callable(Runnable task)
- 把一个Runnable和一个待返回的结果包装成一个Callable的API:
public static <T> Callable<T> callable(Runnable task, T result)
ExecutorService
ExecutorService定义了线程池需要实现的接口,其生命生命周期包括3种状态:运行、关闭、终止。创建后便进入运行状态,当调用shutdown()方法时,便进入关闭状态,此时意味着ExecutorService不再接受新的任务,但它还在执行已经提交了的任务。当所有已经提交了的任务执行完后,就变成终止状态。
ThreadPoolExecutor
ThreadPoolExecutor构造函数如下:
public ThreadPoolExecutor(int corePoolSize, int maxinumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
参数详细说明如下所示:
参数名 | 作用 |
---|---|
corePoolSize | 线程池所保存的核心线程数,线程池启动后默认是空的,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的线程能够执行任务也会创建线程,等到需要执行的任务大于线程核心数后就不再创建。prestartAllCoreThreads方法可以在线程池启动后立即启动所有核心线程以等待任务 |
maxinumPoolSize | 线程池允许创建的最大线程数。如果队列满了,并且已创建的线程小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没有效果。 |
keepAliveTime | 当前线程池线程总数大于核心线程数时,终止多于的空闲线程的时间。如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。 |
unit | keepAliveTime参数的时间单元,可选值有天、小时、分钟、毫秒、微秒和纳秒。 |
workQueue | 任务队列,如果当前线程池达到核心线程数corePoolSize,且当前所有线程都处于活动状态时,则将新加入的任务放到此队列中 |
threadFactory | 线程工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。用户可以定制线程的创建过程,通常不需要设置。 |
handler | 拒绝策略,当线程池与workQueue队列都满了的情况下,说明线程池处于饱和状 态,那么必须采取一种策略处理提交的新任务。 |
-
workQueue有下列几个常用实现:
(1)ArrayBlockingQueue:基于数组结构的有界队列,此队列按FIFO原则对任务进行排序,如果队列满了还有任务进来,则调用决绝策略。
(2)LinkedBlockingQueue:基于链表结构的无界队列,此队列按照FIFO原则对任务进行排序。因为是无界的,根本不会满,所以采用此队列后线程池将忽略最大线程数和拒绝策略;
(3)SynchronousQueue:不存元素的阻塞队列,直接将任务提交给线程而不是将它加入到队列。每个插入的操作必须等到另一个调用移除的操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue。如果新任务到来后线程池没有任务可用线程处理,则调用拒绝策略。
(4)PriorityBlockingQueue:具有优先级的队列的无限阻塞队列,可以自定义优先级,默认是按自然排序。 -
拒绝策略有如下几种:
(1)AbortPolicy:拒绝任务,抛出RejectedExecutionException异常,线程池的默认策略。
(2)CallerRunsPolicy:拒绝新任务进入,如果该线程池还没有被关闭,那么将这个新任务执行在调用线程中。
(3)DiscardOldestPolicy:如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。这样的结果是最后加入的任务反而可能被执行到,先前加入的都被抛弃了。
(4)DiscardPolicy:加不进的任务都被抛弃,同时没有异常抛出。
FixedThreadPool详解
FixedThreadPool被称为可重用固定线程数的线程池。下面是FixedThreadPool的源代码实现:
public static ExecutorService newFixedThreadPool(int nThreads){
ruturn new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS
new LinkedBlockingQueue<Runnable>());
}
设置它的corePoolSize和maxinumPoolSize值都是nThreads,并且设置keepAliveTime参数为0毫秒,最后设置无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。该线程池中就含有了固定个数的线程,并且能够容纳无限个任务。
FixedThreadPool的execute()的运行示意图如下所示:

使用无界队列LinkedBlockingQueue作为工作队列会对线程池带来如下影响:
1> 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中 的线程数不会超过corePoolSize。
2> 拒绝策略不会生效。
适用场景
FixedThreadPool适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场 景,它适用于负载比较重的服务器。
SingleThreadExecutor详解
SingleThreadExecutor是使用单个worker线程的Executor。下面是SingleThreadExecutor的源代码实现:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService (
new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
); }
SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与 FixedThreadPool相同。运行示意图如下:

适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。
CachedThreadPool详解
CachedThreadPool是一个会根据需要创建新线程的线程池。下面是创建CachedThread- Pool的源代码:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS, new
SynchronousQueue<Runnable>());
}
CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为 Integer.MAX_VALUE,即maximumPool是无界的。这里把keepAliveTime设置为60L,意味着 CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但 CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于 maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下, CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
运行示意图如下:

1)首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程 正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方 法执行完成;否则执行下面的步骤2。
2)当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失 败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
3)在步骤2中新创建的线程将任务执行完后,会执行 SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于 空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
适用场景
SingleThreadExecutor适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多 个线程是活动的应用场景。
ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。ScheduledThreadPoolExecutor的功能与Timer类似,但 ScheduledThreadPoolExecutor功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

ScheduledThreadPoolExecutor的执行主要分为两大部分。
1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了 RunnableScheduledFutur接口的ScheduledFutureTask。
2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

下面是对这4个步骤的说明:
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
ScheduledThreadPoolExecutor
适用场景
ScheduledThreadPoolExecutor适用于需要多个后台线程执行周期任务,同时为了满足资源 管理的需求而需要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor
适用场景
SingleThreadScheduledExecutor适用于需要单个后台线程执行周期任务,同时需要保证顺 序地执行各个任务的应用场景。
FutureTask
FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给 Executor执行,也可以由调用线程直接执行FutureTask.run()。
根据FutureTask.run()方法被执行 的时机,FutureTask可以处于下面3种状态,如下图所示:

1)未启动
当创建一 个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。
2)已启动
FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。
3)已完成
FutureTask.run()方法执行完后正常结束,或被取消,或 执行时抛出异常而异常结束,FutureTask处于已完成状态。
FutureTask调用get方法和cancel方法的执行示意图:

FutureTask实现
FutureTask的实现基于AbstractQueuedSynchronizer(以下简称为AQS),AQS是一个同步框架,它提供通 用机制来原子性管理同步状态、阻塞和唤醒线程,以及维护被阻塞线程的队列。
基于“复合优先于继承”的原则,FutureTask声明了一个内部私有的继承于AQS的子类 Sync,对FutureTask所有公有方法的调用都会委托给这个内部子类。AQS被作为“模板方法模式”的基础类提供给FutureTask的内部子类Sync,这个内部子类只 需要实现状态检查和状态更新的方法即可,这些方法将控制FutureTask的获取和释放操作。具 体来说,Sync实现了AQS的tryAcquireShared(int)方法和tryReleaseShared(int)方法,Sync通过这 两个方法来检查和更新同步状态。
FutureTask的设计示意图如下图所示:

-
get()/get(long timeout,TimeUnit unit)方法调用
这个操作对应acquire,阻塞调用线程,除非/直到AQS的状态允许这个线程继续执行。其具体流程为:
1> 调用AQS.acquireSharedInterruptibly(int arg)方法,这个方法首先会回调在子类Sync中实 现的tryAcquireShared()方法来判断acquire操作是否可以成功。acquire操作可以成功的条件为: state为执行完成状态RAN或已取消状态CANCELLED,且runner不为null。
2> 如果成功则get()方法立即返回。如果失败则到线程等待队列中去等待其他线程执行 release操作。
3> 当其他线程执行release操作(比如FutureTask.run()或FutureTask.cancel(…))唤醒当前线 程后,当前线程再次执行tryAcquireShared()将返回正值1,当前线程将离开线程等待队列并唤 醒它的后继线程。
4> 最后返回计算的结果或抛出异常。 -
run() 或 cancel()方法调用
这个操作对应release,改变AQS的状态,改变后的状态可允许一个或多个阻塞线程被解除阻塞。FutureTask.run()的执行过程如下:
1> 执行在构造函数中指定的任务(Callable.call())。
2> 以原子方式来更新同步状态(调用AQS.compareAndSetState(int expect,int update),设置 state为执行完成状态RAN。如果这个原子操作成功,就设置代表计算结果的变量result的值为 Callable.call()的返回值,然后调用AQS.releaseShared(int arg)。
3> AQS.releaseShared(int arg)首先会回调在子类Sync中实现的tryReleaseShared(arg)来执 行release操作(设置运行任务的线程runner为null,然会返回true);AQS.releaseShared(int arg), 然后唤醒线程等待队列中的第一个线程。
4> 调用FutureTask.done()。
参考资料:
[1] Java并发编程的艺术,方腾飞,魏鹏,程晓明
[2] Android开发进阶--从小工到专家,何红辉
网友评论