上一篇 <<<Callable与Future模式
下一篇 >>>Threadlocal
Fork/Join:是Java7提供的并行执行任务的框架,是一个把大任务分割成若干小任务,最终汇总小任务的结果得到大任务结果的框架小任务可以继续不断拆分n多个小任务。
实现原理
![](https://img.haomeiwen.com/i25147367/3451001d4c4d945b.png)
1、将大任务分解为很多小任务执行,加速执行效率
2、由于每个线程处理的速度不一样,如果先执行完任务的队列的线程,窃取其他没有执行完任务队列。
伪代码说明
if(任务很小){
直接计算得到结果
}else{
分拆成N个子任务
调用子任务的fork()进行计算
调用子任务的join()合并计算结果
}
核心代码
核心类
a、RecursiveAction:用于没有返回结果的任务
b、RecursiveTask:用于有返回结果的任务
核心方法
Compute()方法计算
fork() 方法类似于Thread.start(),但是它并不立即执行任务,而是将任务放入工作队列中,拆分子任务。
join()合并子任务 支持Join,即任务结果的合并
invoke方法提交的任务,调用线程直到任务执行完成才会返回,也就是说这是一个同步方法,且有返回结果;
execute方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且没有返回结果;
submit方法提交的任务,调用线程会立即返回,也就是说这是一个异步方法,且有返回结果(返回Future实现类,可以通过get获取结果)。
1000条短信群发实战演示
public class ForkJoinSms extends RecursiveAction {
/**
* 存放手机号码
*/
private List<String> phones;
private int start;
private int end;
// 分割阈值
private int max = 100;
@Override
protected void compute() {
if (end - start < max) {
// 动态读取start到end之间的数据
phones = null;
// 结果发送
System.out.println(Thread.currentThread().getName() + ",start:" + start + ",end:" + end);
} else {
// 使用二分法做数据的分割
int l = (end + start) / 2;
ForkJoinSms left = new ForkJoinSms(start, l);
ForkJoinSms rigt = new ForkJoinSms(l + 1, end);
left.fork();
rigt.fork();
}
}
public static void main(String[] args) {
ForkJoinSms forkJoinSms = new ForkJoinSms(1, 1000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
forkJoinPool.invoke(forkJoinSms);
}
}
日志结果:
- ForkJoinPool-1-worker-4,start:189,end:250
- ForkJoinPool-1-worker-7,start:1,end:63
- ForkJoinPool-1-worker-7,start:501,end:563
- ForkJoinPool-1-worker-1,start:64,end:125
- ForkJoinPool-1-worker-1,start:251,end:313
- ForkJoinPool-1-worker-1,start:876,end:938
- ForkJoinPool-1-worker-1,start:376,end:438
- ForkJoinPool-1-worker-6,start:564,end:625
- ForkJoinPool-1-worker-6,start:626,end:688
- ForkJoinPool-1-worker-3,start:689,end:750
- ForkJoinPool-1-worker-2,start:439,end:500
相关文章链接:
多线程基础
线程安全与解决方案
锁的深入化
锁的优化
Java内存模型(JMM)
Volatile解决JMM的可见性问题
Volatile的伪共享和重排序
CAS无锁模式及ABA问题
Synchronized锁
Lock锁
AQS同步器
Condition
CountDownLatch同步计数器
Semaphore信号量
CyclicBarrier屏障
线程池
并发队列
Callable与Future模式
Threadlocal
Disruptor框架
如何优化多线程总结
网友评论