-
并行流
parallelStream
并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让这些处理器都忙起来。
例如:下面的方法,接收数字n作为参数,返回从1到给定参数的所有数字的和public static long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1)//生成自然数无限流 .limit(n)//限制前n个数 .reduce(0L, Long::sum);//对所有数字求和
-
iterate
,Stream API提供由两个静态方法操你个函数生成流(常见的还有三种方法构建流,由值构建、由数组构建、由文件构建),Stream.iterate
和Stream.generate
可以创建无限流,会根据给定的函数按需创建。注意与generate
区别:
generate
不是一次对每个新生成的值应用函数,例如以下使用的供应源是无状态的,即它不会在任何地方记录任何值,以备以后计算使用。iterate
供应源不一定是无状态的。可以创建存储状态的供应源,它可以修改状态,并在为流生成下一个值时使用Stream.generate(Math::random) .limit(5) .forEach(System.out::println);
这时,若n特别大的时候,就可以做并行处理,将顺序流转换为并行流:对顺序流调用parallel
即可:在处理时,会把Stream在内部分成几块,在不同的块独立并行归纳操作,最后,同一个归纳操作会将各个子流的部分归纳结果合并起来,得到整个原始流的归纳结果。
public static long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()//将流转换为并行流
.reduce(0L, Long::sum);
}
image.png
但是,在实际操作时,对顺序流调用parallel
方法并不意味着对流有任何实际的变化。它内部实际上就是设了一个boolean
标志,表示你想让调用parallel
之后进行的所有操作都并行执行。对并行流调用sequential
方法就可以把它变成顺序流。将两个方法结合起来,就可以更细化地控制在遍历流时哪些操作要并行执行,哪些要顺序执行。例如:
stream.parallel()
.filter(...)
.sequential()
.map(...)
.parallel()//最后一次parallel或sequential调用会影响整个流水线
.reduce();
并行流默认使用的线程使用了默认的ForkJoinPool
,它默认的线程数量就是你的处理器数量。
-
性能
实际上,上面利用并行版本会比顺序版本满很多,主要有两方面原因:-
iterate
生成的是装箱的对象,必须拆箱成数字才能求和; - 很难把
iterate
分成多个独立块来并行执行,由于某些流操作会比其他车操作更容易并行化,之所以iterate
很难被分成独立执行的小块,是因为每次应用这个函数都要依赖前一次应用的结果,也就说,在这种情况下,归纳进程不像图7.1那样进行,整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,其实是给顺序处理增加了开销,它还要把每次求和操作分到一个不同的线程上。
所以说,并行编程可能很复杂,如果用的不对(
iterate
),会让整体的性能变差。 -
-
解决方法
使用LongStream.rangeClosed
方法,和iterate
相比有两个优点:-
LongStream.rangeClosed
直接产生原始类型的long
数字,没有装箱拆箱的开销。 -
LongStream.rangeClosed
会生成数字范围,很容易拆分为独立的小块。public static long parallelRangedSum(long n) { return LongStream.rangeClosed(1, n) .parallel() .reduce(0L, Long::sum); }
-
并行化是需要付出代价的,并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想的要大,所以很重要的一点是要保证在内核中并行执行工作的时间比在内核之间传输数据的时间长。
-
正确使用并行流
上面直接使用并行化的错误原因就是,使用的算法改变了某些共享状态。- 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(
IntStream
、LongStream
、DoubleStream
)来避免这种操作, - 有些操作本身在并行流上的性能就比顺序流差。特别是
limit
和findFirst
等依赖于元素顺序的操作,它们在并行流上执行的代价非常大。例如,findAny
会比findFirst
性能好,因为它不一定要按顺序来执行。 - 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成的额外开销。
- 要考虑流背后的数据结构是否易于分解。例如,
ArrayList
的拆分效率比LinkedList
高得多,因为前者用不着遍历就可以平均拆分,而后者则必须遍历。另外,用range
工厂方法创建的原始类型流也可以快速分解。 - 还要考虑终端操作中合并步骤的代价是大是小(例如
Collector
中的combiner
方法)。如果这一步代价很大,那么组合每个子流产生的部分结果所付出的代价就可能会超出通过并行流得到的性能提升。
- 留意装箱。自动装箱和拆箱操作会大大降低性能。Java 8中有原始类型流(
根据可分解性总结的一些流数据源适不适合并行
源 | 可分解性 |
---|---|
ArrayList |
极佳 |
LinkedList |
差 |
IntStream.range |
极差 |
Stream.iterate |
差 |
HashSet |
好 |
TreeSet |
好 |
分支/合并框架
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService
接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool
)中的工作线程。
-
使用
RecursiveTask
把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,则是RecursiveAction类型,则需要实现它的抽象方法compute
:
protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成单个子任务结果的逻辑。if (任务足够小或不可分) { 顺序计算该任务 } else { 将任务分成两个子任务 递归调用本方法,拆分每个子任务,等待所有子任务完成 合并每个子任务的结果 }
例如:利用分支/合并框架执行并行求和
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;//不再将任务分成子任务的大小
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {//如果大小小于或等于阈值,顺序计算结果
return computeSequentially();
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length/2);
leftTask.fork();//利用另一个ForkJoinPool线程异步执行新创建的子任务
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start + length/2, end);//创建一个任务为数组的后一半数组求和
Long rightResult = rightTask.compute();//同步执行第二个子任务,有可能允许进一步递归划分
Long leftResult = leftTask.join();//读取第一个子任务的结果,如果尚未完成就等待
return leftResult + rightResult;
}
//在子任务不再可分时计算结果的简单算法
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {{
sum += numbers[i];
}
return sum;
}
}
把数字数组传给ForkJoinSumCalculator
的构造函数,即可实现并行对前n个自然数求和:
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
//ForkJoinTask为RecursiveTask的父类
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);//invoke用来执行某个对象的目标方法
}
-
工作窃取:
image.png
分支/合并框架工程用一种称为工作窃取(work stealing)用于在线程池中的工作线程之间重新分配和平衡任务。在实际应用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队列都清空,有助于更好地在工作线程之间平衡负载。
-
Spliterator接口
和Iterator
一样,Spliterator
也用于遍历数据源中的元素,但它是为了并行执行而设计的。public interface Spliterator<T> { boolean tryAdvance(Consumer<? super T> action);//类似于Iterator,按顺序一个一个使用Spliterator中的元素,如果有其他元素需要遍历就返回true Spliterator<T> trySplit();//可以把一些元素划出去分给第二个Spliterator,让他们并行处理 long estimateSize();//估计剩下还有多少元素需要遍历 int characteristics();//代表Spliterator本身特征集的编码。可以用这些特征来更好地控制和优化它的使用。 }
将
Stream
拆分成多个部分的算法是一个递归过程,如图所示。第一步是对第一个Spliterator
调用trySplit
,生成第二个Spliterator
。第二步对这两个Spliterator
调用trysplit
,这样总共就有了四个Spliterator
。这个框架不断对Spliterator
调用trySplit
直到它返回null
,表明它处理的数据结构不能再分割,如第三步所示。最后,这个递归拆分过程到第四步就终止了,这时所有的Spliterator
在调用trySplit
时都返回了null
。
//7.3.2自定义Spliterator
网友评论