1 用并行流并行处理数据
sequential() 顺序流
parallel() 并行流
并行流内部使用了默认的ForkJoinPool,默认的线 程 数 量 就 是 你 的 处 理 器 数 量 ,
这 个 值 是 由 Runtime.getRuntime().availableProcessors()得到的。
可 以 通 过 系 统 属 性 java.util.concurrent.ForkJoinPool.common.
parallelism来改变线程池大小:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism","12");
long start = System.nanoTime();
long duration = (System.nanoTime() - start) / 1_000_000;
留意装箱。自动装箱和拆箱操作会大大降低性能。

2 分支/合并框架
工作窃取概念
分支/合并框架工程用一种称为工作窃取( work stealing)的技术来解决这个问题。在实际应
用中,这意味着这些任务差不多被平均分配到ForkJoinPool中的所有线程上。每个线程都为分
配给它的任务保存一个双向链式队列,每完成一个任务,就会从队列头上取出下一个任务开始执
行。基于前面所述的原因,某个线程可能早早完成了分配给它的所有任务,也就是它的队列已经
空了,而其他的线程还很忙。这时,这个线程并没有闲下来,而是随机选了一个别的线程,从队
列的尾巴上“偷走”一个任务。这个过程一直继续下去,直到所有的任务都执行完毕,所有的队
列都清空。这就是为什么要划成许多小任务而不是少数几个大任务,这有助于更好地在工作线程
之间平衡负载。
一般来说,这种工作窃取算法用于在池中的工作线程之间重新分配和平衡任务。
分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,
然后将每个子任务的结果合并起来生成整体结果。
它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。
首先来看看如何定义任务和子任务。
使用 RecursiveTask
要把任务提交到这个池,必须创建RecursiveTask<R>的一个子类,
其中R是并行化任务(以及所有子任务)产生的结果类型,或者如果任务不返回结果,
则是RecursiveAction类型(当然它可能会更新其他非局部机构)。
要定义RecursiveTask, 只需实现它唯一的抽象方法
compute:
protected abstract R compute();
这个方法同时定义了将任务拆分成子任务的逻辑,以及无法再拆分或不方便再拆分时,生成
单个子任务结果的逻辑。正由于此,这个方法的实现类似于下面的伪代码:
if (任务足够小或不可分) {
顺序计算该任务
} else {
将任务分成两个子任务
递归调用本方法,拆分每个子任务,等待所有子任务完成
合并每个子任务的结果
}

用分支/合并框架执行并行求和
package com.hw;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.LongStream;
public class ForkJoinSumCalculator
extends java.util.concurrent.RecursiveTask<Long> {
private final long[] numbers;
private final int start;
private final int end;
public static final long THRESHOLD = 10_000;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask =
new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
// 测试
public static long forkJoinSum(long n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
}
3 Spliterator
Spliterator是Java 8中加入的另一个新接口;这个名字代表“可分迭代器” ( splitableiterator)。
和Iterator一样, Spliterator也用于遍历数据源中的元素,但它是为了并行执行而设计的。
虽然在实践中可能用不着自己开发Spliterator,但了解一下它的实现方式会对并行流的工作原理有更深入的了解。
Java 8已经为集合框架中包含的所有数据结构提供了一个默认的Spliterator实现。
集合实现了Spliterator接口,接口提供了一个spliterator方法。
这个接口定义了若干方法,如下面的代码清单所示。
public interface Spliterator<T> {
boolean tryAdvance(Consumer<? super T> action);
Spliterator<T> trySplit();
long estimateSize();
int characteristics();
}

final String SENTENCE =
" Nel mezzo del cammin di nostra vita " +
"mi ritrovai in una selva oscura" +
" ché la dritta via era smarrita ";
Stream<Character> stream = IntStream.range(0, SENTENCE.length())
.mapToObj(SENTENCE::charAt);
总结:
内部迭代让你可以并行处理一个流,而无需在代码中显式使用和协调不同的线程。
虽然并行处理一个流很容易,却不能保证程序在所有情况下都运行得更快。并行软件的
行为和性能有时是违反直觉的,因此一定要测量,确保你并没有把程序拖得更慢。
像并行流那样对一个数据集并行执行操作可以提升性能,特别是要处理的元素数量庞大,
或处理单个元素特别耗时的时候。
从性能角度来看,使用正确的数据结构,如尽可能利用原始流而不是一般化的流,几乎
总是比尝试并行化某些操作更为重要。
分支/合并框架让你得以用递归方式将可以并行的任务拆分成更小的任务,在不同的线程
上执行,然后将各个子任务的结果合并起来生成整体结果。
Spliterator定义了并行流如何拆分它要遍历的数据。
Lambda表达式有助于避免使用面向对象设计模式时容易出现的僵化的模板代码,典型的
比如策略模式、模板方法、观察者模式、责任链模式,以及工厂模式。
Lambda表达式会让栈跟踪的分析变得更为复杂。
流提供的peek方法在分析Stream流水线时,能将中间变量的值输出到日志中,是非常有用的工具。
网友评论