美文网首页
响应式编程开源库 RxJava2——Stream API

响应式编程开源库 RxJava2——Stream API

作者: 阿扎泼柴 | 来源:发表于2019-05-11 18:13 被阅读0次

上一篇响应式编程开源库 RxJava2——关键名词做了一些基本知识的储备。有利于我们更好的理解RxJava内部原理。下面就正式进入RxJava的学习。
在正式进入之前我们先看一看Java 8中的Stream API,因为学习它也是有利于对RxJava的学习,很有殊途同归的意味。

1.Stream API

Stream是Java8的一大亮点,是对容器对象功能的增强,它专注于对容器对象进行各种非常便利、高效的聚合操作(aggregate operation)或者大批量数据操作。Stream API借助于同样新出现的Lambda表达式,极大的提高编程效率和程序可读性。

2.什么是流

Stream不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的Iterator。原始版本的Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如,“过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream会隐式地在内部进行遍历(在第一部分中提到的迭代器模式的定义就是:提供一种方法顺序访问一个聚合对象中各个元素,而又不需暴露该对象的内部表示。),做出相应的数据转换。Stream就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个item读完后再读下一个item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。
口说无凭,还是通过源码来了解Stream是怎么并行化操作的。比如前篇提到过的将一个List转化为流;

ArrayList<Integer> list=new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add(i);
        }
        Stream<Integer> stream = list.stream();
default Stream<E> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
 public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

通过源码看一下stream()方法,发现是调用了StreamSupport中的stream方法。它里面包含一个Spliterator它的作用就是用于遍历和分割“源”元素的对象。和Java8以前的iterator相似,只是现在能并行处理源数据。内部实现细节具体可以参考Java 8 之Stream Spliterator

3.流的构成

当我们使用一个流的时候,通常包括三个基本步骤:获取一个数据源(source)→ 数据转换 → 执行操作获取想要的结果。每次转换原有Stream对象不改变,返回一个新的Stream对象(可以有多次转换),这就允许对其操作可以像链条一样排列,变成一个管道。如下图所示:



注意其中的描述:每次转换原有Stream对象不改变,返回一个新的Stream对象(可以有多次转换)。这就是我们前面讲的可变的对象和不可变的对象的运用,所以在Stream中是严格控制对象的不可变性的。

4.流的操作类型

流的操作类型分为两种:

Intermediate(中间转换的操作):一个流可以后面跟随零个或多个intermediate操作。其目的主要是打开流,做出某种程度的数据映射/过滤,然后返回一个新的流,交给下一个操作使用。这类操作都是惰性化的(lazy),就是说,仅仅调用到这类方法,并没有真正开始流的遍历。

Terminal(获取最终结果的操作):一个流只能有一个terminal操作,当这个操作执行后,流就被使用“光”了,无法再被操作。所以,这必定是流的最后一个操作。Terminal操作的执行,才会真正开始流的遍历,并且会生成一个结果,或者一个副作用。

在对一个Stream进行多次转换操作(Intermediate 操作),每次都对Stream的每个元素进行转换,而且是执行多次,这样时间复杂度就是N(转换次数)个for循环里把所有操作都做完的总和吗?其实不是这样的,转换操作都是lazy的,多个转换操作只会在Terminal操作的时候融合起来,一次循环完成。我们可以这样简单的理解,Stream里有个操作函数的集合,每次转换操作就是把转换函数放入这个集合中,在Terminal 操作的时候循环Stream对应的集合,然后对每个元素执行所有的函数。

还有一种操作被称为short-circuiting(打断操作)。用以指:对于一个intermediate操作,如果它接受的是一个无限大(infinite/unbounded)的Stream,但返回一个有限的新Stream(弱水三千,只取一瓢?);对于一个terminal操作,如果它接受的是一个无限大的Stream,但能在有限的时间计算出结果。
当操作一个无限大的 Stream,而又希望在有限时间内完成操作,则在管道内拥有一个short-circuiting操作是必要的。这个操作是什么意思呢?举个例子,现在招聘,只要其中有一个符合条件,我们就终止招聘。这时候我们就不用继续下去了,节约了资源成本并且高效。

流的常见操作类型

Intermediate(中间转换操作):
map (mapToInt, flatMap 等)、 filter、 distinct、 sorted、 peek、 limit、 skip、 parallel、 sequential、 unordered

Terminal(获取最终结果的操作):
forEach、 forEachOrdered、 toArray、 reduce、 collect、 min、 max、 count、 anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 iterator

Short-circuiting(打断操作):
anyMatch、 allMatch、 noneMatch、 findFirst、 findAny、 limit

5.流的应用

简单说,对Stream的使用就是实现一个 filter(筛选)-map(映射)-reduce(聚合) 过程,产生一个最终结果,或者导致一个副作用。下面举例说明,我们有一个1-10的int集合,现在要选出偶数,并且按大小排列打印出来。我们一般的做法是:

ArrayList<Integer> list1=new ArrayList<>();
        for (int i = 0; i < list.size(); i++) {
            if (i%2==0){
                list1.add(list.get(i));
            }
        }
        Collections.sort(list1);
        for (int i = 0; i < list1.size(); i++) {
            Log("偶数排序后"+list1.get(i));
        }

如果用Stream API,中间操作filter对偶数进行筛选,sorted对数据排序,forEach对每个数据进行了打印。
filter(过滤器): Stream 有一个只接受 Predicate 这个函数式接口的方法。我们可以在 Predicate 里写作用在数据上的逻辑代码。
map(映射):Stream 有一个只接受 Function 这个函数式接口的方法。我们可以在 Function 里写按照我们的要求转换数据的逻辑代码。

list.stream().filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) {
                return integer%2==0;
            }
        }).sorted(new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1>o2?o1:o2;
            }
        }).forEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log("偶数排序后"+integer);
            }
        });

通过两种方式的对比,一般做法我们进行了两次自我管理的遍历。使用Stream API时,我们只需要把数据源转化成数据流,就可以利用各种中间转换得到想要的结果,而并不用关心遍历。当执行terminal 操作时,才会开始遍历一次数据,并对每个数据做出相应的转换,这时候的遍历不用我们关心。


对于Stream API的理解可以用水流来理解,有一杯水里面有石头沙子,我们要得到纯净的水就要过滤掉石头和沙子。


我们参照Intermediate操作和Terminal操作的定义来理解。过滤石头为第一步Intermediate操作,它会作用在整个流中,过滤沙子为第二步Intermediate操作,也会作用在整个流中,获取到水是最终的Terminal操作。Terminal操作的执行,会打开水流,对它进行过滤,得到想要的水。这就是多个转换操作只会在Terminal操作的时候融合起来,一次循环完成的意思。

6.对最近学习的概念的总结

基于这段代码,来实际看一看前面讲的东西是怎么运用到Stream API中的,或者说是怎么运用到Rx中的。

list.stream().filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) {
                return integer%2==0;
            }
        }).sorted(new Comparator<Integer>() {
            @Override
            public int compare(Integer o1, Integer o2) {
                return o1>o2?o1:o2;
            }
        }).forEach(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) {
                Log("偶数排序后"+integer);
            }
        });
  • 迭代器模式: 不关心怎么遍历。
  • 函数式接口: 可以决定该接口的功能,比如Predicate中方法 boolean test(T t);传入一个参数t返回boolean,所以它有判断功能。 Function接口中的方法 R apply(T t);传入一个参数t,返回另一个对象R,所以它的功能是转换。
  • 高阶函数: filter接收Predicate函数作为参数,返回Stream函数。
  • 纯函数 : 输出由输入决定,没有副作用(不会对其他的进行修改)test 、compare 、accept等方法。

相关文章

网友评论

      本文标题:响应式编程开源库 RxJava2——Stream API

      本文链接:https://www.haomeiwen.com/subject/xrfxaqtx.html