前言
多线程的软件设计方案确实可以最大限度地发挥现代多核处理器的计算能力,提高生产系列的吞吐量和性能。但是,若不加控制和管理的随意使用线程,对系统的性能反而会产生不利的影响。最容易想到的后果就是线程过多导致CPU忙于切换而无力执行其中的工作。
为了避免系统频繁地创建和销毁线程,我们可以让创建的线程进行复用。如果有同学有过数据库开发的经验,对数据库连接池
这个概念应该不会陌生。为了避免每次数据库查询都重新建立和销毁数据库连接,我们可以使用数据库连接池维护一些数据库连接,使其长期保持在一个激活的状态。当系统需要使用数据库时,并不是创建一个新的连接,而是从连接池中获得一个可用的连接即可。反之,当需要关闭连接时,并不真的把连接关闭,而是将这个连接“还”给连接池即可。通过此方法,通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他的工作。
为了更好地控制多线程,JDK提供了一套Executor框架。核心成员如下图所示
java-util-concurrent-CoreClass.png以上成员均在java.util.concurrent包中,是JDK并发包的核心类。其中ThreadPoolExecutor表示一个线程池。Executors类则扮演线程池工厂角色,通过Executors可以取得一个具有特定功能的线程池。从UML图中亦可知,ThreadPoolExecutor实现了Executor接口,因此通过这个接口,任何Runnable对象都可以被ThreadPoolExecutor线程池调度。
Java提供了ExecutorService的三种实现:
- ThraedPoolExecutor:标准线程池
- ScheduledThreadPoolExecutor:支持延时任务的线程池
- ForkJoinPool:类似于
ThraedPoolExecutor
,但是使用work-stealing模式,其会为线程池中的每个线程创建一个队列,从而使用work-stealing(任务窃取)算法使得线程可以从其他线程队列里窃取任务来执行。即如果自己的任务处理完成了,则可以去忙碌的工作线程那里去窃取任务执行。
在本文,将会主要以ThraedPoolExecutor
作为讲解例子。
线程池的基本大小(Core POOL SIZE) ,较大大小(Maximum pool size) 以及存活时间等因素共同负责线程的创建和销毁 。 基本大小也就是线程池的目标大小,即在没有任务执行是的线程池的大小,并且只有在工作队列满了的情况下才会创建超过这个数量的线程。线程池的较大大小表示可同时活动的线程数量的上限,如并且果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。
newFixedThreadPool
工厂方法将线程池的基本大小和较大大小设置为参数中的执行值,而且创建的线程池不会超时。newCachedThreadPool
工厂方法将线程池的较大大小设置为Integer.MAX_VALUE,而将其基本大小设置为0,并将超时时间设置为1分钟,这种方法创建的线程池可以无限扩展,并且当需求降低时会自动收缩,其他形式的线程池可以通过显示的ThreadPoolExecutor构造函数来沟通。
Executor框架提供了各种类型的线程池,主要有以下工厂方法。
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上方法返回了具有不同工作特性的线程池,具体说明如下:
- newFixedThreadPool返回一个固定数量的线程池。当一个新任务提交时,如果有空闲线程,则执行。否则新任务暂存在一个任务队列中,待有空闲时,便处理在任务队列中的任务。
- newSingleThreadExecutor返回一个线程的线程池。当多余一个新任务提交时,会暂存在一个任务队列中,待有空闲时,按先入先出的顺序处理在任务队列中的任务。
- newCachedThreadPool返回一个可根据实际情况调整线程数量的线程池,线程数量不确定,若有空闲,则会有限复用线程。否则创建新线程处理任务。所有线程在当前任务执行完后,将返回线程池待复用。
-
newSingleThreadScheduledExecutor返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService在Executor接口之上扩展了在给定时间执行某任务的功能。如果在某个固定的延时之后执行,或周期性执行某个任务。可以用这个工厂。
newScheduledThreadPool,返回一个ScheduledExecutorService对象,但该线程可以指定线程数量。
固定大小的线程池
public class ExecutorExample {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(System.currentTimeMillis()+":Thread ID:"+Thread.currentThread().getId() );
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] a ){
MyTask myTask = new MyTask();
//创建固定大小线程池
ExecutorService es = Executors.newFixedThreadPool(5);
for (int i =0;i<10;i++){
es.submit(myTask);
}
es.shutdown();
//总的来说就是5个线程去执行10个任务,因此你能看到每个ID都被打印了2遍。
}
}
任务
执行单位
在此之前,我们得先了解线程池中最基本的执行单位——Runable和Callable。
Executor使用Runnable作为其基本的任务表示形式。Runnable是有一种很大局限的抽象,虽然run能写入到日志或者将结果放入某个共享的数据结构,但它不能返回一个值或者抛出一个受检查的异常。那么Callble则可以弥补这些缺陷。
Runnable和Callable描述的都是抽象的计算任务。这些任务通常都是有范围的,即都有一个明确的起始点,并且最终会结束。Executor执行的任务有4个生命周期阶段:
- 创建
- 提交
- 开始
- 完成
由于有些任务可能要执行很长时间,因此通常能够希望取消这些任务。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能够响应中断,才能取消。取消一个已完成的任务不会有任何影响。
Future则表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消的任务等。在Future规范中包含的隐含含义是,任务的生命周期只能前进,不能后退,就像ExecuteService的生命周期一样,当某个任务完成后,它就永远停留在“完成”状态上。
Future.get方法的行为取决于任务的状态(尚未开始、已经运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将异常封装为ExecutionException并重新抛出。如果任务被取消了,那么get将抛出CancellationException。如果get抛出ExecutionException,那么可以通过getCause来获得被封装的初始异常。
计划任务
newScheduledThreadPool返回一个ScheduledExecutorService对象,可以根据实际对线程进行调度。
//在给定的时间,对任务进行一次调度
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
//用于对任务进行周期性调度,任务调度的频率是一定的,它以上一个任务开始执行时间为起点,之后的period时间后调度下一次任务。如果任务的执行时间大于调度时间,那么任务就会在上一个任务结束后,立即被调用。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
//对任务进行周期性调度,在上一个任务结束后,再经过delay长的时间进行任务调度。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
ScheduledExecutorService不会立即安排执行任务,它类似Linux中的crontab工具。如果任务遇到异常,则后续的所有子任务都会停止执行。因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。
public class ScheduledExecutorExample {
public static void main(String[] a){
ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
//如果前面的任务没有完成,则调度也不会启动
ses.scheduleAtFixedRate(new Runnable(){
@Override
public void run() {
long s = System.currentTimeMillis();
try {
System.out.println(Thread.currentThread().getId() + " 号线程开始工作...");
//模拟处理事务
Thread.sleep(1000);
long e = System.currentTimeMillis();
System.out.println(Thread.currentThread().getId() + " 号线程结束工作...用时:" +( (e -s)/1000) +"s");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},0,2, TimeUnit.SECONDS);
}
}
核心线程池的内部实现
对于核心的几个线程池,无论是newFixedThreadPool()、newSingleThreadExecutor()还是newCacheThreadPool方法,虽然看起来创建的线程具有完全不同的功能特点,但其内部均使用了ThreadPoolExecutor实现。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
由以上线程池的实现可以看到,它们都只是ThreadPoolExecutor类的封装。我们看下ThreadPoolExecutor最重要的构造函数:
public ThreadPoolExecutor(
//指定了线程池中的线程数量
int corePoolSize,
//指定了线程池中的最大线程数量
int maximumPoolSize,
//当前线程池数量超过corePoolSize时,多余的空闲线程的存活时间,即多次时间内会被销毁。
long keepAliveTime,
//keepAliveTime的单位
TimeUnit unit,
//任务队列,被提交但尚未被执行的任务。
BlockingQueue<Runnable> workQueue,
//线程工厂,用于创建线程,一般用默认的即可
ThreadFactory threadFactory,
//拒绝策略,当任务太多来不及处理,如何拒绝任务。
RejectedExecutionHandler handler)
在这里面,大多数的参数都是较好理解的,但是workQueue和handler需要进行详细说明。
WorkQueue
workQueue指提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue。
-
直接提交的队列:
该功能由synchronousQueue对象提供,synchronousQueue对象是一个特殊的BlockingQueue。synchronousQueue没有容量,每一个插入操作都要等待一个响应的删除操作,反之每一个删除操作都要等待对应的插入操作。如果使用synchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲线程,则尝试创建线程,如果线程数量已经达到了最大值,则执行拒绝策略,因此,使用synchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。
-
有界的任务队列:
有界任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue构造函数必须带有一个容量参数,表示队列的最大容量。
public ArrayBlockingQueue(int capacity)
当使用有界任务队列时,若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程。若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的线程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见有界队列仅当在任务队列装满后,才可能将线程数量提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize。
-
无界的任务队列:
无界队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,无界队列的任务队列不存在任务入队失败的情况。若有新任务需要执行时,如果线程池的实际线程数量小于corePoolSize,则会优先创建线程执行。但当系统的线程数量达到corePoolSize后就不再创建了,这里和有界任务队列是有明显区别的。若后续还有新任务加入,而又没有空闲线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,知道耗尽系统内存。
-
优先任务队列:
带有优先级别的队列,它通过PriorityBlokingQueue实现,可以控制任务执行的优先顺序。它是一个特殊的无界队列。无论是ArrayBlockingQueue还是LinkedBlockingQueue实现的队列,都是按照先进先出的算法处理任务,而PriorityBlokingQueue根据任务自身优先级顺序先后执行,在确保系统性能同时,也能很好的质量保证(总是确保高优先级的任务优先执行)。
开发人员以免有时会将线程池的基本大小设置为零,从而最终销毁工作者线程以免阻碍JVM的退出。然而,如果在线程池中没有使用SynchronousQueue作为其工作队列(例如在newCachedThreadPool中就是如此,它的核心池设为0,但它的任务队列使用的是SynchronousQueue),那么这种方式将产生一些奇怪的行为。如果线程池中的线程数量等于线程池的基本大小,那么仅当在工作队列已满的情况下ThreadPoolExecutor才会创建新的线程。因此,如果线程池的基本大小为零并且其工作队列有一定的容量,那么当把任务提交给该线程池时,只有当线程池的工作队列被填满后,才会开始执行任务,而这种行为通常不是我们所希望的。在Java6中,可以通过allowCoreThreadTimeOut来使线程池中的所有线程超时。对于一个大小有限的线程池并且在该线程池中包含了一个工作队列,如果希望 **这个线程池在没有任务的情况下能销毁所有的线程 ** ,那么可以启用这个特性并将基本大小设置为零。
调度逻辑可以总结为
ThreadPoolExecutorJobPlan.png饱和策略
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK内置的拒绝策略如下:
- AbortPolicy : 默认策略。直接抛出异常
DiscardExecutionException
,调用者可以考虑捕获这个异常,编写自己的处理代码。 - CallerRunsPolicy : 该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者。
- DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
- DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下。
public interface RejectedExecutionHandler {
/**
* @param r 请求执行的任务
* @param executor 当前线程池
**/
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
~~~
那么接下来看一个简单地演示了自定义线程池和拒绝策略的使用:
```java
public class RejectThreadPoolDemo {
public static class MyTask implements Runnable {
public void run() {
System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
try {
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws InterruptedException {
MyTask task = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(10), new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println(r.toString() + "is discard");
}
});
for (int i = 0; i < Integer.MAX_VALUE; i++) {
es.submit(task);
Thread.sleep(10);
}
}
}
这个例子定义一个线程池,里面有5个常驻线程,并且最大线程数也是5个。这和固定大小的线程池是一样的。但是它却拥有一个只有10个容量的等待队列。那么必定会有在大量的任务被直接丢弃。
自定义线程创建:ThreadFactory
线程池中的线程从何而来?来自ThreadFactory。
ThreadFactory是一个接口,它只有一个方法,用来创建线程:
Thread newThread(Runnable r);
当线程池需要新建线程时,就会调用这个方法。
自定义线程池可以帮我们做不少事情。我们可以跟踪线程池在何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息,甚至可以任性地将所有的线程设置为守护线程。总之,使用自定义线程可以让我们更加自由地设置池中所有的线程的状态。下面的案例使用自定义ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程,这样,当主线程退出后,将会强制销毁线程池。
public class ThreadFactoryExample {
public static class MyTask implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " coming...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] a ) throws InterruptedException {
MyTask myTask = new MyTask();
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10)
, new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setName("T " + t.getId() + "_" +System.currentTimeMillis());
t.setDaemon(true);
System.out.println("Create a Thread Name is : "+t.getName());
return t;
}
});
for (int i=0;i<10;i++){
es.submit(myTask);
}
Thread.sleep(2000);
}
}
扩展线程池
ThreadPoolExecutor是可扩展的,它提供了几个“钩子”方法可以在子类化中改写:beforeExecute、afterExecute和terminated,这些方法可以用于扩展ThreadPoolExecutor的行为。
在执行任务的线程中将调用beforeExecute和afterExecute等方法,在这些方法中还可以添加日志、计时、监视或统计信息收集的功能。无论任务是从run中正常返回,还是抛出一个异常而返回,afterExecute都会被调用。(如果任务在完成后带有一个Error,那么就不会调用afterExecute。)如果beforeExecute抛出了一个RuntimeException,那么任务将不会被执行,并且afterExecute也不会被调用。
栗子:
public class ExtThreadPool {
public static class MyTask implements Runnable {
private String name;
public MyTask(String name) {
this.name = name;
}
@Override
public void run() {
try {
System.out.println("现在向控制台走来的是线程" + Thread.currentThread().getId() + "号" + "名字为:" + name);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] a) throws InterruptedException {
ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()) {
/**
* 创建ThreadPoolExecutor的匿名内部类的子类
*
* @param t
* the thread that will run task 将要运行任务的线程
* @param r
* the task that will be executed 将要执行的任务
**/
@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.println("start execute .." + ((MyTask) r).name);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
System.out.println("after execute .." + ((MyTask) r).name);
}
@Override
protected void terminated() {
System.out.println("exit execute ..");
}
};
for (int i = 0; i < 10; i++) {
MyTask myTask = new MyTask("T_" + i);
es.execute(myTask);// execute 和 submit 的区别在future模式中再说
Thread.sleep(100);
}
/**
* 不会暴力的关闭,而会等待所有线程执行完后关闭线程 可以简单的理解为shutdown只是发送一个关闭信号,
* 但在shutdown之后,线程就不能再接受其他任务了.
**/
es.shutdown();
}
}
分治思想:Fork/Join框架
分治思想经常在一些经典的算法中能看到,算是一个非常有效地处理大量数据的方法。
在Linux平台中,函数fork()用来创建子线程,使得系统进程可以多一个执行分支。
而join这个方法相信了解java多线程的同学一定不会陌生,它表示等待。也就是使用fork()后系统多一个执行分支(线程),所以需要等待这个执行分支执行完毕,才有可能得到最终的结果,因此join()就表示等待。
在实际使用中,如果毫无顾忌的使用fork()开启线程进行处理,那么很有可能导致系统开启过多的线程而严重影响性能。所以,在JDK中给出一个ForkJoinPool线程池,对于fork()方法并不着急开启线程,而是提交给ForkJoinPool线程池进行处理,以节省系统资源。使用Fork/Join进行数据处理时候的总体结构如下图。
Fork'JoinExecutorPlan.jpg由于线程池的优化,提交的任务和线程数量并不是一对一的关系。在绝大多数情况下,一个物理线程实际上是需要处理多个逻辑任务的。因此,每个线程必然需要拥有一个任务队列。在实际执行过程中,可能遇到这么一种情况:线程A执行完了自己的所有任务,而线程B还有一堆任务等着处理。此时,线程A就会“帮助”线程B,从线程B的任务列表中拿一个任务过来处理,尽可能达到平衡。值得注意的是,当线程视图帮助别的线程时,总是从任务队列的底部开始拿数据,而线程视图执行自己的任务时,则是从相反的顶部开始拿。因此这种行为也十分有利于避免数据竞争。
接下来看一下ForkJoinPool的一个重要的接口:
public<T> ForkJoinPool<T> submit(ForkJoinPoolTask<T>task)
你可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join等待的任务。ForkJoinTask有两个最重要的子类,RecursiveAction和RecursiveTask。它们分别表示没有返回值的任务和可以携带返回值的任务。
网友评论