![](https://img.haomeiwen.com/i12204552/dfb5c9e68af8f1bb.png)
1.概念
Fork/Join框架是一个实现了ExecutorService接口的多线程处理器。它可以把一个大的任务划分为若干个小的任务并发执行,充分利用可用的资源,进而提高应用的执行效率。
Fork/Join实现了ExecutorService,所以它的任务也需要放在线程池中执行。它的不同在于它使用了工作窃取算法,空闲的线程可以从满负荷的线程中窃取任务来帮忙执行。(我个人理解的工作窃取大意就是:由于线程池中的每个线程都有一个队列,而且线程间互不影响。那么线程每次都从自己的任务队列的头部获取一个任务出来执行。如果某个时候一个线程的任务队列空了,而其余的线程任务队列中还有任务,那么这个线程就会从其他线程的任务队列中取一个任务出来帮忙执行。就像偷取了其他人的工作一样)。
Fork/Join框架的核心是继承了AbstractExecutorService的ForkJoinPool类,它保证了工作窃取算法和ForkJoinTask的正常工作。
Oracle官方定义原文:
The fork/join framework is an implementation of the ExecutorService interface that helps you take advantage of multiple processors. It is designed for work that can be broken into smaller pieces recursively. The goal is to use all the available processing power to enhance the performance of your application.
As with any ExecutorService implementation, the fork/join framework distributes tasks to worker threads in a thread pool. The fork/join framework is distinct because it uses a work-stealing algorithm. Worker threads that run out of things to do can steal tasks from other threads that are still busy.
The center of the fork/join framework is the ForkJoinPool class, an extension of the AbstractExecutorService class. ForkJoinPool implements the core work-stealing algorithm and can execute ForkJoinTask processes.
2.基本用法
1).Fork/Join就是要讲一个大的任务分割成若干小的任务,所以第一步是要做任务的分割。实现FrokJoinTask需要一个继承了RecursiveTask或RecursiveAction的基类,并根据自身业务情况将上面的代码放入基类的coupute方法中。RecursiveTask和RecursiveAction都继承了FrokJoinTask,它俩的区别就是RecursiveTask有返回值而RecursiveAction没有。主要重写compute方法,大致如下:
if (这个任务足够小){
执行要做的任务
} else {
将任务分割成两小部分
执行两小部分并等待执行结果
}
2).做好了基类就可以开始调用了,调用时首先我们需要Fork/Join线程池ForkJoinPool,然后向线程池中提交一个ForkJoinTask并得到结果。ForkJoinPool的submit方法的入参是一个ForkJoinTask,返回值也是一个ForkJoinTask,它提供一个get方法可以获取到执行结果。
大致如下:
ForkJoinPool pool = new ForkJoinPool();
ForkJoinTask<Object> future = pool.submit(forkJoinService);
Object result= future.get();
pool.shutdown();
3.示例代码
import java.util.Date;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTask extends RecursiveTask<List<Object>> {
//定义最小拆分任务数量
private static final int HOLD= 2;
private List<UserInfo> userInfoList;
private UserService userService;
ForkJoinTask(List<UserInfo> userInfoList,UserService userService){
this.userInfoList= userInfoList;
this.userService= userService;
}
//任务拆分算法
@Override
protected List<Object> compute() {
List<Object> results= new ArrayList<>();
boolean canNotCompute= userInfoList.size()<=HOLD;
if(canNotCompute){
for(UserInfo userInfo:userInfoList){
results.add(userService.getUserInfoByProvince(userInfo.province));
}
return results;
}else {
int middle = userInfoList.size() / 2;
List<UserInfo> subList1= userInfoList.subList(0,middle);
List<UserInfo> subList2= userInfoList.subList(middle,userInfoList.size());
ForkJoinTask left= new ForkJoinTask(subList1,userService);
ForkJoinTask right= new ForkJoinTask(subList2,userService);
left.fork();
right.fork();
List<Object> join=left.join();
join.addAll(right.join());
return join;
}
}
//测试
public static void main(String[] args) throws ExecutionException, InterruptedException {
List<UserInfo> userInfos= parseUserInfoList();
UserService userService= new UserService();
//1.使用forkjoin执行任务
Date date= new Date();
ForkJoinPool pool = new ForkJoinPool(20);
ForkJoinTask task= new ForkJoinTask(userInfos,userService);
Future<List<Object>> future= pool.submit(task);
List<Object> list= future.get();
System.out.println(list);
System.out.println("fork join spend date: "+(new Date().getTime()-date.getTime()));
//2.使用同步执行任务
date= new Date();
List<Object> results= new ArrayList<>();
for(UserInfo userInfo:userInfos){
results.add(userService.getUserInfoByProvince(userInfo.province));
}
System.out.println(results);
System.out.println("synchronize spend date: "+(new Date().getTime()-date.getTime()));
}
//模拟数据请求,Sleep100毫秒
private static class UserService{
public String getUserInfoByProvince(String id){
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return id;
}
}
//构造要分组的数据
private static List<UserInfo> parseUserInfoList(){
List<UserInfo> list= new ArrayList<>();
int i=0;
while (i<100){
list.add(new UserInfo("ligang"+i,10+i,"0000"+i));
i++;
}
return list;
}
private static class UserInfo{
private String name;
private int age;
private String province;
UserInfo(String name,int age,String province){
this.age= age;
this.name= name;
this.province= province;
}
}
}
运行结果:
[00000, 00001, 00002, 00003, 00004, 00005, 00006, 00007, 00008, 00009, 000010, 000011, 000012, 000013, 000014, 000015, 000016, 000017, 000018, 000019, 000020, 000021, 000022, 000023, 000024, 000025, 000026, 000027, 000028, 000029, 000030, 000031, 000032, 000033, 000034, 000035, 000036, 000037, 000038, 000039, 000040, 000041, 000042, 000043, 000044, 000045, 000046, 000047, 000048, 000049, 000050, 000051, 000052, 000053, 000054, 000055, 000056, 000057, 000058, 000059, 000060, 000061, 000062, 000063, 000064, 000065, 000066, 000067, 000068, 000069, 000070, 000071, 000072, 000073, 000074, 000075, 000076, 000077, 000078, 000079, 000080, 000081, 000082, 000083, 000084, 000085, 000086, 000087, 000088, 000089, 000090, 000091, 000092, 000093, 000094, 000095, 000096, 000097, 000098, 000099]
fork join spend date: 785
[00000, 00001, 00002, 00003, 00004, 00005, 00006, 00007, 00008, 00009, 000010, 000011, 000012, 000013, 000014, 000015, 000016, 000017, 000018, 000019, 000020, 000021, 000022, 000023, 000024, 000025, 000026, 000027, 000028, 000029, 000030, 000031, 000032, 000033, 000034, 000035, 000036, 000037, 000038, 000039, 000040, 000041, 000042, 000043, 000044, 000045, 000046, 000047, 000048, 000049, 000050, 000051, 000052, 000053, 000054, 000055, 000056, 000057, 000058, 000059, 000060, 000061, 000062, 000063, 000064, 000065, 000066, 000067, 000068, 000069, 000070, 000071, 000072, 000073, 000074, 000075, 000076, 000077, 000078, 000079, 000080, 000081, 000082, 000083, 000084, 000085, 000086, 000087, 000088, 000089, 000090, 000091, 000092, 000093, 000094, 000095, 000096, 000097, 000098, 000099]
synchronize spend date: 10471
可见使用fork join在性能上提升十几倍。
网友评论