一、线程池简介
Java并发编程中,我们常常使用以下两种方法来开启一个新的线程来并发完成某些任务:
- 写一个Thread类的子类,并重写run方法,再在主线程中调用子类的start方法,开启线程:
public class MyThread extends Thread {
...
@Override
public void run() {
doMyTask();
}
...
public static void main(String[] args) {
Thread t1 = new MyTread("ThreadName");
t1.start(); //启动线程
}
}
- 写一个类实现Runnable接口,并实现run方法,主线程中创建一个Thread实例,在构造方法中传入Runnable实现类的实例,最后调用start方法,启动线程:
public class MyRunnable implements Runnable {
...
public void run() {
doMyTask();
}
...
public static void main(String[] args) {
MyRunnable run1 = new MyRunable();
Thread t2 = new Thread(run1); // 构造方法中传入自定义的Runnable
t2.start();
}
}
由于Java的单继承机制,方法1导致MyThread类的父类只能是Thread,不方便扩展,同时任务只能执行一次。方法2通过实现接口来构造一个可重复被线程执行的Runnable实现类,实现方法更加灵活。
但这两种方法都不可避免地在主线程中显式地创建了线程对象,并在run方法执行完成后,会自动销毁线程(注意:是线程,不是线程对象)。然而线程的创建和销毁是非常消耗资源的,在实际的开发中,如果仅仅为了执行一次任务就创建一个线程,用完即销毁,无疑是一种浪费。为此,线程池的技术应运而生。
类似数据库的连接池,程序每次需要接入数据库时,就从连接池中获取一个空闲连接,使用完成后再归还给连接池。线程池管理并提供程序并发执行任务所需的线程资源。本文主要介绍有关Java线程池的基本原理以及常用的线程池对象实例。
二、线程池的核心类——ThreadPoolExecutor类
通过如下方法,可以方便地构建一个固定大小的线程池实例。调用execute方法,传入(提交)Runnable对象执行任务。
public class MyRunnable implements Runnable {
...
public void run() {
doMyTask();
}
...
public static void main(String[] args) {
MyRunnable run1 = new MyRunable();
int threadPoolSize = 5; // 固定大小线程池的容量
ExecutorService exec = Executors.newFixedThreadPool(threadPoolSize);
exec.execute(run1); // 执行任务
}
}
进入Executors类中,可以看到返回线程池实例的静态方法newFixedThreadPool调用的是java.util.concurrent.ThreadPoolExecutor类的构造方法,并返回一个ExecutorService实例。ThreadPoolExecutor类是java线程池最核心的类,通过传入其构造方法的参数不同,JDK实现了多种类型线程池。虽然ThreadPoolExecutor提供了四个构造方法,但调用的都是参数最多的那一个:
public ThreadPoolExecutor(int corePoolSize, // 核心线程数量
int maximumPoolSize, // 最大线程数量(核心+工作)
long keepAliveTime, // 空闲工作线程的存活时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 工作队列
ThreadFactory threadFactory, // 线程的创建工厂
RejectedExecutionHandler handler // 拒绝策略)
2.1 核心线程与工作线程
线程池中的线程分为核心线程和工作线程。corePoolSize为允许的最大核心线程数量,maximumPoolSize-corePoolSize为工作线程最大数量。核心线程与工作线程的区别就像一个公司的正式员工和临时工,公司会一直“养”着正式员工工作,当工作太多正式员工不够时,才会聘请临时工。
大部分线程池都会维护一定数量的核心线程(Cached线程池里都是工作线程,后面会介绍),当所有核心线程都忙碌,且工作队列已满,若再有任务提交的话,就会在maximumPoolSize允许的范围内,创建工作线程来执行任务(任务提交的具体执行步骤后面会提)。若有工作线程空闲了,没事做,并超过了keepAliveTime(单位TimeUnit)时间,则会自动销毁,就像公司会把多余的临时工开除一样。
通过设置类中的allowCoreThreadTimeOut变量为true,也可以令核心线程空闲超过keepAliveTime的时间后也自动销毁。
2.2工作队列workQueue
当调用线程池的execute方法传入一个Runnable对象时(本文中称“提交一个任务”),并不会立即执行该任务,而是将任务放置进一个阻塞队列(BlockingQueue)中,线程池中的线程会从队列中取出任务,执行,再取新任务。这是一种生产者-消费者模式,目的是将任务的提交和执行解耦开来。基本的工作队列有三种:无界队列,有界队列和同步移交(Synchronous Handoff)。工作队列一般要与线程池的大小搭配选择。
LinkedBlockingQueue类在Executors中用作newFixedThreadPool和newSingleThreadPool的工作队列。这两个工厂方法调用LinkedBlockingQueue()来创建一个阻塞队列实例。从LinkedBlockingQueue类的构造方法中可以看出,队列容量为Integer.MAX_VALUE
,近乎“无穷大”,此时LinkedBlockingQueue类是用作无界队列。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity; // 设置队列容量
last = head = new Node<E>(null);
}
ArrayBlockingQueue、有界LinkedBlockingQueue以及PriorityBlockingQueue是常用的有界队列,能够避免任务的无限制提交导致的OOM(OutOfMemory异常)。ArrayBlockingQueue和LinkedBlockingQueue是FIFO队列(公平),消费者线程按照任务的阻塞顺序来从队列中取出任务执行。PriorityBlockingQueue中的元素具备优先级顺序,默认根据自然顺序排列,或者在构造方法中传入的Comparator来定义。
SynchronousQueue是一种同步移交队列。这种队列非常特殊,内部并不维护一个实际的队列,而是将生产者线程提供的任务直接转交给消费者线程进行执行。这个队列用于构造newCachedThreadPool,这是一个“无限大”的线程池,提交的任务会“放”进SychronousQueue队列中,如果有空闲线程,则直接转交并执行,否则新建一个线程执行刚提交的任务。当然也可以在定制的ThreadPoolExecutor中使用同步移交,但是若线程池已满,则任务会被拒绝策略处理。
举个例子描述下三种阻塞队列的不同。有AB两组服务生负责洗盘子,A组负责把盘子冲水洗干净,B组负责把盘子擦干水。A组把盘子洗干净后一个个放在盘子架上,B组从盘子架上取出盘子擦干净。这个盘子架的容量是否有限,就说明是无界队列或者有界队列。同步移交不同之处在于,A组洗完盘子后,就看看有没有B组空闲的服务员,有的话就把盘子直接交给他让他擦干净,没有的话就拿在手上等待是否有空闲的B组服务员,一段时间后还是没有空闲服务员,于是就把盘子丢进垃圾桶里。整个过程没有将盘子摆放在盘子架上或者取出的操作。
2.3 拒绝策略
拒绝策略,又称饱和策略。故名思意,是指线程池达到饱和状态后,如果再有新任务提交,此时的处理策略。线程池达到饱和状态,是指线程数量最大,并全部处于忙碌状态,且工作队列被添满的状态。所有的拒绝策略类都实现了RejectedExecutionHandler接口,这个接口只有一个方法rejectedExecution
,用来执行策略。常用的拒绝策略有如下几种:
AbortPolicy是默认的拒绝策略。该策略会抛出一个非检查异常,以供调用者捕获,并进一步处理。
// ThreadPoolExecutor的源码
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
...
// AbortPolicy类的定义
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
// 直接抛出一个unchecked异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
DiscardPolicy策略会“一声不吭”地抛弃提交的新任务,从源码中可以看出,实现的rejectedExecution方法为空,不做任何处理。
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
/**
* Does nothing, which has the effect of discarding task r.
*
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy策略将队列中“最老”的任务抛弃,并重新提交新的任务。从源码中可以看出,即poll工作队列队列头元素,然后再次提交新任务。如果在工作队列具备优先级,则抛弃优先级最高的任务。
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll(); // 抛弃队列头元素
e.execute(r); // 重新提交新的任务
}
}
}
CallerRunsPolicy策略的思想是:谁提交的谁运行。当A线程提交一个新任务时,发现线程池已经饱和,则转向执行拒绝策略的rejectedExecution方法。CallerRunsPolicy的rejectedExecution方法首先判断了线程池是否关闭,然后直接调用run方法执行任务。此时执行任务的是A线程,在处理完成之前,A线程不能再提交新的任务。这样的设计能够降低生产者线程任务提交的速度,给消费者线程更多的时间处理任务。
图2.1 rejectedExecution调用层级public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) { // 判断线程池是否关闭
r.run(); // 直接执行任务
}
}
}
三、执行任务的核心——execute方法
3.1 ThreadPoolExecutor的超类结构
ThreadPoolExecutor类使用生产者-消费者的设计模式,将任务的提交和执行解耦开。ThreadPoolExecutor类的超类结构如下所示。
图3.1 ThreadPoolExecutor超类结构层级该类继承自AbstractExecutorService抽象类(以下简称AES),AES类由实现了ExecutorService接口,该接口又继承自Executor。其中Executor接口中只有一个execute(Runnable command)
方法。
ExecutorService接口继承自Executor接口,扩展了提交Callable任务的submit方法、批量提交任务的invoke*方法以及5个管理线程池生命周期的方法。
图3.2 ExecutorService接口的内容submit方法与execute方法是java线程池中两个常用的单任务提交方法,其中submit方法在AbstractExecutorService类中有相关的实现,从源码中可以看出,submit方法对提交的对象进行了一定的封装之后,最终还是调用execute方法执行任务。
图 3.3 AbstractExecutorService中submit方法的实现而有关execute方法的实现,都是在AbstractExecutorService类的子类或者ThreadPoolExecutor类的子类中进行实现的。由此可见,这个方法是整个线程池执行任务的核心方法。
图3.4 execute方法的实现3.2 execute方法原理
先大致描述下Java线程池的一个任务的提交流程
图3.5 Java线程池提交任务本文在拒绝策略那一节描述了一个任务会在什么情况下走到被拒绝策略执行的这一步。由上图可知,主要要满足3个条件:1、核心线程池已满。2、工作队列已满。3、线程数量达到最大(不能再添加线程了)。提交任务后,程序按照1、2、3的顺序来判断条件,只要其中一个条件不满足,则该任务则有望被成功执行。之前学习时对为什么先判断工作队列是否已满,再判断线程数量是否最大这样的顺序有过困惑,后来想想应该是为了优先让核心线程完成工作,减少创建和销毁工作线程的消耗。
ThreadPoolExecutor类对executor方法的实现如下:
public void execute(Runnable command) {
if (command == null) // 非空判断
throw new NullPointerException();
int c = ctl.get(); // 线程池状态,一个AutomicInteger变量
// 1、判断线程数量是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2、判断线程池是否是运行状态,并尝试将任务添加进工作队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新检查线程池状态,如果非运行状态,则移除之前的任务
if (! isRunning(recheck) && remove(command))
// 执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3、判断是否可以添加工作线程来执行任务
else if (!addWorker(command, false))
reject(command);
}
代码可以分成三个部分,即对上述三个条件进行判断(从该方法的注释也可以看出,参考ThreadPoolExecutor类源码1335~1354行)。
首先获取线程池状态,判断当前线程数量是否小于核心线程数量,若满足则使用addWorker方法添加核心线程。addWorker方法的第二个参数为true表示创建一个核心线程。若addWorker方法执行成功,则退出execute方法。
若以上核心线程数量已满,或者addWorker方法失败,则再次获取线程池状态,并在第二部分代码检查线程池是否为运行状态,并尝试将任务添加进工作队列。工作队列提供offer和add方法来添加任务。这两个方法是BlockedQueue接口的方法,区别在于前者返回true或false来表示执行成功与否,后者成功返回true,失败则抛异常。若成功添加任务进工作队列,还需根据二次判断线程池状态(参考1344~1349行注释可知,这是为了并发考虑的)的结果,来决定是回滚这个这个任务添加过程,即移除(remove方法)刚刚添加的任务,并交给拒绝策略处理(reject方法),还是addWorker(null, false)
(这里不太明白,待以后完善)。
如果工作队列已满,则尝试使用addWorker方法添加工作线程(第二个参数传false)。如果失败,就只能使用reject方法将该任务交给拒绝策略处理。
execute的关键方法是addWorker方法。就像方法名所述,新建的线程被包装成一个Worker类并添加进一个Set集合中来执行任务。Worker是ThreadPoolExecutor类的一个final内部类。代码片段如下:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
Worker类有点意思,它继承自AbstractQueuedSynchronizer类(AQS)(《Java并发核心类——AbstractQueuedSynchronizer类》)。它的这个“爹”很有背景,是FutureTask类、ReentrantLock类、信号量Semaphore类以及闭锁CountDownLatch类等很多并发工具类的底层实现,可以说java.util.concurrent包下绝大多数类都与AQS有一腿。在此就不继续深入了。
ThreadPoolExecutor类直接使用Worker对象来执行任务。Worker类实现了Runnable方法,并且封装了一个Thread对象,一个Thread对象对应一个Worker,如果由于抛出Exception导致线程死亡,对应的Worker对象同时也会从线程池中清除。Worker的run方法调用runWorker方法,该方法会先将该Woker对象lock,然后进入一个while循环,调用有阻塞功能的getTask方法不断从阻塞队列中取任务,并进行执行,直到发生异常或者getTask返回null。实际上调用Runnable任务的run方法前后,还调用了beforeExecute和afterExecute方法,并进行了比较复杂的异常处理,有兴趣可以参考源码,满精巧的。
至此,在回顾下本节开头所述的任务提交流程,应该会更加清晰、易于理解。
四、常见的几种线程池
平时开发中一般通过调用java.util.concurrent.Executors类的静态工厂方法,根据需求创建所需的线程池实例。常用的有以下几种:
4.1 固定容量线程池
通过newFixedThreadPool(int nThreads)
方法或newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
创建固定容量线程池。传入的int参数即为核心线程数量又是最大线程数。线程工厂可以指定。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads, // 核心线程数量与最大线程数量一致
0L, TimeUnit.MILLISECONDS, // 线程永不“过期”,一致存活
new LinkedBlockingQueue<Runnable>(), // 工作队列为无界队列
threadFactory); // 指定线程工厂,若这个参数没有,则使用默认线程工厂
}
由于最大线程数量与核心线程数一样,因此固定线程池中所有的线程都是核心线程,且在停止线程池之前一直存活。当线程数量最大,且全部忙碌时,任务被添加进工作队列。由于是无界队列,则任务可以“无限制”添加。
4.2 Cached线程池
Cached线程池使用CachedThreadPool方法创建。可选是否指定线程工厂
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, // 核心线程数0,
60L, TimeUnit.SECONDS, // 空闲线程存活时间60s
new SynchronousQueue<Runnable>()); // 同步提交
}
Cached线程池核心线程数为0,线程最大数量“无限”,因此Cached线程池中都是工作线程,且空闲线程存活时间为60s。阻塞队列为同步提交队列,即任务提交必须有一个空闲线程来接才能成功。由于Cached线程池“无限大”,因此任务也可以无限制提交,并且不需要排队执行。
4.3 单线程线程池
这个线程池通过newSingleThreadExecutor方法创建,核心线程数与最大线程数都指定为1,并搭配无界工作队列使用。因此提交的任务一FIFO的顺序一个个执行。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1, // 核心线程数与最大线程数为1
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())); // 无界的工作队列
}
4.4 定时线程池
顾名思义,定时线程池可以规定任务何时执行。使用newScheduledThreadPool方法创建,该方法返回一个ScheduledThreadPoolExecutor类实例。这个类是ThreadPoolExecutor类的子类,并实现了ScheduledExecutorService接口,这个接口呢又是ExecutorService接口,增加了一系列的schedule**定时调度方法。
// Executors工厂方法
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
// ScheduledThreadPoolExecutor类构造方法之一
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
定时线程池的通过传入参数指定核心线程池数量,线程池最大数量“无限”,搭配一个DelayedWorkQueue的阻塞队列使用,这个阻塞队列类是ScheduledThreadPoolExecutor类的静态内部类。
以上线程池的拒绝策略均为默认的AbortPolicy策略,但一般情况下都走不到被拒绝策略处理这一步,毕竟他们要不是线程数量“无限大”,要不就是工作队列“无限大”提交任务。
4.5 线程池线程数量的配置
线程数量的配置要根据CPU的核数以及处理任务的类型来选择。若CPU有N个核(Runtime.getRuntime().availableProcessors()
获得可用的CPU核数),IO密集型任务可以配置2*N的线程数量,CPU密集型任务可以配置N+1的线程数量。若线程池实例是通过Executors工厂方法创建,则可以强转成ThreadPoolExecutor类型,然后使用下面这些方法重新配置线程池。
五、总结
使用线程池进行并发程序开发有诸多好处:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
本文粗略地介绍了有关Java线程池中的相关知识。首先介绍线程池核心类ThreadPoolExecutor类具备的功能组件,然后简要介绍了线程池任务提交方法execute方法的工作流程,最后介绍了常用的几种线程池实例的配置详细。这些知识算是对自已学习并发编程的一点小总结吧。
文中多数使用“无限”这个词,并使用双引号引起,是因为这个词表示Integer.MAX_VALUE
,为十进制21_4748_3647。虽然这个数已经非常大了,接近于无限,但却并不是无限,故用引号引起。
六、参考
- 《Java并发编程实战》
- 深入理解Java之线程池
- 聊聊并发(七)——Java中的阻塞队列
- Java线程池详解 我室友ruheng博客,强烈推荐!
网友评论