最近刚好用到了ForkJoinPool, 就再简要回顾复习一下ForkJoinPool。
Java7 提供了ForkJoinPool来支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果。
ForkJoinPool是ExecutorService的实现类,因此是一种特殊的线程池。
使用方法:创建了ForkJoinPool实例之后,就可以调用ForkJoinPool的submit(ForkJoinTask task) 或invoke(ForkJoinTask task)方法来执行指定任务了。
其中ForkJoinTask代表一个可以并行、合并的任务。ForkJoinTask是一个抽象类,它还有两个抽象子类:RecusiveAction和RecusiveTask。其中RecusiveTask代表有返回值的任务,而RecusiveAction代表没有返回值的任务。
详见
ForkJoinPool是基于并行计算的, 将一个问题划分为多个子问题,知道子问题可以在单个线程中直接解决,然后收集子任务的结果,最终完成整改大的任务。
ForkJoin算法是基于工作窃取算法(work-stealing algorithm),确保充分使用CPU资源。
我们详细了解一下工作窃取算法(work-stealing algorithm).
假设系统有4个处理器,任务T可以划分为12个子任务,如果所示的T1 到 T12。每个处理自有三个子任务。 但是假设3号处理器很繁忙,而2号处理器比较空闲 (也就是2号处理器完成了所有任务或者处于等待状态),然后2号处理器将会询问3号处理器,是否需要帮忙并接过一部分3号处理器的任务,所以2号处理器从3号处理器“窃取”了一部分工作。
下图描述了整个过程
这里写图片描述
我们在仔细看看ForkJoin的内部细节, 图2
这里写图片描述
上面的图2显示了递归方法如何将任务划分为子任务,直到子任务很简单可以直接执行。
可以通过new直接创建ForkJoinPool实例 对象:
ForkJoinPool pool = new ForkJoinPool(parallelism);
其中的parallelism 是并行处理的线程,默认是is处理器的个数。
为了支持并行集合与并行流,Java提供了通过的ForkJoinPool对象。我们可以使用ForkJoinPool的静态方法获得通用的pool对象:
ForkJoinPool commonPool = ForkJoinPool.commonPool();
可以调用getParallelism(), 查看当前commonPool的并行处理的线程数字。:
commonPool.getParallelism();
我们一斐波那契数列数列计算为例,来说明ForkJoinPool的用法。
第一步,定义RecusiveTask
package com.yq.forkJoinPool;
import java.util.concurrent.RecursiveTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @author EricYang
* @version 2018/6/23 16:11
*/
public class FibonacciComputation extends RecursiveTask<Integer> {
public static final Log log = LogFactory.getLog(FibonacciComputation.class);
private final int number;
public FibonacciComputation(int number) {
this.number = number;
}
@Override
public Integer compute() {
//如果当前要计算的小于等于1, 直接返回
if (number <= 1) {
return number;
}
//如果要计算的斐波那契数列大于1, 我们就分成n-1和n-2, 等n-1和n-2计算完毕后将这两者的结果合并
FibonacciComputation f1 = new FibonacciComputation(number - 1);
f1.fork();
log.info("Current Thread Name" + Thread.currentThread().getName());
FibonacciComputation f2 = new FibonacciComputation(number - 2);
return f2.compute() + (Integer)f1.join();
}
}
第二步,创建ForkJoinPool并提交任务
package com.yq.forkJoinPool;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.ForkJoinPool;
/**
* Simple to Introduction
* className: FibonacciComutationMain
*
* @author EricYang
* @version 2018/6/23 16:12
*/
public class FibonacciComutationMain {
public static final Log log = LogFactory.getLog(FibonacciComutationMain.class);
public static void main(String args[]){
//斐波那契数列 计算第20个斐波那契数列
// 以递归的方法定义:F(1)=1,F(2)=1, F(n)=F(n-1)+F(n-2)(n>=2,n∈N*)
int number = 20;
int poolSize = Runtime.getRuntime().availableProcessors();
//ForkJoinPool基本上默认就是Runtime.getRuntime().availableProcessors()个县城
ForkJoinPool pool = new ForkJoinPool(poolSize);
long beforeTime = System.currentTimeMillis();
log.info("Parallelism => "+ pool.getParallelism());
Integer result = (Integer) pool.invoke(new FibonacciComputation(number));
log.info("Total Time in MilliSecond Taken -> "+ (System.currentTimeMillis() - beforeTime));
log.info(number +"the element of Fibonacci Number = "+result);
}
}
网友评论