误区一:Java 8 中的 Stream 只要使用 parallel 就可以并行处理,只要使用 sequential 就可以单线程处理
parallel 方法和 sequential 方法不会对流产生任何影响,只是改动了状态位而已
Stream 是否并行取决于最后一次改变状态位的方法是 parallel 还是 sequential,下面两种表达式等价:
stream.parallel().filter(null).sequential().map(null).parallel().collect(null);
stream.parallel().filter(null).map(null).collect(null);
误区二:只要简单地加上 parallel 方法,就能使 Stream 高效并行
并行要求计算的规模要够大,确保在线程中运算的时间多过分配线程的开销
并行的数据必须是无序并且无状态的,前者会影响速度,后者会影响结果正确性(状态可以理解为 Java 中共享变量)
/* 从一加到一千万 */
public class ParallelStreamToGetSumTest {
private long num = 10_000_000;
/****************************** sequectial *******************************/
@Test // 正确的顺序计算
public void sequential() {
PerformanceUtils.test(() ->
LongStream.rangeClosed(1, num).sum()); // 运行时间 19 milliseconds
}
@Test // 错误的顺序计算,没有考虑拆箱的开销
public void errorSequential() {
PerformanceUtils.test(() ->
Stream.iterate(1L, i -> i + 1L).limit(num)
.reduce(0L, Long::sum)); // 运行时间 256 milliseconds
}
/****************************** parallel *******************************/
@Test // 正确的并行计算
public void parallel() {
PerformanceUtils.test(() ->
LongStream.rangeClosed(1, num).parallel().sum()); // 运行时间 4 milliseconds
}
@Test // 错误的并行计算,没有考虑开箱的开销与违反了无序性原则(iterate 没有范围,是顺序产生的)
public void errorParallel() {
PerformanceUtils.test(() ->
Stream.iterate(1L, i -> i + 1L).limit(num)
.parallel().reduce(0L, Long::sum)); // 运行时间 1680 milliseconds
}
@Test // 错误的并行计算,使用了共享变量,导致了线程竞争,并且共享变量的累加不是原子性的,所以结果也是错误的
public void errorParallel2() {
PerformanceUtils.test(() -> {
Accumulator accumulator = new Accumulator();
Stream.iterate(1L, i -> i + 1L).limit(num).parallel().forEach(accumulator::add);
return accumulator.total;
}); // 运行时间 1125 milliseconds,但结果错误
}
/****************************** inner class *******************************/
private static class Accumulator {
private long total = 0;
private void add(long value) {
total += value;
}
}
}
P.S. PerformanceUnitls 是笔者写的性能测试工具,源码位于 https://github.com/free-myself/commons/blob/master/src/main/java/org/tree/commons/utils/PerformanceUtils.java
网友评论