前言
并行编程势不可挡,Java从1.7开始就提供了Fork/Join 支持并行处理。java1.8 进一步加强。
并行处理就是将任务拆分子任务,分发给多个处理器同时处理,之后合并。
一、ForkJoinPool
ForkJoinPool是JDK7引入的线程池,核心思想是将大的任务拆分成多个小任务(即fork),然后在将多个小任务处理汇总到一个结果上(即join),非常像MapReduce处理原理。同时,它提供基本的线程池功能,支持设置最大并发线程数,支持任务排队,支持线程池停止,支持线程池使用情况监控,也是AbstractExecutorService的子类,主要引入了“工作窃取”机制,在多CPU计算机上处理性能更佳。
1.1 work-stealing(工作窃取算法)
work-stealing(工作窃取):ForkJoinPool提供了一个更有效的利用线程的机制,当ThreadPoolExecutor还在用单个队列存放任务时,ForkJoinPool已经分配了与线程数相等的队列,当有任务加入线程池时,会被平均分配到对应的队列上,各线程进行正常工作,当有线程提前完成时,会从队列的末端“窃取”其他线程未执行完的任务,当任务量特别大时,CPU多的计算机会表现出更好的性能。
1.2 常用方法
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
线程池监控
在线程池使用监控方面,主要通过如下方法:
- isTerminated:判断线程池对应的workQueue中是否有待执行任务未执行完;
- awaitTermination:判断线程池是否在约定时间内完成,并返回完成状态;
- getQueuedSubmissionCount:获取所有待执行的任务数;
- getRunningThreadCount:获取正在运行的任务数。
1.3 例子
提交有返回值的任务
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
/**
* @Description 提交有返回值的任务
*/
public class ForkJoinRecursiveTask {
/**
* 最大计算数
*/
private static final int MAX_THRESHOLD = 100;
public static void main(String[] args) {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveTask任务
ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecursiveTask(0, 1000));
try {
//根据返回类型获取返回值
Integer result = forkJoinTask.get();
System.out.println("结果为:" + result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
private static class CalculatedRecursiveTask extends RecursiveTask<Integer> {
private final int start;
private final int end;
public CalculatedRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
if ((end - start) <= MAX_THRESHOLD) {
//返回[start,end]的总和
return IntStream.rangeClosed(start, end).sum();
} else {
//任务分割
int middle = (end + start) / 2;
CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
//执行
task1.fork();
task2.fork();
//等待返回结果
return task1.join() + task2.join();
}
}
}
}
提交无返回值的任务
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
/**
* @Description 提交无返回值的任务
*/
public class ForkJoinRecursiveAction {
/**
* 最大计算数
*/
private static final int MAX_THRESHOLD = 100;
private static final AtomicInteger SUM = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveAction任务
pool.submit(new CalculatedRecursiveTask(0, 1000));
//等待3秒后输出结果,因为计算需要时间
pool.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("结果为:" + SUM);
pool.shutdown();
}
private static class CalculatedRecursiveTask extends RecursiveAction {
private final int start;
private final int end;
public CalculatedRecursiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
if ((end - start) <= MAX_THRESHOLD) {
//因为没有返回值,所有这里如果我们要获取结果,需要存入公共的变量中
SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
} else {
//任务分割
int middle = (end + start) / 2;
CalculatedRecursiveTask task1 = new CalculatedRecursiveTask(start, middle);
CalculatedRecursiveTask task2 = new CalculatedRecursiveTask(middle + 1, end);
//执行
task1.fork();
task2.fork();
}
}
}
}
虽然ForkJoin实际的代码非常复杂,但是通过这个例子应该了解到ForkJoinPool底层的分治算法和工作窃取原理。ForkJoin不仅在java8之后的stream中广泛使用。golang等其他语言的协程机制,也是采用类似的原理来实现的。
二、Stream API
Java 8 引入了许多特性,Stream API是其中重要的一部分。区别 InputStream OutputStream,Stream API 是处理对象流而不是字节流。
执行原理如下,流分串行和并行两种执行方式
// 串行执行流
stream().filter(e -> e > 10).count();
// 并行执行流
.parallelStream().filter(e -> e > 10).count()
三、ParallelStreams
Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(out::println);
3.1 ParallelStreams 执行原理
并行流就是一个把内容拆分成多个数据块,用不同线程分别处理每个数据块的流。对收集源调用parallelStream方法就能将集合转换为并行流。
并行执行时,java将流划分为多个子流,分散在不同CPU并行处理,然后进行合并。
并行比串行更快,取决于两方面条件:
- 处理器核心数量,并行处理核心数越多自然处理效率会更高。
- 处理的数据量越大,优势越强。
3.2 ParallelStreams注意事项
3.2.1 因为是并行流,所以所涉及到的数据结构,需要使用线程安全的,比如
listByPage.parallelStream().forEach(str-> {
//使用线程安全的数据结构
//ConcurrentHashMap
//CopyOnWriteArrayList
//等等进行操作
});
3.2.2 默认优先用在CPU密集型计算中
- 用在IO密集比如HTTP请求也可以,实际使用是情况而定。
由于默认并行流使用的是全局的线程池,线程数量是根据cpu核数设置的,所以如果某个操作占用了线程,将影响全局其他使用并行流的操作。默认情况,fork/join 池会为每个处理器分配一个线程。
所以折中的方案是自定义线程池来执行某个并行流操作
public static void main(String[] args) throws Exception {
List<String> list = Arrays.asList("1", "2", "3", "4", "5");
ForkJoinPool forkJoinPool = new ForkJoinPool(10);
forkJoinPool.execute(() -> {
list.parallelStream().forEach(number -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
}
});
});
ForkJoinPool forkJoinPool2 = new ForkJoinPool(3);
ForkJoinTask<?> forkJoinTask = forkJoinPool2.submit(() -> {
list.parallelStream().forEach((number) -> {
try {
System.out.println(Thread.currentThread().getName());
Thread.sleep(5000);
} catch (InterruptedException e) {
}
});
});
// 阻塞,等线程执行完毕
//forkJoinTask.get();
// 阻止主线程关闭
Thread.sleep(10000L);
}
3.2.3 使用并行流时,不要使用collectors.groupingBy、collectors.toMap
使用并行流时,不要使用collectors.groupingBy、collectors.toMap,替代为collectors.groupingByConcurrent、collectors.toConcurrentMap,或直接使用串行流。
原因,并行流执行时,通过操作Key来合并多个map的操作比较昂贵。详细大家可以查看官网介绍。
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html#concurrent_reduction
Map<String, List<Person>> byGender = roster.stream()
.collect(Collectors.groupingBy(Person::getGender));
ConcurrentMap<String, List<Person>> byGender = roster.parallelStream()
.collect(Collectors.groupingByConcurrent(Person::getGender));
ParallelStreams 默认使用 ForkJoinPool.commonPool()线程池。
注意:默认情况下,你写的 ParallelStreams 都是通过该线程池调度执行,整个应用程序都共享这个线程池。
针对 Stream API 一些局限性,Github上有个开源库做了补充。
https://github.com/pivovarit/parallel-collectors
四、并行流和顺序流转换
parallel 和 sequential
Integer reduce = Stream.iterate(0, n -> n + 2).limit(10000).reduce(1, Integer::sum);
// 将顺序流转化为并行流
Integer reduce1 = Stream.iterate(0, n -> n + 2).limit(10000).parallel().reduce(1, Integer::sum);
// 将并行流转为顺序流
Integer reduce2 = Stream.iterate(0, n -> n + 2).limit(10000).parallel().map(integer -> integer + 2).sequential().reduce(1, Integer::sum);
最后一次parallel或sequential调用会影响整个流水线
配置并行流使用的线程池:
- 1、并行流内部使用了默认的ForkJoinPool。它默认的线程数量就是你的处理器数量,这个值是由Runtime.getRuntime().availableProcessors()得到的。
- 2、可以通过系统属性java.util.concurrent.ForkJoinPool.common.parallelism来修改线程池大小
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
System.out.println( System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism"));
- 3、这是一个全局设置,因此它会对代码中所有的并行流产生影响。反过来说,目前我们还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,除非你有很充足的理由,否则强烈建议你不要修改它。
正确的姿势使用并行流
并行流并不总是比顺序流快。所以正确的姿势使用并行流是尤为重要的,不然适得其反。
决定某个特定情况下是否有必要使用并行流。可以参考一下几点建议
-
1、如果有疑问,测量。并行流有时候会和你的直觉不一致,所以在考虑选择顺序流还是并行流时,很重要的建议就是用适当的基准来检查其性能。
-
2、留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(IntStream、LongStream和DoubleStream)来避免这种操作,但凡有可能都应该用这些流
-
3、有些操作本身在并行流上的性能就比顺序流差。特别是limit和findFirst等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny会比findFirst性能好,因为它不一定要按顺序来执行。你总是可以调用unordered方法来把有序流变成无序流。那么,如果你需要流中的N个元素而不是专门要前N个的话,对无序并行流调用limit可能会比单个有序流(比如数据源是一个List)更高效。
-
4、考虑流的操作流水线的总计算成本。设N是要处理的元素的总数,Q是一个元素通过流水线的大致处理成本,则N*Q就是这个对成本的一个粗略的定性估计。Q值较高就意味着使用并行流时性能好的可能性比较大。
-
5、对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
-
6、考虑流背后的数据结构是否易于分解。例如,ArrayList的拆分效率比LinkedList高得多,因为前者用不着遍历就可以平均拆分,后者则必须遍历。另外,用range工厂方法创建的原始类型流也可以快速分解。可以参考一下表格:
| 数据源 | 性能 |
| ------ | ------ |
| ArrayList | 极佳 |
| LinkedList | 差 |
| IntStrean.range | 极佳 |
| Strean.iterate | 差 |
| HashSet | 好 |
| TreeSet | 好 | -
7、流自身的特点以及流水线中的中间操作修改流的方式,都可能会改变分解过程的性能。例如,一个SIZED流可以分成大小相等的两部分,这样每个部分都可以比较高效地并行处理,但筛选操作可能丢弃的元素个数无法预测,从而导致流本身的大小未知。
-
8、还要考虑终端操作中合并步骤的代价是大是小(例如Collector中的combiner方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
参考:
https://www.cnblogs.com/sky233/p/13052380.html
https://blog.csdn.net/niyuelin1990/article/details/78658251
https://blog.csdn.net/qq_34748010/article/details/124533694
https://www.cnblogs.com/GGuoLiang/p/13616999.html
https://blog.csdn.net/weixin_43144460/article/details/119250381
网友评论