- 用来做什么
ForkJoinPool是ExecutorService(线程池服务)接口的实现,它专为可以递归分解成小块的工作而设计。
for/join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。
使用fork/join框架的第一步是编写执行一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
直接做这项工作
其他
把当前工作分成两部分
调用这两个部分并等待结果
将此代码包装在ForkJoinTask子类中,通常RecursiveTask(可以返回结果)或者RecursiveAction(不可以返回结果)
ForkJoinTask是RecursiveAction与RecursiveTask的父类, ForkJoinTask中使用了模板模式进行设计
,将ForkJoinTask的执行相关的代码进行隐藏,通过提供抽象类暴露用户的实际业务处理。
-
意图梳理
关键点:分解任务fork出新任务,汇集join任务执行结果
1.png
ForkJoin是由JDK1.7后提供多线并发处理框架。ForkJoin的框架的基本思想是分而治之。什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总。相应的ForkJoin将复杂的计算当做一个任务。而分解的多个计算则是当做一个子任务。
-
工作窃取
2.png
说明:所谓工作窃取区别于传统线程池,是因为,虽然也是多线程工作,但是线程池是自己线程干自己的事情,干完了就休息,但是ForkJoin的工作窃取是当自己线程任务队列为空之后,则取其他任务队列取任务帮助完成,所以更加充分的利用CPU,性能更高。
- 实现思路
- 每个worker线程都维护一个任务队列,即ForkJoinWorkerThread的任务队列
- 任务队列是双向队列,这样可以同时实现LIFO和FIFO
- 子任务会被加入到原先任务所在Worker线程的任务队列
- Worker线程用LIFO的方法取出任务,后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)
- 当任务队列为空,会随机从其他的worker的队列中拿走一个任务执行(工作窃取:steal work)
- 如果一个worker线程遇到了join操作,而这适合正在处理其他任务,会等到这个任务结束。否则直接返回
- 如果一个worker线程窃取任务失败,它会用yield或者sleep之类的方法休息一会,在尝试(如果所有线程都是空闲状态,即没有任务运行,那么该县城也会进入阻塞状态等待新任务的到来)
- 重要:forkjoin不做具体任务拆分也不知道怎么拆分只是提供了功能,真实逻辑都在compute内部,自己实现
- 适用
- 使用尽可能少的线程池-在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池如果不需要特定的调整,则使用默认的公共线程池
- 使用合理的阙将ForkJoinTask拆分为子任务
- 避免在ForkJoinTask中出现任何阻塞
- 适合数据处理、结果汇总、统计等场景
- java8实例:java.util.Arrays类用于其parallelSort()方法
- 适合于内存操作,数据计算等,但是明显不适合文件操作,网络操作
- 基本使用
使用ForkJoin框架,需要创建一个ForkJoin的任务,而ForkJoinTask是一个抽象类,我们不需要去继承ForkJoinTask进行使用。因为ForkJoin框架为我们提供了RecursiveAction和RecursiveTask。我们只需要继承ForkJoin为我们提供的抽象类的其中一个并且实现compute方法。
private static class SumTask extends RecursiveTask<Integer> {
private int threshold ;
private static final int segmentation = 10;
private int[] src;
private int fromIndex;
private int toIndex;
public SumTask(int formIndex,int toIndex,int[] src){
this.fromIndex = formIndex;
this.toIndex = toIndex;
this.src = src;
this.threshold = src.length/segmentation;
}
@Override
protected Integer compute() {
//核心就是该方法
//可知forkjoin只提供具体抽象,但是实际任务怎么拆分还是
//看具体用户怎么书写此处的代码
//此处大意就是if内部的就是具体执行,else则是继续拆分任务,不做具体执行,实际拆分之后最终都会进入if内部
if((toIndex - fromIndex)<threshold ){
int count = 0;
System.out.println(" from index = "+fromIndex
+" toIndex="+toIndex);
for(int i = fromIndex;i<=toIndex;i++){
count+=src[i];
}
return count;
}else{
int mid = (fromIndex+toIndex)/2;
SumTask left = new SumTask(fromIndex,mid,src);
SumTask right = new SumTask(mid+1,toIndex,src);
invokeAll(left,right);
return left.join()+right.join();
}
}
}
使用ForkJoinPool进行执行
task要通过ForkJoinPool来执行,分割的子任务也会添加到当前工作线程的双端队列中,
进入队列的头部。当一个工作线程中没有任务时,会从其他工作线程的队列尾部获取一个任务(工作窃取)。
public static void main(String[] args) {
int[] array = MakeArray.createIntArray();
ForkJoinPool forkJoinPool= new ForkJoinPool();
SumTask sumTask = new SumTask(0,array.length-1,array);
long start = System.currentTimeMillis();
forkJoinPool.invoke(sumTask);
System.out.println("The count is "+sumTask.join()
+" spend time:"+(System.currentTimeMillis()-start)+"ms");
}
- Future
Future表示异步计算的结果,提供了用于检查计算是否完成、等待计算完成以及获取结果的方法
Future的类图结构
3.png
如上图可知,ForkJoin框架以及Future都是属于Future的子类或者抽象类继承类
- Callable
和runable一样的业务定义,但是本质上是有区别的:有返回值,可抛异常,同时call是运行在run里面的
public class CallDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService service= Executors.newCachedThreadPool();
Callable<String> callable=new Callable<String>() {
@Override
public String call() throws Exception {
//实际是运行在runable的run方法里面,所以也是多线程
return null;
}
};
//方式一:通过线程池-其实还是多线程运行,会发现内部就是FutureTask
// service.submit(callable);
//方式二:通过FutureTask包装callable
FutureTask<String> task=new FutureTask<String>(callable);
new Thread(task).start();
String a= task.get();//通过这种方式获取callable内部call的返回值
// System.out.println(a);null
}
}
之前如果想获取异步结果返回值,则需要手动通过countDownLatch实现,但是有了FutureTask则可以很方便的获取
- FutureTask应用
如上面的例子,FutureTask是为了更加简单的获取异步返回信息,并提供多线程或者线程池的操作,从而进行多个异步操作同时进行,而结果只取最长的任务那个,同时提供get获取返回值
4.png
总的执行时间,取决于执行最慢的逻辑
逻辑之间无依赖关系
,可同时执行,则可以应用多线程技术进行优化
- 自定义实现简单版本的FutureTask
public class TonyFutureTask<T> implements Runnable, Future {
Callable<T> callable;//业务逻辑在callable里面
T result=null;
volatile String state="NEW";//task执行状态
LinkedBlockingQueue<Thread> waiters=new LinkedBlockingQueue();
public TonyFutureTask(Callable<T> callable) {
this.callable=callable;
}
@Override
public void run() {
try {
//从这个里可知,call确实是在run方法内部执行的
result=callable.call();
} catch (Exception e) {
e.printStackTrace();
}finally {
state="END";
}
//唤醒等待者
Thread waiter=waiters.poll();
while (waiter != null) {
LockSupport.unpark(waiter);
//继续取出队列中的等待者
waiter=waiters.poll();
}
}
@Override
public T get() {
if ("END".equals(state)) {
return result;
}
waiters.offer(Thread.currentThread());//加入到等待队列,线程不继续往下执行了
while (!"END".equals(state)) {
//阻塞
LockSupport.park();
}
return result;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return false;
}
@Override
public boolean isCancelled() {
return false;
}
@Override
public boolean isDone() {
return false;
}
@Override
public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return null;
}
}
上面使用FutureTask的例子都可以使用这个自定义的task从而达到相同目的,实际上jdk内部的FutureTask也是这么实现的,只不过更具体更细节
- 线程安全级别
- 不可变的--这个类的实例是不可变的。这样的例子有String,Long,BigInterger
- 无条件的线程安全-- 这个类的实例是不可变的,但是这个类有足够的内部同步。例子:Random,ConcurrentHashMap,一般队列都是线程安全的,否则先进先出根本无法保障
- 有条件的线程安全-- 除了有些方法为进行安全的并发使用而需要外部同步之外,这种线程安全级别与无条件安全相同。例子包含:Collections.synchronized包装返回的集合,它们的迭代器是要求外部同步的。
- 非线程安全--这个类的实例是可变的。为了并发使用他们,客户必须利用自己选择的外部同步包围每个方法调用。例如:ArrayList
- 线程对立的--这个类不能被安全的被多个线程使用,即使所有的方法调用都被外围同步包围。
网友评论