背书中(引用书上的话):Java7中提供了用于并行执行任务的Fork/Join框架, 可以把任务分成若干个分任务,最终汇总每个分任务的结果得到总任务的结果。这篇我们来看看Fork/Join框架。
先举个栗子
一个字符串数组,需要把每个元素中的*字符的索引返回,并求和(自己编了个栗子,没有撒实际意义),用Fork/Join框架来实现,可以定义一个处理字符串数组的总任务,然后把总任务拆分,把数组中每个字符串交给子任务去处理,然后等待子任务执行完毕,汇总结果,并返回:
package thread.ForkJoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
/**
* @Description: .
* @Author: ZhaoWeiNan .
* @CreatedTime: 2017/6/21 .
* @Version: 1.0 .
*/
public class StringTask extends RecursiveTask<Integer>{
//要处理的字符串
private String dest;
public StringTask(String dest) {
this.dest = dest;
}
//父类RecursiveTask是一个抽象类,所以需要实现compute方法
@Override
protected Integer compute() {
if (dest == null || "".equals(dest))
return 0;
//判断字符串中 * 的索引,并返回
return dest.indexOf("*");
}
}
class ArrayTask extends RecursiveTask<Integer>{
//需要处理的字符串数组
private String[] array;
public ArrayTask(String[] array) {
this.array = array;
}
@Override
protected Integer compute() {
if (array == null || array.length < 1)
return 0;
//申明一个StringTask变量,作为子任务
StringTask stringTask;
//定义一个子任务队列,用于任务执行完毕后,获取子任务的执行结果
List<StringTask> list = new ArrayList<>();
int sum = 0;
//把字符串数组的中每一个字符串分给多个StringTask子任务去处理
for (String s : array){
//创建一个变量,作为子任务去处理字符串
stringTask = new StringTask(s);
//执行子任务
stringTask.fork();
//加入子任务队列
list.add(stringTask);
}
for (StringTask task : list){
//等子任务执行完毕,获取子任务执行的结果,并累加
sum += task.join();
}
return sum;
}
}
class Demo{
public static void main(String[] args){
//初始化字符串数组
String[] array = new String[]{"#####*####","##*########","###*#######","#*############"};
//创建一个总任务,处理字符串数组
ArrayTask arrayTask = new ArrayTask(array);
//创建执行任务的线程池ForkJoinPool对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
//执行总任务
ForkJoinTask<Integer> forkJoinTask = forkJoinPool.submit(arrayTask);
//返回任务的结果
try {
System.out.println(forkJoinTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
代码放到了开源中国:http://git.oschina.net/zhaoweinan/forkjoin,有兴趣的小伙伴可以拿去
总的来说,Fork/Join框架就是一个用来并行执行任务的框架,可以把一个大任务,分成若干个子任务,等各个子任务执行完毕,可以把他们的执行结果获取到,并汇聚,起到了并行执行任务作用。
Fork/Join框架的构成
1.ForkJoinPool
ForkJoinPool类图ForkJoinPool继承了AbstractExecutorService抽象类,AbstractExecutorService实现了ExecutorService接口,由此看来ForkJoinPool也是线程池家族的一员,
过滤了下方法,只显示了公共方法,并截取了一下
ForkJoinPool使用invoke、execute、submit用来执行任务。
2.ForkJoinTask
ForkJoinTask类图ForkJoinTask是Fork/Join框架使用的任务类,实现了Future接口,我们一般使用它的两个子类RecursiveTask和RecursiveAction,
RecursiveAction类图
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
RecursiveAction适用于没有返回结果的任务,
RecursiveTask类图public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
RecursiveTask适用于有返回值的任务。
Work-Stealing (工作窃取)
粗略说一下Work-Stealing,ForkJoinPool具有 Work-Stealing (工作窃取)的能力,什么意思呢?就拿文章开头的栗子来说,把处理字符串数组的大任务,分成了若干个处理字符串的子任务,这些子任务线程执行完毕后,不会闲着,回去执行别的子任务,通俗的来说,Work-Stealing (工作窃取)就是线程从其他队列里面获取任务来执行。
Work-Stealing的优点
充分利用了线程,提高了线程并行执行任务的效率,并减少了线程间竞争带来的系统开销。
Work-Stealing的缺点
存在竞争的情况,而且占用了更多的系统资源。
Fork/Join框架原理
ForkJoinPool分析
贴一张ForkJoinPool的类图
大小不好调整,就截取一般吧
注意箭头所指的两个属性,
ForkJoinTask<?>数组submissionQueue,存放程序加到ForkJoinPool的任务
private ForkJoinTask<?>[] submissionQueue;
ForkJoinWorkerThread类继承了Thread,是一个线程类, ForkJoinWorkerThread[] workers就是一个线程数组,负责去执行submissionQueue中的任务
ForkJoinWorkerThread[] workers;
.....
public class ForkJoinWorkerThread extends Thread
ForkJoinTask分析
fork方法
获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行ForkJoinTask任务
public final ForkJoinTask<V> fork() {
//获取当前ForkJoinWorkerThread线程,调用ForkJoinWorkerThread的pushTask方法执行任务
((ForkJoinWorkerThread) Thread.currentThread())
.pushTask(this);
return this;
}
再来看看ForkJoinWorkerThread的pushTask方法:
final void pushTask(ForkJoinTask<?> t) {
ForkJoinTask<?>[] q; int s, m;
if ((q = queue) != null) { // ignore if queue removed
long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1; // or use putOrderedInt
if ((s -= queueBase) <= 2)
//调用线程池ForkJoinPool的signalWork方法
pool.signalWork();
else if (s == m)
growQueue();
}
}
ForkJoinWorkerThread的pushTask方法把任务ForkJoinTask加到了ForkJoinTask[]任务数组中,并调用了ForkJoinPool线程池的signalWork方法唤醒线程或者创建一个线程去执行任务,粗略的贴一下signalWork的关键代码:
private void addWorker() {
Throwable ex = null;
ForkJoinWorkerThread t = null;
try {
t = factory.newThread(this);
} catch (Throwable e) {
ex = e;
}
if (t == null) { // null or exceptional factory return
long c; // adjust counts
do {} while (!UNSAFE.compareAndSwapLong
(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// Propagate exception if originating from an external caller
if (!tryTerminate(false) && ex != null &&
!(Thread.currentThread() instanceof ForkJoinWorkerThread))
UNSAFE.throwException(ex);
}
else
t.start();
}
最终调用到了这里,来执行任务。
join方法
从文章开头的栗子来看,join方法会阻塞当前线程,等待获取任务执行的结果
//百度了这四种状态的含义
private static final int NORMAL = -1; //NORMAL已完成
private static final int CANCELLED = -2; //CANCELLED已取消
private static final int EXCEPTIONAL = -3; //EXCEPTIONAL出现异常
private static final int SIGNAL = 1; //SIGNAL信号
public final V join() {
//先调用doJoin方法判断上面定义的四个状态
if (doJoin() != NORMAL)
return reportResult();
else
return getRawResult();
}
join方法先调用doJoin方法判断任务的状态,看看doJoin方法,
private int doJoin() {
Thread t; ForkJoinWorkerThread w; int s; boolean completed;
//获取当前ForkJoinWorkerThread线程
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
//如果状态是小于0 也就是 -1,-2,-3 分别代表已完成、已取消、出现异常
//直接返回状态
if ((s = status) < 0)
return s;
if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
//如果状态为1|SIGNAL|信号
//执行任务
try {
completed = exec();
} catch (Throwable rex) {
//出现异常,把状态改为-3|EXCEPTIONAL|出现异常,返回
return setExceptionalCompletion(rex);
}
if (completed)
//执行成功,把状态改为-1|NORMAL|已完成,返回
return setCompletion(NORMAL);
}
return w.joinTask(this);
}
else
return externalAwaitDone();
}
doJoin查看任务的状态,如果状态是-1|NORMAL|已完成,-2|CANCELLED|已取消,-3|EXCEPTIONAL|出现异常,证明任务已经执行完毕,返回状态位,如果状态是 1|SIGNAL|信号,则去执行任务,如果执行成功返回-1|NORMAL|已完成,出现异常返回-3|EXCEPTIONAL|出现异常。
再来看看返回结果的reportResult方法和getRawResult方法:
private V reportResult() {
int s; Throwable ex;
//如果状态为-2|CANCELLED|已取消,抛出一个CancellationException异常
if ((s = status) == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
UNSAFE.throwException(ex);
//调用getRawResult方法返回结果
return getRawResult();
}
reportResult方法,先会判断状态,如果状态为-2|CANCELLED|已取消,则抛出一个CancellationException异常,否则调用getRawResult方法返回结果:
public abstract V getRawResult();
getRawResult方法在ForkJoinTask类是抽象方法,具体实现在他的两子类中。
RecursiveAction子类:
public final Void getRawResult() { return null; }
所以说RecursiveAction子类使用于没有返回值的任务。
RecursiveTask子类:
public final V getRawResult() {
return result;
}
RecursiveTask子类适用于有返回值的任务。
并行执行任务的Fork/Join框架是说完了。
欢迎大家来交流,指出文中一些说错的地方,让我加深认识。
谢谢大家!
网友评论