RxJava进阶之源码分析(part 1)- map() 操作符

作者: qing的世界 | 来源:发表于2016-04-17 14:47 被阅读1015次

    终于到了分析源码的部分了。很多朋友在使用过RxJava之后都会觉得这个库很玄妙,竟然能把事件发生的源不停的通过不同的操作符改变。比如说这次介绍的map就是,在抽象的概念上,我们经常要求使用者要把map操作符当成改变源stream的一个方法,也就是说map把整个事件的发射流重新构造了一次。

    示例图上面,整个事件流变成了一个完全不一样的流

    但是其实map操作符真的创造了一个新的流么?

    肯定不是!

    这个答案是非常明显的,如果RxJava的操作符都是这样要等源发射流全部分析完再重新构造的话(一个map重新遍历一次的话),效率上肯定说不过去。所以我们可以肯定一点,即使再多个map(),我们肯定也是一边发射源stream的元素一边进行map()里面的转换的。RxJava并不是一个新的语言,java没有的它也没有,一切你们看到的操作符,只是对java现有的一些常用api的整合而已。尤其是我们经常用的subscribeOn,observeOn,map,等等,都只是把引用,线程池,ExecutorService玩耍出来的组合。

    当然,我的意思不是说RxJava很简单。。。。只是作者们在Java的基础上,非常优雅的把java现有的东西完美的结合起来,使得这个库使用起来就像函数式编程的感觉。

    那RxJava到底是怎么优雅的实现的呢?

    在我们深入代码之前,我们先来补充一些细节:

    1.使用链式调用产生新对象,优雅的让新对象持有源对象的引用的。

    比如 下面一个例子:

    一个优雅的链式调用

    大家可以看到line 13,这个方法generateNext()返回了一个新的Ob对象,但是把自己-this,传给了新生成的对象,新的对象就获取了源对象的引用了。于是在我不断的链式调用方法generateNext()的时候,虽然最后获取的是最后一个Ob对象,但是我的新对象已经拥有一条完整的,通向源对象的链了。不同于以往我们一般的setNext()这种方法,generateNext()返回一个新的Ob对象,所以程序可以以一个链式的结构写出来。

    这里可以提前告诉大家,RxJava里面的Observable就是这样的,每一次链式的调用操作符,比如map,返回的都是一个新的Observable,而新的Observable是持有旧的Observable的引用的(其实也不是整个Observable,是Observable的一个成员对象)。


    2.一个Observable所需要的元素

    一个Observable需要两个最重要的元素,一个是OnSubscribe对象,一个是Subscriber。前者负责定义整个Observable所要执行的action,后者则是定义怎样处理每一个发射的元素,一般来说后者都是用户自己实现的。

    比如以下例子:

    发射1,2

    这个例子很简单,我们创建了一个发射数字1和2的流,在create方法里面,我们创建了一个新的对象OnSubscribe,它定义了我们的事件发生的顺序的元素的个数种类。而subscribe()方法里面我们创建了一个新的Subscriber对象。让我们看看create()方法做了些啥。

    step1 step2

    仅仅是return了一个Observable,然后把OnSubscribe对象的引用拿到手而已。

    然后每次我们call subscribe()方法的时候发生了啥呢?我们把代码跟踪到subscribe(),

    注意line 8191 不是很清楚这个hook的意义,但是不影响我们理解

    看到了嘛?每次在subscribe()的时候,我们会调用我们Observable里面的onSubscribe对象的call方法,参数就是我们传进subscribe()的subscriber对象(当然严格意义上来讲并不是同一个subscriber,subscribe()方法在前面一点的地方稍微包装了一下我们传进的subscriber,但是我们可以理解为同一个)。

    可以理解为一个subscriber

    所以总结一句,每一个要发射元素的Observable,都必须会有一个OnSubscribe对象和Subscriber。前者定义执行的顺序、事件,后者处理。


    3.分析!

    ok,在了解了这些先决知识之后,我们可以深入分析map到底是怎么工作的了。

    把一个新的OperatorMap传进lift() lift的重点在于生成新的subscriber,把它传进上一个Observable的OnSubscribe对象,line 162

    让我们重点看看图2,lift()方法生成了一个新的Observable,新的Observable是干嘛的?

    1.新的Observable的OnSubscribe对象的call()方法里面,我们把用户定义的Subscriber对象,line 156-158 ,通过我们的转换操作符map,转换成了一个新的subscriber

    2.生成新的subscriber,传给上一个Observable的onSubscribe对象执行call(). 大家一定要仔细看清楚,line 162的这个OnSubscribe,是新的Observable的对象还是旧的Observable的对象。

    简单的用图来解释一下可能更清楚:

    新生成的Observable会在用户call subscribe()的时候,把原有的,用户自己定义的subscriber修改了之后(lift()里面做的),传给上一个Observable的OnSubscribe对象处理。


    那么在第一步的时候,我们对用户原有的subscriber动了什么手脚?

    这是第二重要的点,用户原有的subscriber会在lift()里面,被map这个Operator改造

    改造了!

    这个Operator就是OperatorMap。让我们看看这个Operator的call方法究竟做了些啥,让原有的subscriber变成一个怎样的新的subscriber。

    很简单有木有!

    乍一看!好简单。

    的确,map这个操作符的确很简单,让我们来看看它把原来的,用户定义的subscriber怎么改造了。

    1.line 39, 我们可以看到call()方法返回一个新的subscriber

    2.line 54, 这个o对象是啥?没错,就是原来旧的subscriber,用户自己创建的subscriber。

    3.line 54,  transformer就是我们转换的方法,也就是map()里面的Func1对象。

    也就是说,新的subscriber每次在执行onNext的时候,其实是把上一层Observable发射的元素先用map里面在转换方程转换成新的类型,然后再用下一层的subscriber去发射新的类型的元素。

    比如我们把一组Integer转换成String对象,在上图里面的新的Subscriber的泛型T就是Integer,旧的Susbcriber(用户自己定义的Subscriber)的泛型就是R。

    这里就很有意思了,这个过程里面发生了很多有趣的事情。

    我们在subscribe()之后,会把原有的subscriber不停的包装,每map()一次就会包装一次,一路传给最顶端,最原始的Observable,然后在发射元素的时候,会不停的调用转化方程,发送给下一级的subscriber。

    为什么说这个过程很有趣呢?有趣的地方在于,这个过程像在爬山一样,最开始我们要一路往上走,最下面的Observable持有上一层的Observable的引用,而在最顶端的原始Observable要发射元素的时候,subscriber已经是包装过好多层的subscriber了,执行onNext() 的时候,会一路往下调用下一层的subscriber的onNext().,所以回到我们在文章最开头讲的,每一次一个元素在发射之后,我们会即刻执行该元素的转换(也就是不停的执行下一级的onNext()),而不是重新构造一个新的流。

    换句话说,Observable的执行链一路向上,Subscriber的执行链一路向下,他们产生链的方向恰好是相反的。

    不过想想也是合情合理的,因为从逻辑上来讲,我们create的一个Observable的是一个原始的Observable,只拥有原始数据,但是用户却是提供的一个不同参数类型的,最终类型版本的Subscriber,原本方向就是相反的。

    让我们再把思维拓展一下,发生多层map的过程是咋样的:

    下上下!

    再看完整篇文章之后,再来体会一下这张图,是不是豁然开朗了呢?产生新的Observable向下,产生新的Subscriber向上,执行onNext向下。

    三个字总结,

    下上下!

    下上下!搓出了大招!

    创建新的Observable一路向下,创建新的Subscriber一路向上,最后每次发射元素的时候一路向下直到到达用户创建的subscriber为止。

    希望大家能好好理解一下这里设计的精妙之处。下周会带来subscribeOn()操作符的源码讲解。

    大家周末愉快

    相关文章

      网友评论

      • 97da4f1c6bb7:你好 ! lift它返回的是一个Observable,为什么里面的onSubscribe引用还是上一个 Observable的引用?
        qing的世界:154 行,返回一个新的Observable,但是你看新的Observable还是持有上一个Observable里面的OnSubscribe 对象的引用啊。它这样做的目的是让下层的observable持有上一层的引用,这样你subscribe之后final subscriber会一路往上传到第一层里面。

      本文标题:RxJava进阶之源码分析(part 1)- map() 操作符

      本文链接:https://www.haomeiwen.com/subject/fyuolttx.html