1、什么是ForkJoinPool
虽然目前处理器核心数已经发展到很大数目,但是按任务并发处理并不能完全充分的利用处理器资源,因为一般的应用程序没有那么多的并发处理任务。基于这种现状,考虑把一个任务拆分成多个单元,每个单元分别得到执行,最后合并每个单元的结果。 有些线程一直处于忙碌状态,而有些线程却一直空闲状态,这么可以充分的利用处理器资源
Fork/Join框架是JAVA7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总每个小任务结果得到大任务结果的框架。
ForkJoinPool主要采用分治算法 和 工作窃取算法
2、分治法
分治法的基本思想是将一个规模为N的问题分解为K个规模较小的子问题,这些子问题的相互独立且与原问题的性质相同,求出子问题的解之后,将这些解合并,就可以得到原有问题的解。是一种分目标完成的程序算法。

3、工作窃取算法
一个大任务拆分成多个小任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列中,并且每个队列都有单独的线程来执行队列里的任务,线程和队列一一对应。
但是会出现这样一种情况:A线程处理完了自己队列的任务,B线程的队列里还有很多任务要处理。
A是一个很热情的线程,想过去帮忙,但是如果两个线程访问同一个队列,会产生竞争,所以A想了一个办法,从双端队列的尾部拿任务执行。而B线程永远是从双端队列的头部拿任务执行。

注意:线程池中的每个线程都有自己的工作队列(PS,这一点和ThreadPoolExecutor不同,ThreadPoolExecutor是所有线程公用一个工作队列,所有线程都从这个工作队列中取任务),当自己队列中的任务都完成以后,会从其它线程的工作队列中偷一个任务执行,这样可以充分利用资源。(特别注意)
工作窃取算法的优点:
利用了线程进行并行计算,减少了线程间的竞争。
工作窃取算法的缺点:
1、如果双端队列中只有一个任务时,线程间会存在竞争。(只有一个任务的时候需要特殊处理)
2、窃取算法消耗了更多的系统资源,如会创建多个线程和多个双端队列。
4、主要类
1、ForkJoinTask:
使用该框架,需要创建一个ForkJoin任务,它提供在任务中执行fork和join操作的机制。一般情况下,我们并不需要直接继承ForkJoinTask类,只需要继承它的子类,它的子类有两个:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask:用于有返回结果的任务。
2、ForkJoinPool:
任务ForkJoinTask需要通过ForkJoinPool来执行。
5、例子
计算1到40的累加和,当间隔大于limit,则继续递归
package com.concurrency2;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class MyTest13 {
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
MyTask myTask = new MyTask(1, 40);
int result = forkJoinPool.invoke(myTask);
System.out.println(result);
forkJoinPool.shutdown();
}
}
class MyTask extends RecursiveTask<Integer> {
private int limit = 4;
private int firstIndex;
private int lastIndex;
public MyTask(int firstIndex, int lastIndex) {
this.firstIndex = firstIndex;
this.lastIndex = lastIndex;
}
@Override
protected Integer compute() {
int result = 0;
int gap = lastIndex - firstIndex;
boolean flag = gap <= limit;
if (flag) {
System.out.println(Thread.currentThread().getName());
for(int i = firstIndex;i <= lastIndex;i ++)
{
result += i;
}
} else {
int middleIndex = (firstIndex + lastIndex) / 2;
MyTask leftTask = new MyTask(firstIndex, middleIndex);
MyTask rightTask = new MyTask(middleIndex + 1, lastIndex);
invokeAll(leftTask, rightTask);
int leftTaskResult = leftTask.join();
int rightTaskResult = rightTask.join();
result = leftTaskResult + rightTaskResult;
}
return result;
}
}
输出
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
ForkJoinPool-1-worker-1
ForkJoinPool-1-worker-0
820
网友评论