RxJava进阶之源码分析(part 2)- subscribe

作者: qing的世界 | 来源:发表于2016-04-24 18:17 被阅读2624次

    今天要给大家介绍一下RxJava里面线程切换的第一个操作符subscribeOn()操作符。

    说到subscribeOn,可能很多同学还不太清楚实际使用它的时机。那么我们先来看看一个小例子。

    简单的subscribeOn

    大家可以先猜想一下打印的结果。再结合上一篇的map操作符源码分析的结论来看看,每次的log的顺序。

    1、map和onNext方法分别发生在哪个线程?

    2、log打印的顺序是怎么样?是每次把同一个数字的map里面的log和onNext先打印了再打印下一个?还是说先把所有的数字的map里面的log打印完毕之后再打印onNext?

    你答对了吗?

    让我们来看看实际的打印结果:

    每一个数字处理完了才会处理下一个,在同一个线程里面处理

    如果充分理解了上一篇map操作符的同学想必已经答对了。如果没有理解的同学建议好好看看上一篇,消化好了再来重头看看这篇。因为map是非常基础的操作符,理解好map对理解接下来的线程转换操作符非常有帮助。

    没错,相信很多同学都已经知道了,subscribeOn操作符会影响在subscribeOn上面的代码的执行线程,比如上图里面,subscribeOn将发射数字和map的执行过程放在了io线程里面执行。(observeOn刚好相反,会将执行在observeOn下面的代码放入所指定的线程中执行)

    code above !

    ok,那么假如说咱们的需求改变了,发射的时候我们要运行在IO线程,但是map操作符的变换我们想在安卓的主线程执行!

    嘿嘿,学以致用。咱们现在就动手,代码变成了下面的:

    让我们看看能不能行

    哈哈,欢天喜地,让我们来看看线程是不是真的如我们所愿改变好了:

    wtf?

    不对啊???!!!

    说好的subscribeOn会改变上面代码的执行线程嘛???怎么不对了

    说好的幸福呢?

    先放上结论:同一个Observable,无论你call多少次subscribeOn,只有第一个subscribeOn会起作用。

    那究竟为什么呢?

    咱们先上一个小小的“比喻”;

    Log发生在哪个线程?1?2?3?

    大家应该都可以理解,上图中的Log将会在线程3打印出来。纵使你在外面套再多层thread,最后Log只会在最里面的线程执行。RxJava里面subscribeOn就是这么个意思。只不过不是这么直白的用Thread来执行,而是把所有的代码包装进Runnable里面,然后把Runnable丢进一个个ExecutorService里面执行。

    不了解ExecutorService的同学先看看这里:http://tutorials.jenkov.com/java-util-concurrent/executorservice.html

    那么我们就来一探究竟吧!跟踪代码!

    图1 图2 图3

    subscribeOn在执行只后会生成一个中间层Observable,ScalarSynchronousObservable,然后再执行lift(),然后就像上一篇文章里面讲的lift(),也会返回一个新的Observable。那么这一下我们就有了三个Observable了。注意这个中间层Observable也是持有源Observable的引用的。

    多生成了一个中间层Observable

    如果还熟悉咱们的分析流程的话,就知道接下来我们应该从哪个Observable开始分析了。

    大家应该还记得,Observable在转换的时候,最关键的就是那个lift()方法,生成新的subscriber传递给上一层的Observable,那么我们看看subscribeOn的操作符在Rxjava里面的对应类OperatorSubscribeOn(在图1的8234行出现),是怎么把原有的subscriber改造的

    图3

    在RxJava中,每一个Scheduler都对应了一个ExecutorService,每一个ExecutorService都拥有自己的线程池,这个线程池所有的线程都有相同的前缀。点击这里查看怎样给ExecutorService分配线程池。

    所以在上图中line 41就是通过Scheduler获取对应的ExecutorService。而这个操作符生成的新的subscriber是一个Observable类型的subscriber。每次在执行onNext的时候会把onNext中的参数的Observable发射到线程池中subscribe().(line 57, schedule()方法出卖了inner对象的本质,ExecutorService的生成过程大家可以自己跟踪代码查看)

    大家可能看到这里就有点点晕了,这哪跟哪啊。。。为什么要把原有的subscriber包装给一个Observable类型的subscriber呢?

    这时候我们别着急,先仔细想想一个最重要的,贯穿在整个RxJava所有操作符的一个问题:

    在lift方法里面,我们新生成的subscriber会传给谁使用?

    lift方法里面生成的新的subscriber会传给上一个Observable使用。那在subscribeOn的这个例子里面,上一层的Observable是谁?

    是中间生成的那个Observable!它的执行过程是啥样的,咱们来瞅瞅

    中间层的Observable只做了一件事情

    显然,这个中间层ScalarSynchronousObservable,它只做了一件事情,把它上一层的Observable传进OnSubscribe的call方法里面(还记得它的构造函数的调用嘛?参数就是上一层的Observable),交给下一层传上来的subscriber处理,执行其onNext方法。

    call里面的subscriber,就是我们在图3里面生成的新的Subscriber。

    再回想一下新生成subscriber的onNext做了啥?

    这下就很明显了,这个中间层Observable,会在我们subscribe只后,把它上一层的Observable发射到另一个线程中subscribe();也就是说一个简单的subscribeOn,如下图:

    图5 图6

    图5和图6的效果是一样的!也就说,subscribeOn最简单的解释,就是把上面的代码,或者说上一层的Observable丢到一个新的线程去发射元素了。

    用一个图来表达执行的流程:

    流程

    相信这也就解释了为什么,每个Observable只有第一个subscribeOn会起作用了。

    但是问题来了,假如说我在subscribeOn之后再执行了若干map操作呢?为何之后的map也是发射在subscribeOn指定的线程呢?

    其实想想很简单,虽然我们在subscribeOn之后,只会将上一层的Observable发射在新的线程,但是最上层的Observable拿到的subscriber已经是经由最下层subscriber一路包装上来的,包含所有转换的subscriber,也就是说,我们所有的map的转换,不管是发生在subscribeOn之前还是之后,最后都会在subscribeOn指定的线程里面执行

    纵使在subscribeOn之后还有多层的map

    subscribeOn是一个非常重要的线程切换操作符,希望大家可以结合上一篇文章,好好理解它的用法。同时,因为我并没有详细介绍,Scheduler是怎样生成对应的ExecutorService,以及ExecutorService是怎么样维护自己的线程池的,大家可以自行跟踪代码看看不同Scheduler的实现。在看完IO,Computation,newThread这三个线程调度器之后,大家可以在不看源代码的情况下,想想AndroidScheduler.mainThread()这个安卓的主线程调度器可以怎么实现,这样有助于大家举一反三的思考。

    今天就到这,下周我会继续讲解observeOn操作符。RxJava的系列也就到了尾声啦!

    各位周末愉快!热热身晚上准备dota2五连黑了

    相关文章

      网友评论

        本文标题:RxJava进阶之源码分析(part 2)- subscribe

        本文链接:https://www.haomeiwen.com/subject/kkefrttx.html