1 前言
还记的很久之前一位很时尚的面试官问过,"知道什么是fork/join吗","我不会","回去等通知吧"。面试就是这么残酷,一个不懂就gg,所以平时还是要多多学习,注重基础的积累,框架在牛也是从基础类库开始构造出来的。
那么什么是fork/join框架
下面实现代码的源码位置 :
https://github.com/ishaveanyone/skill-pool/blob/master/jvm/src/main/java/com/dist/ForkJoin.java
2 fork/join框架
fork/join框架是jdk7提出的一个复杂任务处理方案,它提供了一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
可以通过下面的这张图进行抽象理解
其实从fork/join英文也可以进行简单理解,fork英文中是叉,也就是分支,join是连接,也就是合并,其实将大的任务分解成一个个足够小的任务进行完成最后合并一个结果是不是就是我们很熟悉的分治思想。
分治:字面上的解释是“分而治之”,就是把一个复杂的问题分成两个或更多的相同或相似的子问题,
再把子问题分成更小的子问题……直到最后子问题可以简单的直接求解,原问题的解即子问题的解的合并。
这个技巧是很多高效算法的基础,如排序算法(快速排序,归并排序),傅立叶变换(快速傅立叶变换)……
3 工作窃取算法
<i>摘自:java并发编程艺术</i>
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。那么,为什么需要使用工作
窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间
的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程队列
一一对应。比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的
队列里还有任务等待处理。干完活的线程与其等着,不如去帮他线程干活,于是它就去其他线程的队列里窃取一个
任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会
使用端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
其实这是一个分治算法解决问题的思想,并不是在java特有的实现方案,大白话来讲就是将分治的多个任务执行放在一个队列,就是用一个队列来管理一组任务,每一个队列对应一个工作线程,如果当前的线程队列任务执行完了。它可以取检查其他线程是否还没有执行完所有的任务,如果是,那么他会进行任务窃取,帮助执行,加快了任务的执行时间。
4 java7中提供的fork/join框架
ForkJoinTask: java中fork/join 框架的任务我们通过ForkJoinTask来实现,这是一个抽象类,它提供在任务中执行fork()和join()操作的机制。通常情况,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction: 只是一个动作,比如消费,不需要返回结果的任务。
RecursiveTask: 有返回结果的任务。
ForkJoinPool: ForkJoinTask需要通过ForkJoinPool来执行。这个就是实现了工作窃取算法的线程池,该类在1.8中被优化了,同时1.8在Executors类中还新增了两个newWorkStealingPool工厂方法。
4.1 使用RecursiveAction
实现一个功能,模拟消费队列数据的场景如下:
/**
* fork join 提供了两个任务的实现子类
*/
//是一个 动作没有返回值的动作使用这个类
class MyRecursiveAction extends RecursiveAction{
public static void main(String[] args) throws InterruptedException {
MyRecursiveAction myRecursiveAction=new MyRecursiveAction();
myRecursiveAction.resouces=new ArrayList(){{
add("1");
add("2");
add("3");
add("4");
}};
ForkJoinPool forkJoinPool=new ForkJoinPool();
forkJoinPool.execute(myRecursiveAction);
forkJoinPool.awaitTermination(100, TimeUnit.MILLISECONDS);//需要注意的是,forkjoin开启异步执行,所以必须给出一定的cpu调用的时间,否则主线程执行退出,任务来不及开启
}
//消费一个队列的数据
List<String> resouces=new ArrayList<>();
@Override
protected void compute() {
if(resouces.size()<=2){
// System.out.println(1);
System.out.println("进来了");
resouces.forEach(o->{
System.out.println(o);
});
// resouces.clear();
}else{
MyRecursiveAction lMyRecursiveAction=new MyRecursiveAction();
MyRecursiveAction rMyRecursiveAction=new MyRecursiveAction();
lMyRecursiveAction.resouces=resouces.subList(0,resouces.size()/2);
rMyRecursiveAction.resouces=resouces.subList(resouces.size()/2,resouces.size());
lMyRecursiveAction.fork();//分支
rMyRecursiveAction.fork();
}
}
}
4.2 使用RecursiveTask
实现一个功能,求数组中所有的偶数的场景:
//如果有返回值叫做任务
//下面模仿一个任务找出一个数组中的所有的偶数
class MyRecursiveTask extends RecursiveTask<Integer> {
private int [] array;
int maxLenth=2;
int count=0;
public void setArray(int [] array ){
this.array=array;
}
public static void main(String[] args) {
int[] array={1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8,1,2,3,4,5,6,7,8};
MyRecursiveTask myRecursiveTask=new MyRecursiveTask();
myRecursiveTask.setArray(array);
ForkJoinPool forkJoinPool=new ForkJoinPool();
Future<Integer> feature=forkJoinPool.submit(myRecursiveTask);
try {
// forkJoinPool.awaitTermination(100, TimeUnit.MILLISECONDS);
Thread.sleep(5);
System.out.println(myRecursiveTask.count);
// System.out.println(feature.get());
System.out.println(myRecursiveTask.join());
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
protected Integer compute() {
if(array.length<=maxLenth){
//如果长度小于10 那么直接进行计算
for (int i : array) {
if(i%2==0){
count++;
}
}
}else {
MyRecursiveTask leftMyRecursiveTask = new MyRecursiveTask();
MyRecursiveTask rightMyRecursiveTask = new MyRecursiveTask();
leftMyRecursiveTask.setArray(Arrays.copyOfRange(array, 0, array.length / 2));
rightMyRecursiveTask.setArray(Arrays.copyOfRange(array, array.length / 2 , array.length ));
leftMyRecursiveTask.fork();
rightMyRecursiveTask.fork();
int lc = leftMyRecursiveTask.join();
int rc = rightMyRecursiveTask.join();
count = lc + rc + count;
}
return count;
}
}
4.3 Fork/Join框架的实现原理
其实就是通过ForkJoinPool线程来执行不同队列的任务(使用工作窃取算法),ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
4.3.1 fork:
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
每一次fork当前工作线程就会把这个任务加入到自己的workquene工作队列中。并且进行后续的执行,调用sinalWork方法唤醒线程执行该任务。
4.3.2 join
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
首先,它调用了doJoin()方法,通过doJoin()方法得到当前任务的状态来判断返回什么结果,任务状态有6种:(1.8)
static final int DONE_MASK = 0xf0000000; // mask out non-completion bits
static final int NORMAL = 0xf0000000; // must be negative 正常完成
static final int CANCELLED = 0xc0000000; // must be < NORMAL 被取消了
static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED 抛出异常
static final int SIGNAL = 0x00010000; // must be >= 1 << 16 信号,被其他任务唤醒
static final int SMASK = 0x0000ffff; // short bits for tags
在doJoin()方法里,首先通过查看任务的状态,看任务是否已经执行完成,如果执行完成,则直接返回任务状态;如果没有执行完,则从任务数组里取出任务并执行。如果任务顺利执行完成,则设置任务状态为NORMAL,如果出现异常,则记录异常,并将任务状态设置为EXCEPTIONAL。
网友评论