1.java.util.stream包的介绍
在元素流上支持函数式操作,例如在集合的map-reduce转换:
int sum = widgets.stream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
The key abstraction introduced in this package is stream. The classes Stream,
IntStream, LongStream, and DoubleStream are streams over objects and the primitive
int, long and double types. Streams differ from collections in several ways:
* No storage. A stream is not a data structure that stores elements; instead, it conveys
elements from a source such as a data structure, an array, a generator function, or an
I/O channel, through a pipeline of computational operations.
* Functional in nature. An operation on a stream produces a result, but does not modify
its source. For example, filtering a Stream obtained from a collection produces a new
Stream without the filtered elements, rather than removing elements from the source
collection.
* Laziness-seeking. Many stream operations, such as filtering, mapping, or duplicate
removal, can be implemented lazily, exposing opportunities for optimization. For
example, "find the first String with three consecutive vowels" need not examine all the
input strings. Stream operations are divided into intermediate (Stream-producing)
operations and terminal (value- or side-effect-producing) operations. Intermediate
operations are always lazy.
* Possibly unbounded. While collections have a finite size, streams need not. Short-
circuiting operations such as limit(n) or findFirst() can allow computations on infinite
streams to complete in finite time.
* Consumable. The elements of a stream are only visited once during the life of a
stream. Like an Iterator, a new stream must be generated to revisit the same
elements of the source.
此包中引入的关键抽象是流。 Stream、IntStream、LongStream和DoubleStream类是对象和原始int、long和double类型的流。 Streams在如下几个方面与集合不同:
- 没有存储空间。流不是存储元素的数据结构;相反,它将来自数据结构、数组、生成器函数或I / O通道等源的元素传送给计算操作的管道。
- 天生函数式。对流的操作会产生结果,但不会修改其源。例如,过滤从集合中获取的Stream会生成一个新Stream,而不是从源集合中删除元素。
- 惰性操作。许多流操作(例如过滤、映射或删除重复)可以懒惰地实现,从而暴露出优化的机会。例如,“查找具有三个连续元音的第一个字符串”不需要检查所有输入字符串。流操作分为中间(流生成)操作和终止(生成值或副作用)操作。中间操作总是懒惰执行。
- 可能是无限的。虽然集合需要大小有限,但流不需要。诸如limit(n)或findFirst()之类的短路操作可以允许无限流上的计算在有限时间内完成。
- 消费型。流的元素在流的生命期间仅能访问一次。与迭代器一样,必须生成新流以重新访问源的相同元素。
可以通过多种方式获得流:
- 集合Collection的stream()和parallelStream()方法
- 从数组获得:Arrays.stream(Object[])
- stream类的静态工厂方法:
Stream.of(Object[])
IntStream.range(int, int)
Stream.iterate(Object, UnaryOperator); - 文件的行:BufferedReader.lines()
- 文件路径流:Files
- 随机数流:Random.ints()
- 其他的流生成方法:
BitSet.stream()
Pattern.splitAsStream(java.lang.CharSequence),
JarFile.stream()
1.1 Stream operations and pipelines流操作及流水线
Stream operations are divided into intermediate and terminal
operations, and are combined to form stream pipelines. A stream
pipeline consists of a source (such as a Collection, an array, a
generator function, or an I/O channel); followed by zero or more
intermediate operations such as Stream.filter or Stream.map;
and a terminal operation such as Stream.forEach or
Stream.reduce.
Intermediate operations return a new stream. They are always
lazy; executing an intermediate operation such as filter() does not
actually perform any filtering, but instead creates a new stream
that, when traversed, contains the elements of the initial stream
that match the given predicate. Traversal of the pipeline source
does not begin until the terminal operation of the pipeline is
executed.
Terminal operations, such as Stream.forEach or IntStream.sum,
may traverse the stream to produce a result or a side-effect. After
the terminal operation is performed, the stream pipeline is
considered consumed, and can no longer be used; if you need to
traverse the same data source again, you must return to the data
source to get a new stream. In almost all cases, terminal
operations are eager, completing their traversal of the data
source and processing of the pipeline before returning. Only the
terminal operations iterator() and spliterator() are not; these are
provided as an "escape hatch" to enable arbitrary client-
controlled pipeline traversals in the event that the existing
operations are not sufficient to the task.
Processing streams lazily allows for significant efficiencies; in a
pipeline such as the filter-map-sum example above, filtering,
mapping, and summing can be fused into a single pass on the
data, with minimal intermediate state. Laziness also allows
avoiding examining all the data when it is not necessary; for
operations such as "find the first string longer than 1000
characters", it is only necessary to examine just enough strings
to find one that has the desired characteristics without examining
all of the strings available from the source. (This behavior
becomes even more important when the input stream is infinite
and not merely large.)
Intermediate operations are further divided into stateless and
stateful operations. Stateless operations, such as filter and map,
retain no state from previously seen element when processing a
new element -- each element can be processed independently of
operations on other elements. Stateful operations, such as
distinct and sorted, may incorporate state from previously seen
elements when processing new elements.
Stateful operations may need to process the entire input before
producing a result. For example, one cannot produce any results
from sorting a stream until one has seen all elements of the
stream. As a result, under parallel computation, some pipelines
containing stateful intermediate operations may require multiple
passes on the data or may need to buffer significant data.
Pipelines containing exclusively stateless intermediate
operations can be processed in a single pass, whether
sequential or parallel, with minimal data buffering.
Further, some operations are deemed short-circuiting operations.
An intermediate operation is short-circuiting if, when presented
with infinite input, it may produce a finite stream as a result. A
terminal operation is short-circuiting if, when presented with
infinite input, it may terminate in finite time. Having a short-
circuiting operation in the pipeline is a necessary, but not
sufficient, condition for the processing of an infinite stream to
terminate normally in finite time.
流操作分为中间操作和终止操作,并组合成流水线。流水线由源(例如集合,数组,生成器函数或I / O通道)组成;然后是零个或多个中间操作,例如Stream.filter或Stream.map;和一个终止操作,如Stream.forEach或Stream.reduce。
中间操作返回一个新流。它们总是惰性的;执行诸如filter()之类的中间操作实际上并不执行任何过滤,而是创建一个新流,当遍历时,该流包含与给定谓词匹配的初始流的元素。在执行流水线的终止操作之前,不会遍历流水线源。
终止操作(例如Stream.forEach或IntStream.sum)可以遍历流以产生结果或副作用。在执行终止操作之后,流水线被认为已消耗掉了,不能再使用;如果需要再次遍历同一数据源,则必须从数据源获取新流。在几乎所有情况下,终止操作都很渴望,在返回之前完成数据源的遍历和流水线的处理。只有终止操作iterator()和spliterator()不是;这些是作为“逃生舱口”提供的,以便在现有操作不足以执行任务时启用任意客户端控制的流水线遍历。
懒惰地处理流可以显著提高效率;在诸如上面的filter-map-sum示例的流水线中,过滤,映射和求和可以融合到数据的单个pass中,具有最小的中间状态。懒惰还允许在不必要时避免检查所有数据;对于诸如“查找超过1000个字符的第一个字符串”之类的操作,只需检查足够的字符串以找到具有所需特征的字符串,而不检查源中可用的所有字符串。 (当输入流是无限的而不仅仅是很大的时候,这种行为变得更加重要。)
中间操作进一步分为无状态操作和有状态操作。无状态操作(例如过滤器和映射)在处理新元素时不保留先前看到的元素的状态 - 每个元素都可以独立于其他元素上的操作进行处理。有状态操作(例如distinct和sorted)可以在处理新元素时包含先前看到的元素的状态。
有状态操作可能需要在生成结果之前处理整个输入。例如,在查看流的所有元素之前,不能通过对流进行排序来产生任何结果。因此,在并行计算下,一些包含有状态中间操作的流水线可能需要对数据进行多趟处理,或者可能需要缓冲重要数据。仅包含无状态中间操作的管道可以一次处理完成,无论是顺序还是并行,具有最小的数据缓冲。
此外,一些操作被认为是短路操作。如果在呈现无限输入时,它可能产生有限流,则中间操作是短路的。如果在呈现无限输入时它可以在有限时间内终止,则终止操作是短路的。在流水线中进行短路操作是处理无限流以在有限时间内正常终止的必要但不充分的条件。
1.2 Parallelism
Processing elements with an explicit for-loop is inherently serial.
Streams facilitate parallel execution by reframing the
computation as a pipeline of aggregate operations, rather than as
imperative operations on each individual element. All streams
operations can execute either in serial or in parallel. The stream
implementations in the JDK create serial streams unless
parallelism is explicitly requested. For example, Collection has
methods Collection.stream() and Collection.parallelStream(),
which produce sequential and parallel streams respectively;
other stream-bearing methods such as IntStream.range(int, int)
produce sequential streams but these streams can be efficiently
parallelized by invoking their BaseStream.parallel() method. To
execute the prior "sum of weights of widgets" query in parallel,
we would do
具有显式for循环的元素处理本质上是串行的。 Streams通过将计算重新定义为聚合操作的流水线而不是作为每个单独元素的命令操作来促进并行执行。 所有流操作都可以串行或并行执行。 除非明确请求并行性,否则JDK中的流实现会创建串行流。 例如,Collection有方法Collection.stream()和Collection.parallelStream(),它们分别产生顺序和并行流; 其他流方法(如IntStream.range(int,int))生成顺序流,但可以通过调用它们的BaseStream.parallel()方法有效地并行化这些流。 要并行执行先前的“sum of weights of widgets”查询,可以这样做:
int sumOfWeights = widgets.parallelStream()
.filter(b -> b.getColor() == RED)
.mapToInt(b -> b.getWeight())
.sum();
The only difference between the serial and parallel versions of
this example is the creation of the initial stream, using
"parallelStream()" instead of "stream()". When the terminal
operation is initiated, the stream pipeline is executed sequentially
or in parallel depending on the orientation of the stream on which
it is invoked. Whether a stream will execute in serial or parallel
can be determined with the isParallel() method, and the
orientation of a stream can be modified with the
BaseStream.sequential() and BaseStream.parallel() operations.
When the terminal operation is initiated, the stream pipeline is
executed sequentially or in parallel depending on the mode of the
stream on which it is invoked.
Except for operations identified as explicitly nondeterministic,
such as findAny(), whether a stream executes sequentially or in
parallel should not change the result of the computation.
Most stream operations accept parameters that describe user-
specified behavior, which are often lambda expressions. To
preserve correct behavior, these behavioral parameters must be
non-interfering, and in most cases must be stateless. Such
parameters are always instances of a functional interface such
as Function, and are often lambda expressions or method
references.
此示例的串行和并行版本之间的唯一区别是使用“parallelStream()”而不是“stream()”创建初始流。当启动终止操作时,根据调用它的流的方式,顺序地或并行地执行流水线。可以使用isParallel()方法确定流是以串行还是并行方式执行,并且可以使用BaseStream.sequential()和BaseStream.parallel()操作修改流的方式。当启动终止操作时,根据调用它的流的模式,顺序地或并行地执行流水线。
除了标识为显式非确定性的操作(例如findAny())之外,流是顺序执行还是并行执行不应更改计算结果。
大多数流操作接受描述用户指定行为的参数,这些参数通常是lambda表达式。为了保持正确的行为,这些行为参数必须是非干扰的(大多数情况意味着不能修改数据源),并且在大多数情况下必须是无状态的。这些参数总是函数接口的实例,例如Function,并且通常是lambda表达式或方法引用。
1.3 Non-interference
Streams enable you to execute possibly-parallel aggregate
operations over a variety of data sources, including even non-
thread-safe collections such as ArrayList. This is possible only if
we can prevent interference with the data source during the
execution of a stream pipeline. Except for the escape-hatch
operations iterator() and spliterator(), execution begins when the
terminal operation is invoked, and ends when the terminal
operation completes. For most data sources, preventing
interference means ensuring that the data source is not modified
at all during the execution of the stream pipeline. The notable
exception to this are streams whose sources are concurrent
collections, which are specifically designed to handle concurrent
modification. Concurrent stream sources are those whose
Spliterator reports the CONCURRENT characteristic.
Accordingly, behavioral parameters in stream pipelines whose
source might not be concurrent should never modify the stream's
data source. A behavioral parameter is said to interfere with a
non-concurrent data source if it modifies, or causes to be
modified, the stream's data source. The need for non-
interference applies to all pipelines, not just parallel ones. Unless
the stream source is concurrent, modifying a stream's data
source during execution of a stream pipeline can cause
exceptions, incorrect answers, or nonconformant behavior. For
well-behaved stream sources, the source can be modified before
the terminal operation commences and those modifications will
be reflected in the covered elements. For example, consider the
following code:
Streams使您能够在各种数据源上执行可能并行的聚合操作,甚至包括非线程安全的集合,例如ArrayList。只有在执行流水线期间能够防止干扰数据源时,才有可能实现这一点。除了escape-hatch操作iterator()和spliterator()之外,在调用终止操作时开始执行,并在终止操作完成时结束。对于大多数数据源,防止干扰意味着确保在流水线的执行期间根本不修改数据源。值得注意的例外是其源是并发集合的流,这些集合专门用于处理并发修改。并发流源是Spliterator报告CONCURRENT特性的源。
因此,源流可能不是并发的,这样的流水线中的行为参数永远不应该修改流的数据源。如果行为参数修改或导致修改流的数据源,则该行为参数会干扰非并发数据源。不干涉适用于所有流水线,而不仅仅是并行流水线。除非流源是并发的,否则在执行流水线期间修改流的数据源可能会导致异常、错误答案或不一致的行为。对于性能良好的流源,可以在终止操作开始之前修改源,并且这些修改将反映在所覆盖的元素中。例如,请考虑以下代码:
List<String> l = new ArrayList(Arrays.asList("one", "two"));
Stream<String> sl = l.stream();
l.add("three");
String s = sl.collect(joining(" "));
First a list is created consisting of two strings: "one"; and "two".
Then a stream is created from that list. Next the list is modified
by adding a third string: "three". Finally the elements of the
stream are collected and joined together. Since the list was
modified before the terminal collect operation commenced the
result will be a string of "one two three". All the streams returned
from JDK collections, and most other JDK classes, are well-
behaved in this manner; for streams generated by other libraries,
see Low-level stream construction for requirements for building
well-behaved streams.
首先创建一个包含两个字符串的列表:“one”和“two”。 然后从该列表创建流。 接下来,通过添加第三个字符串来修改列表:“three”。 最后,收集流的元素并将它们连接在一起。 由于在终止收集操作开始之前修改了列表,结果将是一串“one two three”。 从JDK集合和大多数其他JDK类返回的所有流都以这种方式表现良好; 对于其他库生成的流,请参阅低级流构造,以了解构建行为良好的流的要求。
1.4 Stateless behaviors
Stream pipeline results may be nondeterministic or incorrect if
the behavioral parameters to the stream operations are stateful.
A stateful lambda (or other object implementing the appropriate
functional interface) is one whose result depends on any state
which might change during the execution of the stream pipeline.
An example of a stateful lambda is the parameter to map() in:
如果流操作的行为参数是有状态的,则流水线结果可能是不确定的或不正确的。 有状态lambda(或实现适当功能接口的其他对象)的结果取决于在流水线执行期间可能发生变化的任何状态。 有状态lambda的一个例子是map()的参数:
Set<Integer> seen = Collections.synchronizedSet(new HashSet<>());
stream.parallel().map(e -> { if (seen.add(e)) return 0; else return e; })...
Here, if the mapping operation is performed in parallel, the
results for the same input could vary from run to run, due to
thread scheduling differences, whereas, with a stateless lambda
expression the results would always be the same.
Note also that attempting to access mutable state from
behavioral parameters presents you with a bad choice with
respect to safety and performance; if you do not synchronize
access to that state, you have a data race and therefore your
code is broken, but if you do synchronize access to that state,
you risk having contention undermine the parallelism you are
seeking to benefit from. The best approach is to avoid stateful
behavioral parameters to stream operations entirely; there is
usually a way to restructure the stream pipeline to avoid
statefulness.
这里,如果映射操作是并行执行的,则由于线程调度差异,相同输入的结果可能因运行而不同,而对于无状态lambda表达式,结果将始终相同。
另请注意,尝试从行为参数访问可变状态会给您在安全性和性能方面做出错误的选择; 如果您不同步对该状态的访问,则会出现数据争用,因此您的代码已损坏,但如果您同步访问该状态,则存在争用的风险会破坏您希望从中受益的并行性。 最好的方法是完全避免有状态的行为参数作为流操作; 通常有方法可以重构流水线以避免有状态。
1.5 Side-effects
Side-effects in behavioral parameters to stream operations are,
in general, discouraged, as they can often lead to unwitting
violations of the statelessness requirement, as well as other
thread-safety hazards.
If the behavioral parameters do have side-effects, unless
explicitly stated, there are no guarantees as to the visibility of
those side-effects to other threads, nor are there any guarantees
that different operations on the "same" element within the same
stream pipeline are executed in the same thread. Further, the
ordering of those effects may be surprising. Even when a pipeline
is constrained to produce a result that is consistent with the
encounter order of the stream source (for example,
IntStream.range(0,5).parallel().map(x -> x*2).toArray() must
produce [0, 2, 4, 6, 8]), no guarantees are made as to the order
in which the mapper function is applied to individual elements, or
in what thread any behavioral parameter is executed for a given
element.
Many computations where one might be tempted to use side
effects can be more safely and efficiently expressed without side-
effects, such as using reduction instead of mutable accumulators.
However, side-effects such as using println() for debugging
purposes are usually harmless. A small number of stream
operations, such as forEach() and peek(), can operate only via
side-effects; these should be used with care.
As an example of how to transform a stream pipeline that
inappropriately uses side-effects to one that does not, the
following code searches a stream of strings for those matching a
given regular expression, and puts the matches in a list.
通常,不鼓励行为参数对流操作的副作用,因为它们通常会导致无意中违反无状态要求以及其他线程安全危险。
如果行为参数确实有副作用,除非明确说明,否则不能保证这些副作用对其他线程的可见性,也不保证对同一流水线中“相同”元素的不同操作在同一个线程中执行。此外,这些效果的排序可能令人惊讶。即使约束流水线产生的结果必须与流源的操作顺序一致(例如,IntStream.range(0,5).parallel().map(x -> x*2).toArray()必须产生[0,2,4,6,8]),不保证映射器函数应用于单个元素的顺序,或者对给定元素执行任何行为参数的线程。
许多带有副作用的计算可以更安全和有效地表达为不带副作用的方式,例如使用reduction而不是可变累加器。但是,诸如使用println()进行调试的副作用通常是无害的。少量的流操作,例如forEach()和peek(),只能通过副作用运行;这些应该小心使用。
作为如何将不适当地使用副作用的流水线转换为不使用副作用的水线的示例,以下代码在字符串流中搜索与给定正则表达式匹配的字符串,并将匹配放在列表中。
ArrayList<String> results = new ArrayList<>();
stream.filter(s -> pattern.matcher(s).matches())
.forEach(s -> results.add(s)); // Unnecessary use of side-effects!
This code unnecessarily uses side-effects. If executed in parallel,
the non-thread-safety of ArrayList would cause incorrect results,
and adding needed synchronization would cause contention,
undermining the benefit of parallelism. Furthermore, using side-
effects here is completely unnecessary; the forEach() can simply
be replaced with a reduction operation that is safer, more
efficient, and more amenable to parallelization:
此代码不必要地使用副作用。 如果并行执行,ArrayList的非线程安全性将导致不正确的结果,并且添加所需的同步将导致争用,从而破坏并行性的好处。 此外,在这里使用副作用是完全没有必要的; forEach()可以简单地用更安全、更高效、更易于并行化的reduction操作替换:
List<String>results =
stream.filter(s -> pattern.matcher(s).matches())
.collect(Collectors.toList()); // No side-effects!
1.6 Ordering
Streams may or may not have a defined encounter order.
Whether or not a stream has an encounter order depends on the
source and the intermediate operations. Certain stream sources
(such as List or arrays) are intrinsically ordered, whereas others
(such as HashSet) are not. Some intermediate operations, such
as sorted(), may impose an encounter order on an otherwise
unordered stream, and others may render an ordered stream
unordered, such as BaseStream.unordered(). Further, some
terminal operations may ignore encounter order, such as
forEach().
If a stream is ordered, most operations are constrained to
operate on the elements in their encounter order; if the source of
a stream is a List containing [1, 2, 3], then the result of executing
map(x -> x*2) must be [2, 4, 6]. However, if the source has no
defined encounter order, then any permutation of the values [2,
4, 6] would be a valid result.
For sequential streams, the presence or absence of an
encounter order does not affect performance, only determinism.
If a stream is ordered, repeated execution of identical stream
pipelines on an identical source will produce an identical result; if
it is not ordered, repeated execution might produce different
results.
For parallel streams, relaxing the ordering constraint can
sometimes enable more efficient execution. Certain aggregate
operations, such as filtering duplicates (distinct()) or grouped
reductions (Collectors.groupingBy()) can be implemented more
efficiently if ordering of elements is not relevant. Similarly,
operations that are intrinsically tied to encounter order, such as
limit(), may require buffering to ensure proper ordering,
undermining the benefit of parallelism. In cases where the stream
has an encounter order, but the user does not particularly care
about that encounter order, explicitly de-ordering the stream with
unordered() may improve parallel performance for some stateful
or terminal operations. However, most stream pipelines, such as
the "sum of weight of blocks" example above, still parallelize
efficiently even under ordering constraints.
流可能有也可能没有已定义的操作顺序。流是否具有操作顺序取决于源和中间操作。某些流源(例如List或数组)本质上是有序的,而其他流(例如HashSet)则不是。某些中间操作(例如sorted())可能会在其他无序流上强制执行顺序操作,而其他中间操作可能会将有序流打乱,例如BaseStream.unordered()。此外,一些终止操作可以忽略顺序,例如forEach()。
如果流有序,则大多数操作都被约束为对其有序元素进行操作;如果流的源是包含[1,2,3]的List,那么执行map(x - > x * 2)的结果必须是[2,4,6]。但是,如果源没有定义的顺序,那么值[2,4,6]的任何排列都将是有效的结果。
对于串行流,顺序的存在与否不会影响性能,只影响确定性。如果流有序,则在相同的源上重复执行相同的流管道将产生相同的结果;如果无序,重复执行可能会产生不同的结果。
对于并行流,放宽排序约束有时可以实现更高效的执行。如果元素的顺序无关,则可以更有效地实现某些聚合操作,例如过滤重复(distinct())或分组规约(Collectors.groupingBy())。类似地,与有序本质上相关的操作(例如limit())可能需要缓冲以确保正确排序,从而破坏并行性的好处。在流具有顺序但用户不特别关心该顺序的情况下,使用unordered()明确地对流进行乱序可以改善某些有状态或终止操作的并行性能。然而,大多数流水线,例如上面的“sum of weight of blocks”示例,即使在排序约束下仍然可以有效地并行化。
1.7 Reduction operations
A reduction operation (also called a fold) takes a sequence of
input elements and combines them into a single summary result
by repeated application of a combining operation, such as finding
the sum or maximum of a set of numbers, or accumulating
elements into a list. The streams classes have multiple forms of
general reduction operations, called reduce() and collect(), as
well as multiple specialized reduction forms such as sum(),
max(), or count().
Of course, such operations can be readily implemented as
simple sequential loops, as in:
归约操作(也称为折叠)对一系列输入元素重复应用组合操作将它们组合成单个汇总结果,例如查找一组数字的总和或最大值,或将元素累积到列表中。 流类具有多种形式的通用归约操作,称为reduce()和collect(),以及多种专用归约形式,如sum(),max()或count()。
当然,这样的操作可以很容易地实现为简单的顺序循环,如:
int sum = 0;
for (int x : numbers) {
sum += x;
}
However, there are good reasons to prefer a reduce operation
over a mutative accumulation such as the above. Not only is a
reduction "more abstract" -- it operates on the stream as a whole
rather than individual elements -- but a properly constructed
reduce operation is inherently parallelizable, so long as the
function(s) used to process the elements are associative and
stateless. For example, given a stream of numbers for which we
want to find the sum, we can write:
然而,有充分理由优先考虑归约操作而不是如上所述的可变累积。归约不仅“更抽象” - 它作为一个整体而不是单个元素在整个流上运行 - 而且正确构造的reduce操作本质上是可并行化的,只要用于处理元素的函数是可结合的和无状态的。 例如,给定想要找到总和的数字流,可以写作:
int sum = numbers.stream().reduce(0, (x,y) -> x+y);
// 或者
int sum = numbers.stream().reduce(0, Integer::sum);
这些归约操作可以安全地并行运行,几乎不需要修改:
int sum = numbers.parallelStream().reduce(0, Integer::sum);
Reduction parallellizes well because the implementation can
operate on subsets of the data in parallel, and then combine the
intermediate results to get the final correct answer. (Even if the
language had a "parallel for-each" construct, the mutative
accumulation approach would still required the developer to
provide thread-safe updates to the shared accumulating variable
sum, and the required synchronization would then likely eliminate
any performance gain from parallelism.) Using reduce() instead
removes all of the burden of parallelizing the reduction operation,
and the library can provide an efficient parallel implementation
with no additional synchronization required.
The "widgets" examples shown earlier shows how reduction
combines with other operations to replace for loops with bulk
operations. If widgets is a collection of Widget objects, which
have a getWeight method, we can find the heaviest widget with:
归约可以很好地并行,因为其实现可以并行地对数据的子集进行操作,然后组合中间结果以获得最终的正确答案。 (即使该语言具有“parallel for-each”结构,可变累积方法仍然需要开发人员为共享累积变量sum提供线程安全更新,然后所需的同步可能会消除并行性带来的任何性能益处。)使用reduce(),删除了并行化归约操作的所有负担,并且库可以提供有效的并行实现,而无需额外的同步。
前面显示的“widgets”示例显示了归约如何与其他操作相结合 以使用批量操作替换循环。 如果widgets是具有getWeight方法的Widget对象的集合,可以找到最重的窗口小部件:
OptionalInt heaviest = widgets.parallelStream()
.mapToInt(Widget::getWeight)
.max();
在更一般的形式中,对<T>类型的元素执行reduce操作,产生类型<U>的结果需要三个参数:
<U> U reduce(U identity,
BiFunction<U, ? super T, U> accumulator,
BinaryOperator<U> combiner);
Here, the identity element is both an initial seed value for the
reduction and a default result if there are no input elements. The
accumulator function takes a partial result and the next element,
and produces a new partial result. The combiner function
combines two partial results to produce a new partial result. (The
combiner is necessary in parallel reductions, where the input is
partitioned, a partial accumulation computed for each partition,
and then the partial results are combined to produce a final
result.)
More formally, the identity value must be an identity for the
combiner function. This means that for all u,
combiner.apply(identity, u) is equal to u. Additionally, the
combiner function must be associative and must be compatible
with the accumulator function: for all u and t, combiner.apply(u,
accumulator.apply(identity, t)) must be equals() to
accumulator.apply(u, t).
The three-argument form is a generalization of the two-argument
form, incorporating a mapping step into the accumulation step.
We could re-cast the simple sum-of-weights example using the
more general form as follows:
这里,identity元素既是归约的初始种子值,也是没有输入元素的默认结果。accumulator函数获取部分结果和下一个元素,并产生新的部分结果。combiner函数组合了两个部分结果以产生新的部分结果。 (combiner在并行归约中是必需的,其中输入被分区,为每个分区计算部分accumulation,然后组合部分结果以产生最终结果。)
更正式地说,identity必须是combiner函数的identity。这意味着对于所有u,combiner.apply(identity,u)等于u。另外,组合器函数必须是相结合的,并且必须与accumulator函数兼容:对于所有u和t,combiner.apply(u,accumulator.apply(identity,t))必须equals()accumulator.apply(u,t)。
三参数形式是两参数形式的泛化,将映射步骤结合到累积步骤中。我们可以使用更一般的形式重新构建简单的权重总和示例,如下所示:
int sumOfWeights = widgets.stream()
.reduce(0,
(sum, b) -> sum + b.getWeight()),
Integer::sum);
though the explicit map-reduce form is more readable and
therefore should usually be preferred. The generalized form is
provided for cases where significant work can be optimized away
by combining mapping and reducing into a single function.
显式的map-reduce形式更具可读性,因此通常应该是首选。 可以为重要工作的情况提供优化通用形式:将映射和归约组合成单个函数。
1.8 Mutable reduction
A mutable reduction operation accumulates input elements into a
mutable result container, such as a Collection or StringBuilder, as
it processes the elements in the stream.
If we wanted to take a stream of strings and concatenate them
into a single long string, we could achieve this with ordinary
reduction:
可变归约操作处理流中的元素将输入元素累积到可变结果容器(例如Collection或StringBuilder)中。
如果我们想要获取字符串流并将它们连接成一个长字符串,我们可以通过普通归约来实现:
String concatenated = strings.reduce("", String::concat)
We would get the desired result, and it would even work in
parallel. However, we might not be happy about the performance!
Such an implementation would do a great deal of string copying,
and the run time would be O(n^2) in the number of characters. A
more performant approach would be to accumulate the results
into a StringBuilder, which is a mutable container for
accumulating strings. We can use the same technique to
parallelize mutable reduction as we do with ordinary reduction.
The mutable reduction operation is called collect(), as it collects
together the desired results into a result container such as a
Collection. A collect operation requires three functions: a supplier
function to construct new instances of the result container, an
accumulator function to incorporate an input element into a result
container, and a combining function to merge the contents of one
result container into another. The form of this is very similar to
the general form of ordinary reduction:
会得到理想的结果,甚至可以并行工作。 但是,我们可能对性能不满意! 这样的实现将进行大量的字符串复制,并且运行时间将是字符数的O(n ^ 2)。 更高效的方法是将结果累积到StringBuilder中,StringBuilder是用于累积字符串的可变容器。 我们可以使用相同的技术来并行化可变归约,就像我们使用普通归约一样。
可变归约操作称为collect(),因为它将所需结果收集到一个结果容器(如Collection)中。 收集操作需要三个功能:构造结果容器新实例的supplier函数,将输入元素合并到结果容器中的accumulator函数,以及将一个结果容器的内容合并到另一个中的combining函数。 这种形式与普通归约的一般形式非常相似:
<R> R collect(Supplier<R> supplier,
BiConsumer<R, ? super T> accumulator,
BiConsumer<R, R> combiner);
As with reduce(), a benefit of expressing collect in this abstract
way is that it is directly amenable to parallelization: we can
accumulate partial results in parallel and then combine them, so
long as the accumulation and combining functions satisfy the
appropriate requirements. For example, to collect the String
representations of the elements in a stream into an ArrayList, we
could write the obvious sequential for-each form:
与reduce()一样,以这种抽象方式表达collect的好处是它可以直接适用于并行化:我们可以并行累积部分结果然后将它们组合起来,只要累积和组合函数满足适当的要求即可。 例如,要将流中元素的String表示形式收集到ArrayList中,我们可以使用串行的for-each形式:
ArrayList<String> strings = new ArrayList<>();
for (T element : stream) {
strings.add(element.toString());
}
或者可以使用一个可并行的collect形式:
ArrayList<String> strings = stream.collect(() -> new ArrayList<>(),
(c, e) -> c.add(e.toString()),
(c1, c2) -> c1.addAll(c2));
或者,通过将映射操作从累加器函数中拉出来,可以更简洁地表达它:
List<String> strings = stream.map(Object::toString)
.collect(ArrayList::new, ArrayList::add, ArrayList::addAll);
Here, our supplier is just the ArrayList constructor, the
accumulator adds the stringified element to an ArrayList, and the
combiner simply uses addAll to copy the strings from one
container into the other.
The three aspects of collect -- supplier, accumulator, and
combiner -- are tightly coupled. We can use the abstraction of a
Collector to capture all three aspects. The above example for
collecting strings into a List can be rewritten using a standard
Collector as:
这里,supplier是ArrayList构造函数,accumulator将stringified元素添加到ArrayList,combiner使用addAll将字符串从一个容器复制到另一个容器。
collect的三个部件 - supplier、accumulator和 combiner - 是紧密耦合的。 可以使用Collector的抽象来捕获所有这三个部件。 上面用于将字符串收集到List中的示例可以使用标准收集器重写为:
List<String> strings = stream.map(Object::toString)
.collect(Collectors.toList());
Packaging mutable reductions into a Collector has another
advantage: composability. The class Collectors contains a
number of predefined factories for collectors, including
combinators that transform one collector into another. For
example, suppose we have a collector that computes the sum of
the salaries of a stream of employees, as follows:
将可变归约包装到Collector中有另一个优点:可组合性。 Collectors类包含许多用于收集器的预定义工厂,包括将一个收集器转换为另一个收集器的组合器。 例如,假设我们有一个收集器来计算员工流的工资总和,如下所示:
Collector<Employee, ?, Integer> summingSalaries
= Collectors.summingInt(Employee::getSalary);
(The ? for the second type parameter merely indicates that we
don't care about the intermediate representation used by this
collector.) If we wanted to create a collector to tabulate the sum
of salaries by department, we could reuse summingSalaries
using groupingBy:
(第二个类型参数?仅表示我们不关心此收集器使用的中间表示。)如果我们想创建一个收集器来按部门列出工资总和,我们可以使用groupingBy重用summingSalaries:
Map<Department, Integer> salariesByDept
= employees.stream().collect(Collectors.groupingBy(Employee::getDepartment,
summingSalaries));
As with the regular reduction operation, collect() operations can
only be parallelized if appropriate conditions are met. For any
partially accumulated result, combining it with an empty result
container must produce an equivalent result. That is, for a
partially accumulated result p that is the result of any series of
accumulator and combiner invocations, p must be equivalent to
combiner.apply(p, supplier.get()).
Further, however the computation is split, it must produce an
equivalent result. For any input elements t1 and t2, the results r1
and r2 in the computation below must be equivalent:
与常规归约操作一样,只有满足适当的条件才能并行化collect()操作。 对于任何部分累积的结果,将其与空结果容器组合必须产生相等的结果。 也就是说,对于部分累积的结果p,它是任何一系列累加器和组合器调用的结果,p必须等于combiner.apply(p, supplier.get())。
此外,分裂的计算必须产生相等的结果。 对于任何输入元素t1和t2,下面计算中的结果r1和r2必须是等价的:
A a1 = supplier.get();
accumulator.accept(a1, t1);
accumulator.accept(a1, t2);
R r1 = finisher.apply(a1); // result without splitting
A a2 = supplier.get();
accumulator.accept(a2, t1);
A a3 = supplier.get();
accumulator.accept(a3, t2);
R r2 = finisher.apply(combiner.apply(a2, a3)); // result with splitting
Here, equivalence generally means according to
Object.equals(Object). but in some cases equivalence may be
relaxed to account for differences in order.
这里,相等通常是根据Object.equals(Object)判断的。 但在某些情况下,对于顺序不同可以放宽等同性。
1.9 Reduction, concurrency, and ordering
With some complex reduction operations, for example a collect()
that produces a Map, such as:
通过一些复杂的归约操作,例如生成Map的collect(),例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.collect(Collectors.groupingBy(Transaction::getBuyer));
it may actually be counterproductive to perform the operation in
parallel. This is because the combining step (merging one Map
into another by key) can be expensive for some Map
implementations.
Suppose, however, that the result container used in this
reduction was a concurrently modifiable collection -- such as a
ConcurrentHashMap. In that case, the parallel invocations of the
accumulator could actually deposit their results concurrently into
the same shared result container, eliminating the need for the
combiner to merge distinct result containers. This potentially
provides a boost to the parallel execution performance. We call
this a concurrent reduction.
A Collector that supports concurrent reduction is marked with the
Collector.Characteristics.CONCURRENT characteristic.
However, a concurrent collection also has a downside. If multiple
threads are depositing results concurrently into a shared
container, the order in which results are deposited is non-
deterministic. Consequently, a concurrent reduction is only
possible if ordering is not important for the stream being
processed. The Stream.collect(Collector) implementation will
only perform a concurrent reduction if
* The stream is parallel;
* The collector has the Collector.Characteristics.CONCURRENT
characteristic, and;
* Either the stream is unordered, or the collector has the
Collector.Characteristics.UNORDERED characteristic.
实际上并行执行操作可能会适得其反。这是因为组合步骤(通过键将一个Map合并到另一个Map)对于某些Map实现来说可能是昂贵的。
但是,假设此归约中使用的结果容器是可并发修改的集合 - 例如ConcurrentHashMap。在这种情况下,累加器的并行调用实际上可以将它们的结果同时存入同一个共享结果容器中,从而消除了组合器合并不同结果容器的需要。这可能会提升并行执行性能。我们称之为并发归约。
支持并发归约的Collector有Collector.Characteristics.CONCURRENT特性。然而,并发收集也有缺点。如果多个线程同时将结果存入共享容器,则存储结果的顺序是不确定的。因此,只有在顺序对正在处理的流不重要的情况下,才能实现并发归约。 Stream.collect(Collector)实现只有在如下情况下才会执行并发归约
- 流是并行的;
- 收集器具有Collector.Characteristics.CONCURRENT特性;
- 流是无序的,或者收集器具有Collector.Characteristics.UNORDERED特性。
可以使用BaseStream.unordered()方法确保流是无序的。 例如:
Map<Buyer, List<Transaction>> salesByBuyer
= txns.parallelStream()
.unordered()
.collect(groupingByConcurrent(Transaction::getBuyer));
(where Collectors.groupingByConcurrent(
java.util.function.Function<? super T, ? extends K>) is the
concurrent equivalent of groupingBy).
Note that if it is important that the elements for a given key
appear in the order they appear in the source, then we cannot
use a concurrent reduction, as ordering is one of the casualties
of concurrent insertion. We would then be constrained to
implement either a sequential reduction or a merge-based
parallel reduction.
(其中Collectors.groupingByConcurrent( java.util.function.Function<? super T, ? extends K>)是groupingBy的并发等价物。
请注意,如果给定键的元素按其在源中的顺序出现很重要,那么我们就不能使用并发归约,因为排序是并发插入的牺牲品之一。 然后,我们将被限制为实现串行归约或基于归并的并行归约。
1.10 Associativity结合性
如果满足如下要求,则操作或函数op是可结合的:
(a op b) op c == a op (b op c)
如果将其扩展为四个terms,可以看出这对并行计算的重要性:
a op b op c op d == (a op b) op (c op d)
因此可以并行地计算(a op b)、(c op d),然后在计算结果上在调用op。
可结合操作包括:数值加法、min、max以及字符串连接。
1.11 Low-level stream construction
So far, all the stream examples have used methods like
Collection.stream() or Arrays.stream(Object[]) to obtain a stream.
How are those stream-bearing methods implemented?
The class StreamSupport has a number of low-level methods for
creating a stream, all using some form of a Spliterator. A
spliterator is the parallel analogue of an Iterator; it describes a
(possibly infinite) collection of elements, with support for
sequentially advancing, bulk traversal, and splitting off some
portion of the input into another spliterator which can be
processed in parallel. At the lowest level, all streams are driven
by a spliterator.
There are a number of implementation choices in implementing a
spliterator, nearly all of which are tradeoffs between simplicity of
implementation and runtime performance of streams using that
spliterator. The simplest, but least performant, way to create a
spliterator is to create one from an iterator using
Spliterators.spliteratorUnknownSize(java.util.Iterator, int). While
such a spliterator will work, it will likely offer poor parallel
performance, since we have lost sizing information (how big is
the underlying data set), as well as being constrained to a
simplistic splitting algorithm.
A higher-quality spliterator will provide balanced and known-size
splits, accurate sizing information, and a number of other
characteristics of the spliterator or data that can be used by
implementations to optimize execution.
Spliterators for mutable data sources have an additional
challenge; timing of binding to the data, since the data could
change between the time the spliterator is created and the time
the stream pipeline is executed. Ideally, a spliterator for a stream
would report a characteristic of IMMUTABLE or CONCURRENT;
if not it should be late-binding. If a source cannot directly supply
a recommended spliterator, it may indirectly supply a spliterator
using a Supplier, and construct a stream via the Supplier-
accepting versions of stream(). The spliterator is obtained from
the supplier only after the terminal operation of the stream
pipeline commences.
These requirements significantly reduce the scope of potential
interference between mutations of the stream source and
execution of stream pipelines. Streams based on spliterators with
the desired characteristics, or those using the Supplier-based
factory forms, are immune to modifications of the data source
prior to commencement of the terminal operation (provided the
behavioral parameters to the stream operations meet the
required criteria for non-interference and statelessness). See
Non-Interference for more details.
到目前为止,所有流示例都使用了Collection.stream()或Arrays.stream(Object [])等方法来获取流。那些生成流的方法是如何实现的?
StreamSupport类有许多用于创建流的低级方法,所有这些方法都使用某种形式的Spliterator。spliterator是迭代器的并行模拟器;它描述了一个(可能是无限的)元素集合,支持串行前进、批量遍历、以及将输入的某些部分分成另一个可以并行处理的spliterator(并行处理)。在最低级别,所有流都由spliterator驱动。
在实现spliterator时有许多选择,几乎所有这些都是在实现的简单性和流的运行时性能之间进行权衡。创建spliterator的最简单但性能最差的方法是使用Spliterators.spliteratorUnknownSize(java.util.Iterator, int)从迭代器创建一个。虽然这样的spliterator可以工作,但它可能会提供较差的并行性能,因为我们丢失了大小信息(基础数据集有多大),以及被限制为简单的分裂算法。
更高质量的Spliterator将提供平衡且已知大小的分割、准确的大小信息以及分离器或数据的许多其他特征,这些特征可在实现时用于优化执行。
可变数据源的Spliterators还有一个挑战:绑定到数据的时间,因为数据可能在创建Spliterator时间和流水线执行时间之间发生变化。理想情况下,流的Spliterator会报告IMMUTABLE或CONCURRENT的特征;如果不是它应该延迟绑定。如果源不能直接提供推荐的Spliterator,它可以使用Supplier间接提供的Spliterator,并通过接受Supplier的stream()版本构建流。仅在流水线的终止操作开始之后才从supplier获得Spliterator。
这些要求显着减少了流源改变和流水线执行之间潜在干扰的范围。基于具有所需特征的Spliterator的流或使用基于Supplier的工厂形式的流不受在终止操作开始之前对数据源的修改的影响(假设流操作的行为参数满足非操作的要求标准:无干扰和无状态)。有关详细信息,请参阅无干扰。
2.Stream接口
A sequence of elements supporting sequential and parallel
aggregate operations. The following example illustrates an
aggregate operation using Stream and IntStream:
支持串行和并行聚合操作的一系列元素。 以下示例说明了使用Stream和IntStream的聚合操作:
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
In addition to Stream, which is a stream of object references,
there are primitive specializations for IntStream, LongStream,
and DoubleStream, all of which are referred to as "streams" and
conform to the characteristics and restrictions described here.
To perform a computation, stream operations are composed into
a stream pipeline. A stream pipeline consists of a source (which
might be an array, a collection, a generator function, an I/O
channel, etc), zero or more intermediate operations (which
transform a stream into another stream, such as
filter(Predicate)), and a terminal operation (which produces a
result or side-effect, such as count() or forEach(Consumer)).
Streams are lazy; computation on the source data is only
performed when the terminal operation is initiated, and source
elements are consumed only as needed.
Collections and streams, while bearing some superficial
similarities, have different goals. Collections are primarily
concerned with the efficient management of, and access to, their
elements. By contrast, streams do not provide a means to
directly access or manipulate their elements, and are instead
concerned with declaratively describing their source and the
computational operations which will be performed in aggregate
on that source. However, if the provided stream operations do
not offer the desired functionality, the BaseStream.iterator() and
BaseStream.spliterator() operations can be used to perform a
controlled traversal.
A stream pipeline, like the "widgets" example above, can be
viewed as a query on the stream source. Unless the source was
explicitly designed for concurrent modification (such as a
ConcurrentHashMap), unpredictable or erroneous behavior may
result from modifying the stream source while it is being queried.
Most stream operations accept parameters that describe user-
specified behavior, such as the lambda expression w ->
w.getWeight() passed to mapToInt in the example above. To
preserve correct behavior, these behavioral parameters:
* must be non-interfering (they do not modify the stream source);
* in most cases must be stateless (their result should not depend
on any state that might change during execution of the stream
pipeline).
除了Stream之外,它是一个对象引用流,还有IntStream、LongStream和DoubleStream的基本类型流,所有这些都被称为“流”,并符合此处描述的特征和限制。
为了执行计算,流操作被组合成流水线。流水线由源(可能是数组、集合、生成器函数、I / O通道等),零个或多个中间操作(将流转换为另一个流,如filter(Predicate))组成)和终止操作(产生结果或副作用,例如count()或forEach(Consumer))。流是懒惰执行,仅在启动终止操作时执行对源数据的计算,并且仅在需要时消耗源元素。
集合和流虽然有一些表面的相似之处,但却用于不同的目标。集合主要关注其元素的有效管理和访问。相反,流不提供直接访问或操纵其元素的手段,而是涉及声明性地描述它们的源以及将在该源上聚合执行的计算操作。但是,如果提供的流操作不提供所需的函数操作,则可以使用BaseStream.iterator()和BaseStream.spliterator()操作来执行受控遍历。
流管水线(如上面的“widgets”示例)可以视为流源上的查询。除非源是为并发修改而明确设计的(例如ConcurrentHashMap),否则在查询流源时修改流源可能会导致不可预测或错误的行为。
大多数流操作接受描述用户指定行为的参数,例如上面示例中传递给mapToInt的lambda表达式w - > w.getWeight()。为了保持正确的行为,这些行为参数:
- 必须是非干扰的(它们不会修改流源);
- 在大多数情况下,必须是无状态的(它们的结果不应该依赖于在执行流水线期间可能发生变化的任何状态)。
Such parameters are always instances of a functional interface
such as Function, and are often lambda expressions or method
references. Unless otherwise specified these parameters must
be non-null.
A stream should be operated on (invoking an intermediate or
terminal stream operation) only once. This rules out, for example,
"forked" streams, where the same source feeds two or more
pipelines, or multiple traversals of the same stream. A stream
implementation may throw IllegalStateException if it detects that
the stream is being reused. However, since some stream
operations may return their receiver rather than a new stream
object, it may not be possible to detect reuse in all cases.
Streams have a BaseStream.close() method and implement
AutoCloseable, but nearly all stream instances do not actually
need to be closed after use. Generally, only streams whose
source is an IO channel (such as those returned by
Files.lines(Path, Charset)) will require closing. Most streams are
backed by collections, arrays, or generating functions, which
require no special resource management. (If a stream does
require closing, it can be declared as a resource in a try-with-
resources statement.)
Stream pipelines may execute either sequentially or in parallel.
This execution mode is a property of the stream. Streams are
created with an initial choice of sequential or parallel execution.
(For example, Collection.stream() creates a sequential stream,
and Collection.parallelStream() creates a parallel one.) This
choice of execution mode may be modified by the
BaseStream.sequential() or BaseStream.parallel() methods, and
may be queried with the BaseStream.isParallel() method.
这些参数总是函数接口的实例,例如Function,并且通常是lambda表达式或方法引用。除非另有说明,否则这些参数必须为非null。
只能对流进行一次操作(调用中间或终止流操作)。例如,这排除了“forked”流,其中相同的源提供两个或更多个流水线,或者同一个流的多个遍历。如果流实现检测到正在重用流,则它可能会抛出IllegalStateException。但是,由于某些流操作可能返回其receiver而不是新的流对象,因此可能无法在所有情况下检测重用。
Streams有一个BaseStream.close()方法并实现AutoCloseable,但几乎所有的流实例实际上都不需要在使用后关闭。通常,只有源为IO通道的流(例如Files.lines(Path,Charset)返回的流)才需要关闭。大多数流都由集合、数组或生成函数支持,不需要特殊的资源管理。 (如果流确实需要关闭,则可以在try-with-resources语句中将其声明为资源。)
流水线可以串行执行或并行执行。此执行模式是流的属性。通过初始选择串行或并行执行来创建流。 (例如,Collection.stream()创建一个串行流,Collection.parallelStream()创建一个并行流。)这种执行模式的选择可以通过BaseStream.sequential()或BaseStream.parallel()方法修改,并且可以使用BaseStream.isParallel()方法查询。
网友评论