线程池
为了能够更好地控制多线程,JDK提供了一套Executor框架,帮助开发人员有效地进行线程控制,其本质就是一个线程池。
image.png
以上成员均在java.util.concurrent包中,是JDK并发包的核心类。
Executor框架提供了各种类型的线程池,主要有以下工厂方法
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newSingleThreadScheduledExecutor()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
以上工厂方法分别返回具有不同工作特性的线程池。这些线程池工厂方法的具体说明如下:
- newFixedThreadPool()方法:该方法返回一个固定线程数量的线程池。该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务。
- newSingleThreadExecutor()方法:该方法返回一个只有一个线程的线程池。若多余一个任务被提交到该线程池,任务会被保存在一个任务队列中,待线程空闲,按先入先出的顺序执行队列中的任务。
- newCachedThreadPool()方法:该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交,则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用。
- newSingleThreadScheduledExecutor()方法:该方法返回一个ScheduledExecutorService对象,线程池大小为1。ScheduledExecutorService接口在ExecutorService接口之上扩展了在给定时间执行某任务的功能,如在某个固定的延时之后执行,或者周期性执行某个任务。
- newScheduledThreadPool()方法:该方法也返回一个ScheduledExecutorService对象,但该线程池可以指定线程数量。
固定大小的线程池
01 public class ThreadPoolDemo {
02 public static class MyTask implements Runnable {
03 @Override
04 public void run() {
05 System.out.println(System.currentTimeMillis() + ":Thread ID:"
06 + Thread.currentThread().getId());
07 try {
08 Thread.sleep(1000);
09 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 }
13 }
14
15 public static void main(String[] args) {
16 MyTask task = new MyTask();
17 ExecutorService es = Executors.newFixedThreadPool(5);
18 for (int i = 0; i < 10; i++) {
19 es.submit(task);
20 }
21 }
22 }
输出
1426510293450:Thread ID:8
1426510293450:Thread ID:9
1426510293450:Thread ID:12
1426510293450:Thread ID:10
1426510293450:Thread ID:11
1426510294450:Thread ID:12
1426510294450:Thread ID:11
1426510294450:Thread ID:8
1426510294450:Thread ID:10
1426510294450:Thread ID:9
计划任务
newScheduledThreadPool()。它返回一个ScheduledExecutorService对象,可以根据时间需要对线程进行调度。它的一些主要方法如下:
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
计划任务即定时线程池,定时执行任务,这个定时有两个维度,周期和延迟:
- scheduleAtFixedRate
- 创建一个周期性任务。任务开始于给定的初始延时。后续的任务按照给定的周期进行:后续第一个任务将会在initialDelay+period时执行,后续第二个任务将在initialDelay+2period时进行,依此类推。即按照已固定的频率来执行某项计划(任务)*。
- scheduleWithFixedDelay
- 创建并执行一个周期性任务。任务开始于初始延时时间,后续任务将会按照给定的延时进行,即上一个任务的结束时间到下一个任务的开始时间的时间差。这个周期执行,是等上一次任务结束后,等待一个延迟时间执行一次任务,即每个任务之间按照固定的延迟执行任务。
下面的例子使用scheduleAtFixedRate()方法调度一个任务。这个任务会执行1秒钟时间,调度周期是2秒。也就是说每2秒钟,任务就会被执行一次。
01 public class ScheduledExecutorServiceDemo {
02 public static void main(String[] args) {
03 ScheduledExecutorService ses=Executors.newScheduledThreadPool(10);
04 //如果前面的任务没有完成,则调度也不会启动
05 ses.scheduleAtFixedRate(new Runnable() {
06 @Override
07 public void run() {
08 try {
09 Thread.sleep(1000);
10 System.out.println(System.currentTimeMillis()/1000);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 }, 0, 2, TimeUnit.SECONDS);
16 }
17 }
执行上述代码,一种输出的可能如下:
1426515345
1426515347
1426515349
1426515351
上述输出的单位是秒。可以看到,时间间隔是2秒。
如果任务的执行时间超过调度时间,会发生什么情况呢?
将第9行的代码改为
Thread.sleep(8000);
再执行上述代码,你就会发现任务的执行周期不再是2秒,而是变成了8秒。如下所示,是一种可能的结果。
1426516323
1426516331
1426516339
1426516347
1426516355
也就是说,周期如果太短,那么任务就会在上一个任务结束后,立即被调用。
另外一个值得注意的问题是,调度程序实际上并不保证任务会无限期的持续调用。如果任务本身抛出了异常,那么后续的所有执行都会被中断,因此,如果你想让你的任务持续稳定的执行,那么做好异常处理就非常重要,否则,你很有可能观察到你的调度器无疾而终。
注意:如果任务遇到异常,那么后续的所有子任务都会停止调度,因此,必须保证异常被及时处理,为周期性任务的稳定调度提供条件。
核心线程池的内部实现
ThreadPoolExecutor最重要的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize:指定了线程池中的线程数量。
-
maximumPoolSize:指定了线程池中的最大线程数量。
-
keepAliveTime:当线程池线程数量超过corePoolSize时,多余的空闲线程的存活时间。即,超过
corePoolSize的空闲线程,在多长时间内,会被销毁。
-
unit:keepAliveTime的单位。
-
workQueue:任务队列,被提交但尚未被执行的任务。
-
threadFactory:线程工厂,用于创建线程,一般用默认的即可。
-
handler:拒绝策略。当任务太多来不及处理,如何拒绝任务。
以上参数中,大部分都很简单,只有workQueue和handler需要进行详细说明。
workQueue
参数workQueue指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象。根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue。
-
直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的BlockingQueue。SynchronousQueue没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用SynchronousQueue,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的进程,则尝试创建新的进程,如果进程数量已经达到最大值,则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。即SynchronousQueue没有容量,提交的任务直接交给线程执行,不保存任务,如果进程数量已经达到最大值,则执行拒绝策略。
-
有界的任务队列:有界的任务队列可以使用ArrayBlockingQueue实现。ArrayBlockingQueue的构造函数必须带一个容量参数,表示该队列的最大容量,如下所示。
public ArrayBlockingQueue(int capacity)
- 当使用有界的任务队列时,若有新的任务需要执行,如果线程池的实际线程数小于corePoolSize,则会优先创建 新的线程,若大于corePoolSize,则会将新任务加入等待队列。若等待队列已满,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务。若大于maximumPoolSize,则执行拒绝策略。可见,有界队列仅当在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在在corePoolSize。
-
无界的任务队列:无界任务队列可以通过LinkedBlockingQueue类实现。与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。当有新的任务到来,系统的线程数小于corePoolSize时,线程池会生成新的线程执行任务,但当系统的线程数达到corePoolSize后,就不会继续增加。若后续仍有新的任务加入,而又没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存。即如果使用无界队列,正在工作的线程数最大只能是corePoolSize,超过则直接进入队列等待,无界队列可以无限增长,直到系统内存耗尽。
-
优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是一个特殊的无界队列。无论是有界队列ArrayBlockingQueue,还是未指定大小的无界队列LinkedBlockingQueue都是按照先进先出算法处理任务的。而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)。
notice:
newFixedThreadPool()和newCachedThreadPool()的线程池数量
newFixedThreadPool()
回顾newFixedThreadPool()方法的实现。它返回了一个corePoolSize和maximumPoolSize大小一样的,并且使用了LinkedBlockingQueue任务队列的线程池。因为对于固定大小的线程池而言,不存在线程数量的动态变化,因此corePoolSize和maximumPoolSize可以相等。同时,它使用无界队列存放无法立即执行的任务,当任务提交非常频繁的时候,该队列可能迅速膨胀,从而耗尽系统资源。
newSingleThreadExecutor()返回的单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池线程数量设置为1。
newCachedThreadPool()
newCachedThreadPool()方法返回corePoolSize为0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务,若无空闲线程,则将任务加入SynchronousQueue队列,而SynchronousQueue队列是一种直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize为0,因此空闲线程又会在指定时间内(60秒)被回收。
对于newCachedThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做法可能会很快耗尽系统的资源。即newCachedThreadPool()是根据实际情况调整线程数量的线程池,最大可以创建无限多的线程。
ThreadPoolExecutor线程池的核心调度代码
01 public void execute(Runnable command) {
02 if (command == null)
03 throw new NullPointerException();
04 int c = ctl.get();
05 if (workerCountOf(c) < corePoolSize) {
06 if (addWorker(command, true))
07 return;
08 c = ctl.get();
09 }
10 if (isRunning(c) && workQueue.offer(command)) {
11 int recheck = ctl.get();
12 if (! isRunning(recheck) && remove(command))
13 reject(command);
14 else if (workerCountOf(recheck) == 0)
15 addWorker(null, false);
16 }
17 else if (!addWorker(command, false))
18 reject(command);
19 }
代码第5行的workerCountOf()函数取得了当前线程池的线程总数。当线程总数小于corePoolSize核心线程数时,会将任务通过addWorker()方法直接调度执行。否则,则在第10行代码处(workQueue.offer())进入等待队列。如果进入等待队列失败(比如有界队列到达了上限,或者使用了SynchronousQueue),则会执行第17行,将任务直接提交给线程池。如果当前线程数已经达到maximumPoolSize,则提交失败,就执行第18行的拒绝策略。
image.png拒绝策略(handler)
ThreadPoolExecutor的最后一个参数指定了拒绝策略。也就是当任务数量超过系统实际承载能力时,使用拒绝策略。
JDK内置的拒绝策略如下。
- AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。
- CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。即该拒绝策略丢弃的线程放到调用者的线程中执行,也就是谁调用线程池,该任务就交给谁执行。
- DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务。
- DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢失,我觉得这可能是最好的一种方案了吧!
notice: 执行拒绝策略的前提都是队列已满,队列将请求交给线程池。
以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,完全可以自己扩展RejectedExecutionHandler接口。RejectedExecutionHandler的定义如下:
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其中r为请求执行的任务,executor为当前的线程池。
下面的代码简单地演示了自定义线程池和拒绝策略的使用:
01 public class RejectThreadPoolDemo {
02 public static class MyTask implements Runnable {
03 @Override
04 public void run() {
05 System.out.println(System.currentTimeMillis() + ":Thread ID:"
06 + Thread.currentThread().getId());
07 try {
08 Thread.sleep(100);
09 } catch (InterruptedException e) {
10 e.printStackTrace();
11 }
12 }
13 }
14
15 public static void main(String[] args) throws InterruptedException {
16 MyTask task = new MyTask();
17 ExecutorService es = new ThreadPoolExecutor(5, 5,
18 0L, TimeUnit.MILLISECONDS,
19 new LinkedBlockingQueue<Runnable>(10),
20 Executors.defaultThreadFactory(),
21 new RejectedExecutionHandler(){
22 @Override
23 public void rejectedExecution(Runnable r,
24 ThreadPoolExecutor executor) {
25 System.out.println(r.toString()+" is discard");
26 }
27 });
28 for (int i = 0; i < Integer.MAX_VALUE; i++) {
29 es.submit(task);
30 Thread.sleep(10);
31 }
32 }
33 }
上述代码的第17~27行自定义了一个线程池。该线程池有5个常驻线程,并且最大线程数量也是5个。这和固定大小的线程池是一样的。但是它却拥有一个只有10个容量的等待队列。因为使用无界队列很可能并不是最佳解决方案,如果任务量极大,很有可能会把内存撑爆。给出一个合理的队列大小,也是合乎常理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃,这极有可能不是我们希望遇到的。但作为必要的信息记录,我们将任务丢弃的信息进行打印,当然,这只比内置的DiscardPolicy策略高级那么一点点。
由于在这个案例中,MyTask执行需要花费100毫秒,因此,必然会导致大量的任务被直接丢弃。执行上述代码,可能的部分输出如下:
1426597264669:Thread ID:11
1426597264679:Thread ID:12
java.util.concurrent.FutureTask@a57993 is discard
java.util.concurrent.FutureTask@1b84c92 is discard
可以看到,在执行几个任务后,拒绝策略就开始生效了。在实际应用中,我们可以将更详细的信息记录到日志中,来分析系统的负载和任务丢失的情况。
自定义线程创建:ThreadFactory
线程池的主要作用是为了线程复用,也就是避免了线程的频繁创建。但是,最开始的那些线程从何而来呢?答案就是ThreadFactory。
ThreadFactory是一个接口,它只有一个方法,用来创建线程:
Thread newThread(Runnable r);
当线程池需要新建线程时,就会调用这个方法。
自定义线程池可以帮助我们做不少事。比如:
-
可以跟踪线程池究竟在何时创建了多少线程
-
可以自定义线程的名称、组以及优先级等信息
-
可以任性地将所有的线程设置为守护线程。
总之,使用自定义线程池可以让我们更加自由地设置池子中所有线程的状态。下面的案例使用自定义的ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程,这样,当主线程退出后,将会强制销毁线程池(这个销毁线程池的思路真是石破天惊)。
01 public static void main(String[] args) throws InterruptedException {
02 MyTask task = new MyTask();
03 ExecutorService es = new ThreadPoolExecutor(5, 5,
04 0L, TimeUnit.MILLISECONDS,
05 new SynchronousQueue<Runnable>(),
06 new ThreadFactory(){
07 @Override
08 public Thread newThread(Runnable r) {
09 Thread t= new Thread(r);
10 t.setDaemon(true);
11 System.out.println("create "+t);
12 return t;
13 }
14 }
15 );
16 for (int i = 0; i < 5; i++) {
17 es.submit(task);
18 }
19 Thread.sleep(2000);
20 }
扩展线程池
ThreadPoolExecutor也是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()和terminated()三个接口对线程池进行控制。
以beforeExecute()、afterExecute()为例,在ThreadPoolExecutor.Worker. runTask()方法内部提供了这样的实现:
boolean ran = false;
beforeExecute(thread, task); //运行前
try {
task.run(); //运行任务
ran = true;
afterExecute(task, null); //运行结束后
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex); //运行结束
throw ex;
}
ThreadPoolExecutor.Worker是ThreadPoolExecutor的内部类,它是一个实现了Runnable接口的类。ThreadPoolExecutor线程池中的工作线程也正是Worker实例。Worker.runTask()方法会被线程池以多线程模式异步调用,即Worker.runTask()会同时被多个线程访问。因此其beforeExecute()、afterExecute()接口也将同时多线程访问。
在默认的ThreadPoolExecutor实现中,提供了空的beforeExecute()和afterExecute()实现。在实际应用中,可以对其进行扩展来实现对线程池运行状态的跟踪,输出一些有用的调试信息,以帮助系统故障诊断,这对于多线程程序错误排查是很有帮助的。下面演示了对线程池的扩展,在这个扩展中,我们将记录每一个任务的执行日志。(这种扩展太棒了)
线程池扩展起始时间,完成时间,退出,和为线程赋名字范式
01 public class ExtThreadPool {
02 public static class MyTask implements Runnable {
03 public String name;
04
05 public MyTask(String name) {
06 this.name = name;
07 }
08
09 @Override
10 public void run() {
11 System.out.println("正在执行" + ":Thread ID:" + Thread. currentThread().getId()
12 + ",Task Name=" + name);
13 try {
14 Thread.sleep(100);
15 } catch (InterruptedException e) {
16 e.printStackTrace();
17 }
18 }
19 }
20
21 public static void main(String[] args) throws InterruptedException {
22
23 ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
24 new LinkedBlockingQueue<Runnable>()) {
25 @Override
26 protected void beforeExecute(Thread t, Runnable r) {
27 System.out.println("准备执行:" + ((MyTask) r).name);
28 }
29
30 @Override
31 protected void afterExecute(Runnable r, Throwable t) {
32 System.out.println("执行完成:" + ((MyTask) r).name);
33 }
34
35 @Override
36 protected void terminated() {
37 System.out.println("线程池退出");
38 }
39
40 };
41 for (int i = 0; i < 5; i++) {
42 MyTask task = new MyTask("TASK-GEYM-" + i);
43 es.execute(task);
44 Thread.sleep(10);
45 }
46 es.shutdown();
47 }
48 }
输出
准备执行:TASK-GEYM-0
正在执行:Thread ID:8,Task Name=TASK-GEYM-0
准备执行:TASK-GEYM-1
正在执行:Thread ID:9,Task Name=TASK-GEYM-1
准备执行:TASK-GEYM-2
正在执行:Thread ID:10,Task Name=TASK-GEYM-2
准备执行:TASK-GEYM-3
正在执行:Thread ID:11,Task Name=TASK-GEYM-3
准备执行:TASK-GEYM-4
正在执行:Thread ID:12,Task Name=TASK-GEYM-4
执行完成:TASK-GEYM-0
执行完成:TASK-GEYM-1
执行完成:TASK-GEYM-2
执行完成:TASK-GEYM-3
执行完成:TASK-GEYM-4
线程池退出
上述代码在第23~40行,扩展了原有的线程池,实现了beforeExecute()、afterExecute()和terminiated()三个方法。这三个方法分别用于记录一个任务的开始、结束和整个线程池的退出。在第42~43行,向线程池提交5个任务,为了有更清晰的日志,我们为每个任务都取了一个不同的名字。
在提交完成后,调用shutdown()方法关闭线程池。这是一个比较安全的方法,如果当前正有线程在执行,shutdown()方法并不会立即暴力地终止所有任务,它会等待所有任务执行完成后,再关闭线程池,但它并不会等待所有线程执行完成后再返回,因此,可以简单地理解成shutdown()只是发送了一个关闭信号而已。但在shutdown()方法执行后,这个线程池就不能再接受其他新的任务了。
优化线程池线程数量
在《Java Concurrency in Practice》一书中给出了一个估算线程池大小的经验公式:
image.png为保持处理器达到期望的使用率,最优的池的大小等于:
image.png在Java中,可以通过:
Runtime.getRuntime().availableProcessors()
取得可用的CPU数量。通过这么为线程池赋予线程数更加优雅。
在线程池中寻找堆栈
处理不好线程的异常,线程池会吞异常信息
public class DivTask implements Runnable {
int a,b;
public DivTask(int a,int b){
this.a=a;
this.b=b;
}
@Override
public void run() {
double re=a/b;
System.out.println(re);
}
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
ThreadPoolExecutor pools=new ThreadPoolExecutor(0, Integer.MAX_VALUE,
0L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
for(int i=0;i<5;i++){
pools.submit(new DivTask(100,i));
}
}
上述代码将DivTask提交到线程池,从这个for循环来看,我们应该会得到5个结果,分别是100除以给定的i后的商。但如果你真的运行程序,你得到的全部结果是:
33.0
50.0
100.0
25.0
只有4个输出。也就说是程序漏算了一组数据!但更不幸的是,程序没有任何日志,没有任何错误提示,就好像一切都正常一样。在这个简单的案例中,只要你稍有经验,你就能发现,作为除数的i取到了0,这个缺失的值很可能是由于除以0导致的。但在稍复杂的业务场景中,这种错误足可以让你几天萎靡不振。
因此,使用线程池虽然是件好事,但是还是得处处留意这些“坑”。线程池很有可能会“吃”掉程序抛出的异常,完全不打印堆栈信息,导致我们对程序的错误一无所知。
<font color=red>解决方案</font>
解决这个问题的一个最简单的方式就是放弃submit(),改用execute()。将上述的任务提交代码改成:
pools.execute(new DivTask(100,i));
或者你使用下面的方法改造你的submit():
Future re=pools.submit(new DivTask(100,i));
re.get();
上面两种方法都可以得到部分堆栈信息,如下所示:
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor. java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor. java:617)
at java.lang.Thread.run(Thread.java:745)
33.0
100.0
50.0
25.0
显然只有部分堆栈信息是不够的,从这两个异常堆栈中我们只能知道异常是在哪里抛出的(这里是DivTask的第11行)。但是我们还希望得到另外一个更重要的信息,那就是这个任务到底是在哪里提交的?而任务的具体提交位置已经被线程池完全淹没了。顺着堆栈,我们最多只能找到线程池中的调度流程,而这对于我们几乎是没有价值的。
所以需要扩展ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息。
01 public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
02 public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
03 long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
04 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
05 }
05 }
06
07 @Override
08 public void execute(Runnable task) {
09 super.execute(wrap(task, clientTrace(), Thread.currentThread()
10 .getName()));
11 }
12
13 @Override
14 public Future<?> submit(Runnable task) {
15 return super.submit(wrap(task, clientTrace(), Thread.currentThread()
16 .getName()));
17 }
18
19 private Exception clientTrace() {
20 return new Exception("Client stack trace");
21 }
22
23 private Runnable wrap(final Runnable task, final Exception clientStack,
24 String clientThreadName) {
25 return new Runnable() {
26 @Override
27 public void run() {
28 try {
29 task.run();
30 } catch (Exception e) {
31 clientStack.printStackTrace();
32 throw e;
33 }
34 }
35 };
36 }
37 }
在第23行代码中,wrap()方法的第2个参数为一个异常,里面保存着提交任务的线程的堆栈信息。该方法将我们传入的Runnable任务进行一层包装,使之能处理异常信息。当任务发生异常时,这个异常会被打印。
14 public static void main(String[] args) {
15 ThreadPoolExecutor pools=new TraceThreadPoolExecutor(0, Integer.MAX_VALUE,
16 0L, TimeUnit.SECONDS,
17 new SynchronousQueue<Runnable>());
18
19 /**
20 * 错误堆栈中可以看到是在哪里提交的任务
21 */
22 for(int i=0;i<5;i++){
23 pools.execute(new DivTask(100,i));
24 }
25 }
执行上述代码,就可以得到以下信息:
java.lang.Exception: Client stack trace
at geym.conc.ch3.trace.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:28)
at geym.conc.ch3.trace.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:17)
at geym.conc.ch3.trace.TraceMain.main(TraceMain.java:23) //这里显示了线程的提交位置
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
at geym.conc.ch3.trace.DivTask.run(DivTask.java:11)
at geym.conc.ch3.trace.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:37)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
33.0
100.0
25.0
50.0
熟悉的异常又回来了!现在,我们不仅可以得到异常发生的Runnable实现内的信息,我们也知道了这个任务是在哪里提交的。如此丰富的信息,我相信可以帮助我们瞬间定位问题!
Fork/Join框架
拆分汇总
使用Fork/Join进行数据处理时的总体结构如图
image.png通过Fork/Join可以提高线程利用率,和避免数据竞争
提高利用率
线程A已经把自己的任务都执行完成了,而线程B还有一堆任务等着处理,此时,线程A就会“帮助”线程B,从线程B的任务队列中拿一个任务过来处理,尽可能地达到平衡。
避免数据竞争
当线程试图帮助别人时,总是从任务队列的底部开始拿数据,而线程试图执行自己的任务时,则是从相反的顶部开始拿。
image.png下面我们来看一下ForkJoinPool的一个重要的接口:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task)
可以向ForkJoinPool线程池提交一个ForkJoinTask任务。所谓ForkJoinTask任务就是支持fork()分解以及join()等待的任务。ForkJoinTask有两个重要的子类,RecursiveAction和RecursiveTask。它们分别表示没有返回值的任务和可以携带返回值的任务。
image.png案例
01 public class CountTask extends RecursiveTask<Long>{
02 private static final int THRESHOLD = 10000;
03 private long start;
04 private long end;
05
06 public CountTask(long start,long end){
07 this.start=start;
08 this.end=end;
09 }
10
11 public Long compute(){
12 long sum=0;
13 boolean canCompute = (end-start)<THRESHOLD;
14 if(canCompute){
15 for(long i=start;i<=end;i++){
16 sum +=i;
17 }
18 }else{
19 //分成100个小任务
20 long step=(start+end)/100;
21 ArrayList<CountTask> subTasks=new ArrayList<CountTask>();
22 long pos=start;
23 for(int i=0;i<100;i++){
24 long lastOne=pos+step;
25 if(lastOne>end)lastOne=end;
26 CountTask subTask=new CountTask(pos,lastOne);
27 pos+=step+1;
28 subTasks.add(subTask);
29 subTask.fork();//这里进行了递归
30 }
31 for(CountTask t:subTasks){
32 sum+=t.join();
33 }
34 }
35 return sum;
36 }
37
38 public static void main(String[]args){
39 ForkJoinPool forkJoinPool = new ForkJoinPool();
40 CountTask task = new CountTask(0,200000L);
41 ForkJoinTask<Long> result = forkJoinPool.submit(task);
42 try{
43 long res = result.get();
44 System.out.println("sum="+res);
45 }catch(InterruptedException e){
46 e.printStackTrace();
47 }catch(ExecutionException e){
48 e.printStackTrace();
49 }
50 }
51 }
由于计算数列的和必然是需要函数返回值的,因此选择RecursiveTask作为任务的模型。上述代码第39行,建立ForkJoinPool线程池。在第40行,构造一个计算1到200000求和的任务。在第41行将任务提交给线程池,线程池会返回一个携带结果的任务,通过get()方法可以得到最终结果(第43行)。如果在执行get()方法时,任务没有结束,那么主线程就会在get()方法时等待。
在使用ForkJoin时需要注意,如果任务的划分层次很深,一直得不到返回,那么可能出现两种情况:第一,系统内的线程数量越积越多,导致性能严重下降。第二,函数的调用层次变得很深,最终导致栈溢出。不同版本的JDK内部实现机制可能有差异,从而导致其表现不同。
下面的StackOverflowError异常就是加深本例的调用层次,在JDK 8上得到的错误。
java.util.concurrent.ExecutionException: java.lang.StackOverflowError
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:1000)
at geym.conc.ch3.fork.CountTask.main(CountTask.java:51)
Caused by: java.lang.StackOverflowError
此外,ForkJoin线程池使用一个无锁的栈来管理空闲线程。如果一个工作线程暂时取不到可用的任务,则可能会被挂起,挂起的线程将会被压入由线程池维护的栈中。待将来有任务可用时,再从栈中唤醒这些线程。
网友评论