在这篇文章中您主要可以看到以下内容:
- 流简介
- 流操作
- 收集器
- 并行流
前言
关于Java8的其他信息和代码的完整部分可以在这里找到。
流简介
通过与集合进行对比进而说明流的特性。集合是一种数据结构,主要目的是以一定的时/空复杂度提供访问和存储服务,所以对于集合来说,数据才是最重要的,而对于流来说,更加关心对这些数据可以进行什么操作,也就是说流是面向计算的,对比来看,主要有以下一些区别
- 流不会被存储下来供程序再次使用,流在执行终端操作之后会销毁,而集合根据声明方式可以被多个方法或对象访问。
- 流和集合在对内部元素进行操作的时候都需要进行迭代,不同的是流是内部迭代,集合是外部迭代,内部迭代的意思是java会自动进行迭代过程,而我们只需要将在迭代过程中需要执行的操作通过lambda表达式进行传递即可,流会自动优化需要执行的操以提高执行效率,而外部迭代是指通过foreach或者for等控制流语句进行显示迭代,虽然在外部迭代中也可以手动进行优化,但是难度高且优化程度有限。java8 in action书中关于内部迭代和外部迭代有一个很有意思的说明,建议去看看。
当然,流和集合也有很强的关联,因为在程序中存储数据的主要方式就是集合,而面向计算的流的数据也主要来自于集合,当然,流也可以来自于函数生成、文件转化等。如果一个流的数据源是一个有序集合,那么这个流将会是一个有序流,反之,如果一个流的数据源是一个无序集合,那么这个流将会是一个无序流,顺序对于并行来说并不是一件好事情。
流操作
流操作主要有两类,一类是中间操作,一类是终端操作。中间操作是指那些会返回一个流的操作,所以可以将多个中间操作串联起来,完成一个完整的功能,值得注意的是,除非操作链上触发一个终端操作,否则中间操作都是不执行的,因为当终端操作确定之后,需要执行的中间操作也就确定了,然后java就可以根据这些操作进行优化,也就是说,终端操作在一个操作链中是最后一个且唯一的操作。终端操作会返回一个流之外类型的值,表示操作链的最终结果。
中间操作
过滤截断和跳过(Filter、Limit、Skip)
过滤filter,接受一个谓词,就是返回值为boolean的函数接口,使用函数中的方法对流中的每个元素进行判断,保留那些返回值为true的元素。
使用例子如下:
// 还是用苹果来测试把
List<Apple> testList = new ArrayList<>();
testList.add(new Apple("red", 400, "FirstApple"));
testList.add(new Apple("red", 600, "SecondApple"));
testList.add(new Apple("green", 400, "ThirdApple"));
testList.add(new Apple("green", 600, "FourthApple"));
// filter
public static void filterTest(List<Apple> appleList, Predicate<Apple> p) {
appleList.stream().filter(p).map(Apple::getAppleName).forEach(System.out::println);
}
// filter过滤出重量大于500的苹果
StreamTest.filterTest(testList, a -> a.getWeight() > 500);
还有个比较特殊的过滤器:distinct,如果您对数据库有所涉猎,那么对这个过滤器一定不会陌生,他的功能也正如你所想的那样,去除流中重复的元素,去除方式是通过对比流中元素的hashcode,如果hashcode相同还要进行equals比较。
截断limit,参数是一个整形值,表示获取几个元素,如果流是一个无序流,那么返回的结果也是无序的,也就是说可能出现两次执行结果返回不同结果的情况,比如并行情况下进行截断操作。
一个使用例子如下:
// limit
public static void limitTest(List<Apple> apples) {
apples.stream().limit(2).map(Apple::getAppleName).forEach(System.out::println);
// limit获取前两个苹果的信息
StreamTest.limitTest(testList);
}
当然,还有一个与limit互补的操作,skip,跳过一定数量的元素,使用例子如下:
// skip
public static void skipTest(List<Apple> apples) {
apples.stream().skip(2).map(Apple::getAppleName).forEach(System.out::println);
}
// skip前两个苹果
StreamTest.skipTest(testList);
映射(Map)
简单来说map操作就是将流中的内容从一个东西变成另一个东西,比如上面很多例子中将Stream<Apple>映射成了Stream<String>,使用Map操作一定要十分注意经过该操作后流中元素的类型变化,这样才能在后续执行中得到预期的结果,另外Map操作接受函数签名类似于(T)->(R)的方法变量,其中输入可以是多个,但是一定要有返回值。
还有个比较特殊的映射,flatmap,该操作可以把操作链上产生的多个流合并成一个流,先看个例子:
Stream<String> ss = Arrays.asList("this","is","a","test").stream().map(s->s.split("")).flatMap(Arrays::stream);
ss.forEach(System.out::println);
在上面的例子中如果将flatmap替换成map将会得到下面的结果:
Stream<Stream<String>> sss = Arrays.asList("this", "is", "a", "test").stream().map(s -> s.split("")).map(Arrays::stream);
原因在于:最开始流中的元素类型是String,经过第一个map之后变成了List<String>,经过Arrays::stream之后List变成了Stream<String>,到这一步为止,流中的每个元素都是一个Stream<String>,所以整个流就表示成为Stream<Stream<String>>,说明一个流由多个流组成,而flatmap的作用就是将这些流整合成一个流。
终端操作
匹配和查找(Match、Find)
匹配主要有以下几种:anyMatch、allMatch、noneMatch,作用分别是匹配任意一个、所有都匹配、所有都不匹配,anyMatch、allMatch和noneMatch将返回一个boolean结果,它们都接受一个谓词((T)->(Boolean))。例子如下:
// allMatch
public static void allMatchTest(List<Apple> apples, Predicate<Apple> p) {
boolean flag = apples.stream().allMatch(p);
System.out.println(flag);
}
// 验证所有苹果重量是否都大于300
StreamTest.allMatchTest(testList, a -> a.getWeight() > 300);
// noneMatch
public static void noneMatchTest(List<Apple> apples, Predicate<Apple> p) {
boolean flag = apples.stream().noneMatch(p);
System.out.println(flag);
}
// 验证是否没有苹果重量大于500的
StreamTest.noneMatchTest(testList, a -> a.getWeight() > 500);
// anyMatch
public static void anyMatchTest(List<Apple> apples, Predicate<Apple> p) {
boolean flag = apples.stream().anyMatch(p);
System.out.println(flag);
}
// 验证是否由重量小于300的苹果
StreamTest.anyMatchTest(testList, a -> a.getWeight() < 300);
查找主要有findAny和findFirst,它们不接受参数,它们的作用是和其他操作结合,形成短路的效果,比如我只需要找到一个符合条件的值而不是遍历整个流找出所有符合条件的值。返回值将是一个Optional类性,Optional类型屏蔽了返回值为空的情况(避免可能出现的空指针异常)。可以使用get、orelse获取其中的元素,orelse要求提供一个参数,在Optional没有元素的情况下将返回该参数。findAny和findFirst的区别主要是并行方面,findFirst主要针对的是顺序流,而顺序流在并行方面会有诸多限制,而findAny不会有这些限制,所以通常来说findAny的效率将比findFirst好很多。具体的使用例子如下:
// findAny
public static void findAnyTest(List<Apple> apples, Predicate<Apple> p) {
Optional<Apple> optional = apples.stream().filter(p).findAny();
try {
System.out.println(optional.get().getAppleName());
} catch (NoSuchElementException e) {
System.out.println("can not find");
}
}
// findFirst
public static void findFirstTest(List<Apple> apples, Predicate<Apple> p) {
Optional<Apple> optional = apples.stream().filter(p).findFirst();
try {
System.out.println(optional.get().getAppleName());
} catch (NoSuchElementException e) {
System.out.println("can not find");
}
// 寻找任何一个颜色为红色的苹果
StreamTest.findAnyTest(testList, a -> "red".equals(a.getColor()));
// 寻找第一个重量小于300的苹果
StreamTest.findFirstTest(testList, a -> a.getWeight() < 300);
归约(reduce)
归约的功能是将多个值合并成一个值,但是不能改变容器的类型,这和Collect有本质的区别,虽然在一些情况下两者可以互换。reduce接受两个参数,一个初始值(非必需),一个多输入,单返回值类型的函数变量,具体的例子如下:
// reduce
public static void reduceTest(List<Apple> apples, BinaryOperator<Integer> binaryOperator) {
Integer i = apples.stream().map(Apple::getWeight).reduce(0, binaryOperator);
System.out.println(i);
}
// 计算所有苹果的重量
StreamTest.reduceTest(testList, (a, b) -> a + b);
reduce操作对于那些满足一定条件的数据,比如满足结合律,或者说操作无状态(后面介绍),那么reduce将以并行的方式来执行操作。比如上述例子中的加法操作,将会以并行的方式进行运算。
操作的状态和界
如果一个操作为了能够获取结果需要添加内部状态来进行记录,那么这个操作就是有状态的,比如求和操作,需要一个内部状态来进行累加,而像filter,map等操作是完全不需要额外内部状态的,因此他们是无状态的。如果一个操作是有状态的,且这个操作需要访问的流的长度是可确定的,比如max,min等,访问长度为1,那么这个操作又是有界的,如果需要访问的流的长度不能确定(随着流的长度变化而变化),比如distinct,sort等操作,这有可能需要访问所有流元素后,才能得出结果,比如一个质数流倒序,这将是不能完成的,所以这类操作是无界的。对于无状态操作来说,并行是十分容易的,对于有状态有界的操作来说,精心设计小心使用也是可以在并行的情况下工作良好的,但是对于有状态且无界的操作来说,并行将变得比较困难且不可控。
数值流
与基础类型特化的函数接口相似,流也提供了对原始类型的支持,免去了拆箱与装箱的操作。基础数据流就是在流的前面加上相应的基础类型符号,比如IntStream、LongStream等等,当然,也可以由普通流直接生成,生成方式是mapToXxx(Xxx表示对应的基础类型),之后可以通过boxed()转换成普通的对象流。除了常见的流的操作,针对数值流,java8提供了一些方便的操作:最大最小值、平均值、求和、计数等。使用例子如下:
OptionalDouble result = apples.stream().mapToInt(Apple::getWeight).average();
mapToInt将Stream<Apple>转化成了Stream<int>,即IntStream,然后使用求取平均值的函数,该函数返回一个OptionalDouble,如果流中没有内容,那么将会返回一个null值,OptionalDouble会处理这种情况。
收集器(Collect)
Collect是一个终端操作,上面介绍的所有终端操作都可以使用Collect来完成,只是针对适合的情况使用上面介绍的终端操作将会更加方便。Collect预定义了很多常用的收集器,可以搭配使用完成复杂的功能,如果这些接口仍旧不能满足你的需求,你可以实现Collector接口并定义自己的收集器
规约和汇总
这部分提供的功能和上面介绍的有些类似。maxBy、minBy,它们都接受一个Comparator,可以找出流中的最大和最小值。summingXxx、averagingXxx,可以求出流中对应Xxx类型的和和平均值,如果这些你都需要,那么summarizingXxx可以满足要求,它可以一次求出上述的所有值,这些值会保存在XxxSummaryStatistics的类里,一个示例如下:
IntSummaryStatistics result = apples.stream().collect(Collectors.summarizingInt(Apple::getWeight));
还有一个joining函数,它允许你将流中的元素作为字符串进行连接:
String result = apples.stream().map(Apple::getAppleName).collect(Collectors.joining(" "));
分组(groupBy)分区(partitioningBy)
按照指定的键对流中的元素进行分组,groupBy接受两个参数,第一个参数是用于分组的键值,第二个参数是收集器。groupBy函数将根据第一个参数将流划分成几个子流,然后对每个子流应用收集器,然后groupBy函数将收集器结果作为value,将键值作为key,组合成一个map作为最后结果。
因为groupBy的第二个参数接受收集器,所以可以给第二个参数传递groupBy以达到多层次分组的效果,但是执行流程依旧是从外层依次往里层执行,即每层都将流分成不同的子流,然后对该子流应用下一层的收集器。
当然也可以传递其他的收集器,搭配groupBy使用。
CollectingAndThen可以用来处理返回为Optional对象为空的情况。
一个例子:
// 先按照苹果的重量分组,然后安装苹果的颜色分组,最后用苹果的名字来表示分组的结果
Map<Integer, Map<String, List<String>>> result =
apples
.stream()
.collect(
Collectors.groupingBy(
Apple::getWeight,
Collectors.groupingBy(
Apple::getColor,
Collectors.mapping(Apple::getAppleName, Collectors.toList()))));
分区是分组的特殊情况,与分组不同的是分区的第一个参数是一个谓词,按照谓词的判断将数据分为两组,分区也支持嵌套分区。
定义自己的收集器
当系统提供的收集器无法满足需求或者使用后无法获得较好的性能,可以尝试定义自己的收集器。定义自己的收集器需要实现Collector接口,下面是实现计算吸血鬼数字(think in java中的一道练习题)的一个收集器。结合这个例子来说明定义收集器的步骤:
@Override
public Supplier<List<Integer>> supplier() {
return () -> new ArrayList<Integer>();
}
实现自己的收集器需要完成Collector接口定义的5个方法,第一个方法是supplier()方法,这个方法返回一个Supplier<T>函数,Collect会调用这个函数生成用于记录结果的数据结构,比如上面返回一个ArrayList用来记录结果。
@Override
public BiConsumer<List<Integer>, Integer> accumulator() {
return (List<Integer> list, Integer i) -> {
if (judgeTheNumWhetherVampireNum(i)) list.add(i)
};
}
第二个方法是accumulator,这个方法返回一个Biconsumer函数,这个函数将被用来实现具体的规约过程,它接受两个参数,一个是第一个方法生成的用于记录结果的数据结构,第二个是当前需要处理的流中的元素,比如上面,如果这个数字是吸血鬼数字,就将这个数字记录下来。
@Override
public BinaryOperator<List<Integer>> combiner() {
return (List<Integer> list1, List<Integer> list2) -> {
list1.addAll(list2);
return list1;
};
}
第三个方法是combiner方法,这个方法返回一个BinaryOperator,collect将调用这个方法返回的函数来合并并行过程中产生的各个子流。如果流不支持并行,那么这个方法可以返回空。
@Override
public Function<List<Integer>, List<Integer>> finisher() {
return Function.identity();
}
第四个方法是finisher方法,返回一个Function函数,这个函数将用来将规约结果转换为最终结果。
@Override
public Set<Characteristics> characteristics() {
return Collections.unmodifiableSet(
EnumSet.of(
Characteristics.IDENTITY_FINISH,
Characteristics.CONCURRENT,
Characteristics.UNORDERED));
}
这个函数主要就是用来指明自定义的收集器的一些特性,有三个可选项:
- IDENTITY_FINISH:归约结果不受流中项目的遍历和累积顺序的影响。
- CONCURRENT:accumulator函数可以从多个线程同时调用,且该收集器可以并行归约流。如果收集器没有标为UNORDERED,那它仅在用于无序数据源时才可以并行归约。
- UNORDERED:这表明完成器方法返回的函数是一个恒等函数,可以跳过。这种情况下,累加器对象将会直接用作归约过程的最终结果。这也意味着,将累加器A不加检查地转换为结果R是安全的。
然后可以这样来使用这个收集器:
List<Integer> result =
IntStream.range(1000, 9999).filter(i -> i % 100 != 0).boxed().collect(new VampireNumber());
并行流
并行通常来说都是十分复杂的问题,流尽可能得将并行简化,但是任然不能规避很多并行问题,比如共享变量得问题,就如前文所述,对于一个无状态的流进行并行化通常来说是比较容易的,但是对于有状态的流是比较困难的。
并行并不是免费的,他将耗费一部分资源,在决定并行之前需要考量通过并行获得的好处是否能够抵消并行需要的开销,比如对一个小数据集进行并行化就不是一个很好的决定,另外选择合适的数据结构也是十分重要的,同时流的所有操作并不是都适合并行化,通过sequential和parallel可以精确控制每个操作是使用并行的方式还是串行的方式。下面是一些数据结构对并行的支持程度:
网友评论