美文网首页RxJava2.0RxJavaAndroid技术知识
Rxjava2入门教程四:Scheduler线程调度器

Rxjava2入门教程四:Scheduler线程调度器

作者: afa1332 | 来源:发表于2017-07-30 13:16 被阅读2599次

    如需下载源码,请访问
    https://github.com/fengchuanfang/Rxjava2Tutorial
    文章原创,转载请注明出处:
    Rxjava2入门教程四:Scheduler线程调度器
    Scheduler(线程调度器)赋予RxJava简洁明了的异步操作,可以说是RxJava中最值得称道的地方。
    在之前的代码中,Observable发射数据流,Observer接收响应数据流,以及Operators加工数据流均是在同一个线程中,实现出来的是一个同步的函数响应式。但是函数响应式的实际应用却不是这样的,大部分都是后台处理,前台响应的一个过程。Observable生成发射数据流,以及Operators加工数据流都是在后台线程中进行,而Observer在前台线程中接受并相应数据。
    Scheduler(线程调度器)可以让RxJava的线程切换变得简单明了,即使程序逻辑变得十分复杂,他依然能够保持简单明了。

    subscribeOn

    Observable<T> subscribeOn(Scheduler scheduler) 
    

    subscribeOn通过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。
    若多次设定,则只有一次起作用。

    observeOn

    Observable<T> observeOn(Scheduler scheduler)
    

    observeOn同样接收一个Scheduler参数,来指定下游操作运行在特定的线程调度器Scheduler上。
    若多次设定,每次均起作用。

    Scheduler种类

    Schedulers.io( ):

    用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。

    Schedulers.newThread( ):

    在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率没有Schedulers.io( )高。

    Schedulers.computation():

    用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。

    Schedulers.trampoline():

    在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。

    Schedulers.single():

    拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。

    Scheduler.from(@NonNull Executor executor):

    指定一个线程调度器,由此调度器来控制任务的执行策略。

    AndroidSchedulers.mainThread():

    在Android UI线程中执行任务,为Android开发定制。

    注:

    在RxJava2中,废弃了RxJava1中的Schedulers.immediate( )
    在RxJava1中,Schedulers.immediate( )的作用为在当前线程立即执行任务,功能等同于RxJava2中的Schedulers.trampoline( )。
    而Schedulers.trampoline( )在RxJava1中的作用是当其它排队的任务完成后,在当前线程排队开始执行接到的任务,有点像RxJava2中的Schedulers.single(),但也不完全相同,因为Schedulers.single()不是在当前线程而是在一个线程单例中排队执行任务。

    示例一:使用一次subscribeOn和一次observeOn

    schedulerDemo1.jpg

    运行代码后,控制台打印如下:

    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:0
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:1
    System.out: 接收线程:main---->接收:0
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:2
    System.out: 接收线程:main---->接收:1
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:3
    System.out: 接收线程:main---->接收:2
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:4
    System.out: 接收线程:main---->接收:3
    System.out: 接收线程:main---->接收:4
    

    通过subscribeOn(Schedulers.io())指定Observable在Schedulers.io( )调度器的线程中,每隔1秒发射一次数据,通过observeOn(AndroidSchedulers.mainThread())指定Observer在Android UI线程中接收数据。

    示例二:使用两次subscribeOn和一次observeOn

    schedulerDemo2.jpg

    通过subscribeOn(Schedulers.io())指定Observable在Schedulers.io( )调度器的线程中,每隔1秒发射一次数据,通过subscribeOn(Schedulers.newThread())指定map操作符在Schedulers.newThread()的调度器线程中处理数据,通过observeOn(AndroidSchedulers.mainThread())指定Observer在Android UI线程中接收数据。
    运行结果如下:

    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:0
    System.out: 处理线程:RxCachedThreadScheduler-1---->处理:0
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:1
    System.out: 接收线程:main---->接收:0
    System.out: 处理线程:RxCachedThreadScheduler-1---->处理:1
    System.out: 接收线程:main---->接收:1
    

    我们发现发射和处理数据均是在RxCachedThreadScheduler线程中,第二次通过subscribeOn指定的线程不起作用。

    示例三:使用一次subscribeOn和两次observeOn

    schedulerDemo3.jpg

    通过subscribeOn(Schedulers.io())指定Observable在Schedulers.io( )调度器的线程中,每隔1秒发射一次数据,通过observeOn(Schedulers.newThread())指定map操作符在Schedulers.newThread()的调度器线程中处理数据,通过observeOn(AndroidSchedulers.mainThread())指定Observer在Android UI线程中接收数据。
    运行结果如下:

    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:0
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:1
    System.out: 处理线程:RxNewThreadScheduler-1---->处理:0
    System.out: 接收线程:main---->接收:0
    System.out: 处理线程:RxNewThreadScheduler-1---->处理:1
    System.out: 接收线程:main---->接收:1
    

    与我们的预期结果一致

    通过示例一二三,我们可以总结subscribeOn和observeOn的用法如下:

    subscribeOn来指定对数据的处理运行在特定的线程调度器Scheduler上,直到遇到observeOn改变线程调度器若多次设定,则只有一次起作用。observeOn指定下游操作运行在特定的线程调度器Scheduler上。若多次设定,每次均起作用。

    示例四:Schedulers.trampoline()

    通过示例一二三我们可以发现,Observer处理数据相比于Observable发射的数据存在滞后的现象,Observable发射了两个数据,Observer才处理了一个,并不是Observable没发射一个,Observer就处理一个。
    运行:

    schedulerDemo4.jpg

    通过Schedulers.trampoline()/设置观察者在当前线程中处理数据,并且故意延迟两秒后在处理
    控制台打印如下:

    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:0
    System.out: 接收线程:RxCachedThreadScheduler-1---->接收:0
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:1
    System.out: 接收线程:RxCachedThreadScheduler-1---->接收:1
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:2
    System.out: 接收线程:RxCachedThreadScheduler-1---->接收:2
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:3
    System.out: 接收线程:RxCachedThreadScheduler-1---->接收:3
    System.out: 发射线程:RxCachedThreadScheduler-1---->发射:4
    System.out: 接收线程:RxCachedThreadScheduler-1---->接收:4
    

    我们可以发现虽然Observer在接收到数据后,延迟了两秒才处理,但是Observable依然在Observer将数据处理完之后才开始发射下一条。Schedulers.trampoline()的作用在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。

    示例五:Schedulers.single()

    schedulerDemo5.jpg

    运行结果如下:

    System.out: 发射线程:RxSingleScheduler-1---->发射:0
    System.out: 发射线程:RxSingleScheduler-1---->发射:1
    System.out: 发射线程:RxSingleScheduler-1---->发射:2
    
    System.out: 处理线程:RxSingleScheduler-1---->处理:0
    System.out: 处理线程:RxSingleScheduler-1---->处理:1
    System.out: 处理线程:RxSingleScheduler-1---->处理:2
    
    System.out: 接收线程:RxSingleScheduler-1---->接收:0
    System.out: 接收线程:RxSingleScheduler-1---->接收:1
    System.out: 接收线程:RxSingleScheduler-1---->接收:2
    

    通过Schedulers.single()将数据的发射,处理,接收在Schedulers.single()的线程单例中排队执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
    上一篇:Rxjava2入门教程三:Operators操作符
    下一篇:Rxjava2入门教程五:Flowable背压支持

    相关文章

      网友评论

      • junl_yaun:从第一篇看到调度器 总结的很棒 :clap:
      • 黑马有点白986:Observable生成发射数据流,以及Operators加工数据流都是在后台线程中进行,而Observable在前台线程中接受并相应数据。笔误了,Observer
        afa1332:@小腊月 嗯嗯👍,多谢指正😊

      本文标题:Rxjava2入门教程四:Scheduler线程调度器

      本文链接:https://www.haomeiwen.com/subject/rlghlxtx.html