思想:分而治之
用来做什么
ForkJoinPool是ExecutorService接口的实现,它专为可以递归分解成小块的工作而设计。fork / join框架将任务分配给线程池中的工作线程,充分利用多处理器的优势,提高程序性能。使用fork / join框架的第一步是编写执行一部分工作的代码。类似的伪代码如下:
如果(当前工作部分足够小)
直接做这项工作
其他
把当前工作分成两部分调用这两个部分并等待结果
将此代码包装在ForkJoinTask子类中,通常是RecursiveTask (可以返回结果)或RecursiveAction.
先来看一个例子,下边的例子是模拟读取一个大文件的过程,使用了java多线程中的Callable方式,每个线程最多读取10个,这样的多线程一起读取的方式来提升效率的。
package forkjoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class ExecuteTask_Demo {
/*
思考:现在有很多网络地址存在ArrayList中,我需要做网络请求,
为了并发执行,我就需要将这个列表进行拆分
*/
static ArrayList<String> urls = new ArrayList<String>() { //假设当前是一个从外部读取的资源文件
{
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
add("http://www.baidu.com");
add("http://www.sina.com");
}
};
public static String doRequest(String url) {
//模拟网络请求
return "Kane ... read ... " + url + "\n";
}
static class Task implements Callable<String> {
int start;
int end;
public Task(int start, int end) {
this.start = start;
this.end = end;
}
@Override
public String call() throws Exception {
String result = "";
for (int i = start; i < end; i++) {
result += doRequest(urls.get(i));
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService pool = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<>();
int size = urls.size();
int groupSize = 10;
int groupCount = (size - 1) / groupSize + 1;
for (int groupIndex = 0; groupIndex < groupCount - 1; groupIndex++) {
int leftIndex = groupSize * groupIndex;
int rightIndex = groupSize * (groupIndex + 1);
Future<String> future = pool.submit(new Task(leftIndex, rightIndex));
futures.add(future);
}
for (Future future : futures) {
System.out.println(future.get());
}
}
}
而ForkJoinPool其实本质上来说也是类似的的方式,但是它是使用了二分拆分的方式(类似二分查找),对任务进行拆分,然后拆分的任务结束后再合并起来,一起返回。
forkjoinpool 的例子如下
package forkjoin;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
public class ForkJoinTest {
/*
思考:现在有很多网络地址存在ArrayList中,我需要做网络请求,
为了并发执行,我就需要将这个列表进行拆分
*/
static ArrayList<String> urls = new ArrayList<String>() { //假设当前是一个从外部读取的资源文件
// 忽略代码
}
public static String doRequest(String url) {
//模拟网络请求
return "Kane ... read ... " + url + "\n";
}
static class Job extends RecursiveTask<String> {
List<String> urls;
int start;
int end;
public Job(List<String> urls, int start, int end) {
this.urls = urls;
this.start = start;
this.end = end;
}
@Override
protected String compute() { //定义任务拆分的规则,是forkjoinpool 的 核心内容
int count = end - start;
if (count <= 10) {
//直接执行
String rs = "";
for (int i = start; i < end; i++) {
String response = doRequest(urls.get(i));
rs += response;
}
return rs;
} else {
//拆分
int x = (start + end) / 2;
Job job1 = new Job(urls, start, x);
job1.fork();
Job job2 = new Job(urls, x, end);
job2.fork();
String rs = "";
rs += job1.join();
rs += job2.join();
return rs;
}
}
}
static ForkJoinPool forkJoinPool = new ForkJoinPool(3, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
public static void main(String[] args) throws ExecutionException, InterruptedException {
Job job = new Job(urls, 0, urls.size());
ForkJoinTask<String> forkJoinTask = forkJoinPool.submit(job);
String result = forkJoinTask.get();
System.out.println(result);
}
}
意图梳理
实现思路
- 每个Worker线程都维护一个任务队列,即ForkJoinWorkerThread中的任务队列
- 任务队列是双向队列,这样可以同时实现LIFO和FIFO
- 子任务会被加入到原先任务所在Worker线程的任务队列
- Worker线程用LIFO的方法取出任务,后进队列的任务先取出来(子任务总是后加入队列,但是需要先执行)
- 当任务队列为空,会随机从其他的worker的队列中拿走一个任务执行(工作窃取: steal work)
- 如果一个Worker线程遇到了join操作,而这时候正在处理其他任务,会等到这个任务结束。否则直接返回
- 如果一个Worker线程窃取任务失败,它会用yield或者sleep之类的方法休息一会儿,再尝试(如果所有线程都是空闲状态,即没有任务运行,那么该线程也会进入阻塞状态等待新任务的到来)
适用
- 使用尽可能少的线程池-在大多数情况下,最好的决定是为每个应用程序或系统使用一个线程池如果不需要特定调整,请使用默认的公共线程池
- 使用合理的阈值将ForkJoinTask拆分为子任务
- 避免在ForkJoinTask中出现任何阻塞
适合数据处理、结果汇总、统计等场景;
java8实例:java.util.Arrays类用于其parallelSort()方法
结语:工作窃取带来的性能提升偏理论,API的复杂性较高,实际研发中可控性来说不如其他API。
如果觉得有收获就点个赞吧,更多知识,请点击关注查看我的主页信息哦~
网友评论