Java SE 8中的主要新语言功能是lambda表达式。您可以将lambda表达式视为匿名方法;类似方法,lambdas键入了参数,一个body和一个返回类型。但是真正的消息并不是lambda表达自己,而是它们能够实现的。Lambdas可以轻松地将行为表达为数据,从而可以开发更具表现力和更强大的库。
在Java SE 8中也引入了一个这样的库,它是一个java.util.stream包(Streams),它能够在各种数据源上简化和声明性地表达可能的并行批量操作。像Streams这样的库可能已经在早期版本的Java中编写过,但是没有一个紧凑的行为数据成语,他们使用起来真的很麻烦,没有人会想使用它们。您可以将Streams视为第一个利用Java中lambda表达式的功能的库,但它并没有什么神奇的东西(尽管它被紧密集成到核心的JDK库中)。流不是语言的一部分 - 它'
关于本系列
通过该java.util.stream软件包,您可以简洁和声明性地对集合,数组和其他数据源表示可能的并行批量操作。在Java语言建筑师Brian Goetz的这个系列中,全面了解Streams库,并学习如何使用它来获得最佳的优势。
本文是系列中第一个java.util.stream深入探索图书馆的文章。本部分将向您介绍图书馆,并概述其优势和设计原则。在后续的部分中,您将学习如何使用流来汇总和汇总数据,并查看库的内部和性能优化。
使用流查询
流的最常见用途之一是表示对集合中的数据的查询。清单1显示了一个简单流管道的示例。管道收集了买方和卖方之间购买建模的交易,并计算了居住在纽约的卖家的交易总金额。
清单1.一个简单的流管道
int totalSalesFromNY
= txns.stream()
.filter(t -> t.getSeller().getAddr().getState().equals("NY"))
.mapToInt(t -> t.getAmount())
.sum();
“Streams利用最强大的计算原理:组合。”
该filter()业务仅从纽约的卖家选择交易。的mapToInt()操作选择所希望的交易的交易金额。终端sum()操作将这些金额加起来。
Although this example is pretty and easy to read, detractors might point out that the imperative (for-loop) version of this query is also simple and takes fewer lines of code to express. But the problem doesn't have to get much more complicated for the benefits of the stream approach to become evident. Streams exploit that most powerful of computing principles: composition. By composing complex operations out of simple building blocks (filtering, mapping, sorting, aggregation), streams queries are more likely to remain straightforward to write and read as the problem gets complicated than are more ad-hoc computations on the same data sources.
作为与清单1相同域名的更为复杂的查询,请考虑“按照名称排序,打印65岁以上买家交易中的卖家名称”。写这个查询的老式(命令式)方式可能产生类似清单2的东西。
清单2.一个集合的ad-hoc查询
Set sellers = new HashSet<>();
for (Txn t : txns) {
if (t.getBuyer().getAge() >= 65)
sellers.add(t.getSeller());
}
List sorted = new ArrayList<>(sellers);
Collections.sort(sorted, new Comparator() {
public int compare(Seller a, Seller b) {
return a.getName().compareTo(b.getName());
}
});
for (Seller s : sorted)
System.out.println(s.getName());
虽然这个查询只是比第一个更复杂一些,但很显然,在命令式方法下的结果代码的组织和可读性已经开始崩溃了。读者首先看到的不是计算的起点和终点,这是一个中间结果的宣告。要阅读此代码,您需要在精确地缓冲大量上下文之前,才能确定代码实际执行的操作。清单3显示了如何使用Stream重写此查询。
清单3.使用Streams表示的清单2中的查询
txns.stream()
.filter(t -> t.getBuyer().getAge() >= 65)
.map(Txn::getSeller)
.distinct()
.sorted(comparing(Seller::getName))
.map(Seller::getName)
.forEach(System.out::println);
清单3中的代码更容易阅读,因为用户既不会因为“垃圾”变量而被分散注意力,sellers而且sorted在读取代码时不必跟踪大量的上下文;代码读取几乎完全像问题语句。更易于读取的代码也比较容易出错,因为维护者更有可能首先正确识别代码的作用。
像Streams这样的图书馆采用的设计方法导致了实际的分离问题。客户负责指定“什么”的计算,但图书馆可以控制“如何”。这种分离往往与专门知识的分配平行;客户端作者通常对问题领域有更好的了解,而图书馆作家通常在执行的算法属性方面具有更多的专业知识。编写库的关键推动因素是允许这种分离问题的能力,就像传递数据一样容易地传递行为的能力,这反过来又使得API能够描述复杂计算的结构,
流管道解剖
所有流计算共享一个共同的结构:它们具有流源,零个或多个中间操作和单个终端操作。流的元素可以是对象references(Stream),也可以是原始整数(IntStream),longs(LongStream)或doubles(DoubleStream)。
由于Java程序消耗的大多数数据已经存储在集合中,许多流计算使用集合作为其源。CollectionJDK中的实现已经被增强为充当有效的流源。但是还存在其他可能的流源,例如阵列,生成器函数或内置工厂,如数字范围,可以编写自定义流适配器,以便任何数据源可以充当流源。表1显示了JDK中的一些流生成方法。
表1. JDK中的流源
方法描述
Collection.stream()从集合的元素创建流。
Stream.of(T...)从传递给工厂方法的参数创建一个流。
Stream.of(T[])从数组的元素创建一个流。
Stream.empty()创建一个空流。
Stream.iterate(T first, BinaryOperator f)创建一个由序列组成的无限流first, f(first), f(f(first)), ...
Stream.iterate(T first, Predicate test, BinaryOperator f)(仅限Java 9)类似Stream.iterate(T first, BinaryOperator f),除了流在测试谓词返回的第一个元素上终止false。
Stream.generate(Supplier f)从生成函数创建无限流。
IntStream.range(lower, upper)创建一个IntStream组成的元素从下到上,排他。
IntStream.rangeClosed(lower, upper)创建一个IntStream由下到上的元素,包括元素。
BufferedReader.lines()创建一个由一行组成的流BufferedReader.
BitSet.stream()创建一个IntStream由a中的设置位的索引组成的BitSet。
CharSequence.chars()IntStream在a中创建一个对应的charsString。
中间操作 - 例如filter()(选择符合标准的元素)map()(根据功能转换元素),distinct()(删除重复),limit()(截断特定大小的流),以及sorted()- 将流转换为另一个流。一些操作,例如mapToInt(),采用一种类型的流并返回不同类型的流;清单1的示例以一个Stream和更晚的切换开始IntStream。表2显示了一些中间流操作。
表2.中间流操作
手术内容
filter(Predicate)流的元素匹配谓词
map(Function)将所提供的功能应用于流的元素的结果
flatMap(Function>通过将提供的流承载函数应用于流的元素而产生的流的元素
distinct()流的元素,重复的元素被删除
sorted()流的元素,按照自然顺序排列
Sorted(Comparator)流的元素,由提供的比较器排序
limit(long)流的元素截短到提供的长度
skip(long)流的元素,丢弃前N个元素
takeWhile(Predicate)(仅限Java 9)流的元素在提供的谓词不是的第一个元素上截断true
dropWhile(Predicate)(仅限Java 9)流的元素,丢弃所提供谓词所在元素的初始段true
中间操作总是懒惰:调用中间操作只是在流管道中设置下一个阶段,但不启动任何工作。中级操作进一步分为无状态和有状态操作。无状态操作(例如filter()或map())可以独立地对每个元素进行操作,而状态操作(例如sorted()或distinct())可以包含影响其他元素的处理的先前看到的元素的状态。
当执行终端操作(例如,缩减(sum()或max()),应用程序(forEach())或search(findFirst()))时,数据集的处理开始。终端操作产生结果或副作用。执行终端操作时,流管道终止,如果要再次遍历相同的数据集,可以设置新的流管道。表3显示了一些终端流操作。
表3.终端流操作
手术描述
forEach(Consumer action)将提供的操作应用于流的每个元素。
toArray()从流的元素创建一个数组。
reduce(...)将流的元素聚合为摘要值。
collect(...)将流的元素聚合到汇总结果容器中。
min(Comparator)根据比较器返回流的最小元素。
max(Comparator)根据比较器返回流的最大元素。
count()返回流的大小。
{any,all,none}Match(Predicate)返回流的任何/ all / none是否与提供的谓词匹配。
findFirst()返回流的第一个元素(如果存在)。
findAny()返回流的任何元素(如果存在)。
流与收藏
虽然流可以在表面上类似于集合 - 您可能会将这两者视为包含数据 - 实际上它们显着不同。集合是数据结构;其主要关注点是记忆中的数据的组织,并且一段时间内仍然存在一个集合。通常可以将集合用作流管道的源或目标,但流的重点是计算,而不是数据。数据来自其他地方(集合,数组,生成函数或I / O通道),并通过一系列计算步骤进行处理,以产生结果或副作用,此时流完成。流不为其处理的元素提供存储,并且流的生命周期更像是一个时间点 - 调用终端操作。不同于集合,流也可以是无限的;limit()相应地,一些操作( ,findFirst())是短路的,并且可以在有限计算的无限流上操作。
集合和流也在执行其操作的方式上有所不同。收藏活动是渴望和变异的;当remove()在aList上调用该方法时,在调用返回后,您将知道列表状态已被修改以反映删除指定的元素。对于流,只有终端操作是渴望的;其他人都很懒。流操作表示对其输入(也是流)的功能转换,而不是数据集上的突变操作(过滤流生成其流是元素是输入流子集的新流,但不会从中删除任何元素资源)。
将流管道表示为一系列功能转换,可实现几个有用的执行策略,如懒惰,短路和操作融合。短路使得管道能够成功终止,而不检查所有数据;诸如“找到第一笔交易超过$ 1,000”之类的查询不需要在匹配发现后再检查任何更多的事务。操作融合意味着可以对数据单次执行多个操作;在清单1的例子中,
诸如清单1和清单3中的查询的强制性版本通常用于实现中间计算结果的集合,例如过滤或映射的结果。这些结果不仅可以使代码混乱,而且还会使执行错乱。中间集合的实现仅用于实现而不是结果,并且将计算周期消耗到将中间结果组织成仅被丢弃的数据结构中。
相比之下,流管线将其操作尽可能少地传递给数据,通常是单次通过。(有条理的中间操作,如排序,可以引入需要多次执行的障碍)。流管道的每个阶段都会根据需要轻松地生成其元素,计算元素,并将它们直接馈送到下一个阶段。您不需要集合来保存过滤或映射的中间结果,因此您可以节省填充(和垃圾收集)中间集合的工作量。而且,遵循“深度第一”而不是“宽度第一”
除了使用流进行计算之外,您可能需要考虑使用流从API方法返回聚合,以前您可能已返回数组或集合。返回流通常更有效,因为您不必将所有数据复制到新的数组或集合中。回流也往往更灵活;库选择返回的形式可能不是调用者需要的,而且很容易将流转换为任何集合类型。(返回流的主要情况不合适,落后回归实物收集更好,
排比
将计算结构化为功能转换的有益后果是,您可以轻松地在顺序和并行执行之间切换,同时对代码进行最小的更改。流计算的顺序表达式和相同计算的并行表达式几乎相同。清单4显示了如何并行执行清单1中的查询。
清单4.清单1的并行版本
int totalSalesFromNY
= txns.parallelStream()
.filter(t -> t.getSeller().getAddr().getState().equals("NY"))
.mapToInt(t -> t.getAmount())
.sum();
“将流管道表示为一系列功能转换,可实现几项有用的执行策略,如懒惰,并行,短路和操作融合。”
第一行对并行流而不是顺序的请求是与清单1的唯一区别,因为Streams库有效地从计算策略的描述和结构中确定执行它的策略。以前,并行需要对代码进行完全重写,这不仅是昂贵的,而且通常也是容易出错的,因为所产生的并行代码看起来不像顺序版本。
所有流操作都可以顺序或并行执行,但请记住并行性不是魔术性能灰尘。并行执行可能比与顺序执行速度相同或更慢。最好是从顺序流开始,并且当你知道你将获得加速的优势时应用并行性。本系列的后续部分将返回分析用于并行性能的流管线。
精美的打印
因为Streams库是协调计算,而是执行计算涉及到由客户端提供的lambdas的回调,那么这些lambda表达式可以做的是受到某些限制。违反这些约束可能导致流管道失败或计算不正确的结果。此外,对于具有副作用的羔羊,在某些情况下,这些副作用的时间(或存在)可能是令人惊讶的。
大多数流量操作要求传递给它们的羔羊是无干扰和无状态的。不干扰意味着它们不会修改流源;无状态意味着他们不会访问(读取或写入)在流操作的一生中可能改变的任何状态。为减少操作(例如,计算诸如摘要数据sum,min或max)传递给这些操作必须是lambda表达式关联(或符合类似的要求)。
这些要求部分来自事实,即如果流水线并行执行,则流库可以访问数据源或从多个线程同时调用这些lambdas。需要限制以确保计算仍然正确。(这些限制也会导致更直观,更容易理解的代码,而不考虑并行性。)您可能会试图说服自己,您可以忽略这些限制,因为您不认为特定的管道将永远不会运行平行,但最好是抵制这种诱惑,否则你会在你的代码中埋下时间炸弹。
所有并发风险的根源是共享的可变状态。共享可变状态的一个可能来源是流源。如果源是传统的集合ArrayList,则Streams库会假定它在流操作过程中保持不变。(明确设计用于并发访问的集合,例如ConcurrentHashMap,不受此假设的影响)。不仅在流操作期间,不干扰要求不包括其他线程的源突变,而是传递给流操作本身的lambdas也应避免突变来源。
除了不修改流源之外,传递给流操作的lambdas应该是无状态的。例如,清单5中的代码,试图消除任何前一个元素的两倍的元素违反了这个规则。
清单5.使用状态lambdas的流管道(不要这样做!)
HashSet twiceSeen = new HashSet<>();
int[] result
= elements.stream()
.filter(e -> {
twiceSeen.add(e * 2);
return twiceSeen.contains(e);
})
.toArray();
如果并行执行,这个管道将产生不正确的结果,原因有两个。首先,访问该twiceSeen集合是从多个线程完成的,没有任何协调,因此不是线程安全的。第二,因为数据被分区,所以不能保证当处理给定的元素时,该元素之前的所有元素都已被处理。
这是最好的,如果传递给流操作的lambda表达式是完全无副作用-也就是说,他们没有任何突变基于堆的状态或者在执行过程中执行任何I / O。如果他们确实有副作用,他们有责任提供任何必要的协调,以确保这些副作用是线程安全的。
此外,甚至不能保证所有副作用都将被执行。例如,在清单6中,库可以自由地避免执行map()完全传递的lambda。因为源具有已知的大小,所以map()操作被认为是大小保留,并且映射不影响计算结果,库可以通过不执行映射来优化计算!(除了消除与调用映射函数相关的工作之外,这种优化可以将计算从O(n)转换为O(1)。
清单6.具有可能无法执行的副作用的流管道
int count =
anArrayList.stream()
.map(e -> { System.out.println("Saw " + e); e })
.count();
唯一的情况是你会注意到这个优化的效果(除了计算速度快得多),如果lambda被传递到map()有副作用 - 在这种情况下,如果这些副作用不会发生,你可能会感到惊讶。能够进行这些优化取决于流操作是功能转换的假设。大多数时候,我们喜欢它,当图书馆使我们的代码运行更快,我们没有努力。能够做到这样优化的代价是,我们必须接受一些限制,我们通过流淌的行动可以做什么,还有一些我们依赖于副作用。(总体,
第1部分的结论
该java.util.stream库提供了一种简单而灵活的方式来在各种数据源(包括集合,数组,生成函数,范围或自定义数据结构)上表达可能并行的功能性查询。一旦你开始使用它,你会被钩住!在下一期中看的流库的最强大的功能之一:聚集。
网友评论