美文网首页Retrofit RXjava 网络框架android 开发程序员
RxJava入门与提高-线程控制Scheduler篇(4)

RxJava入门与提高-线程控制Scheduler篇(4)

作者: 小玉1991 | 来源:发表于2017-11-09 13:01 被阅读39次

    前言

    Scheduler 的 API

    在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。
    RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。
    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
    • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
    • AndroidSchedulers.mainThread(), Android 还有一个专用的 ,它指定的操作将在 Android 主线程运行。
      有了这几个 Scheduler ,就可以使用subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。
      (1)subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。
      (2)observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    线程操作的知识

    先看两个栗子:

    栗子1
    Observable.create(new Observable.OnSubscribe<String>() {
                       @Override
                       public void call(Subscriber<? super String> subscriber) {
                           Logger.v( "rx_call" , Thread.currentThread().getName()  );
     
                           subscriber.onNext( "dd");
                           subscriber.onCompleted();
                       }
                   })
     
                   .subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
     
                   .map(new Func1<String, String >() {
                       @Override
                       public String call(String s) {
                           Logger.v( "rx_map" , Thread.currentThread().getName()  );
                           return s + "88";
                       }
                   })
                   .subscribe(new Action1<String>() {
                       @Override
                       public void call(String s) {
                           Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                       }
                   }) ;
    结果
    /rx_call: RxCachedThreadScheduler-1    --io线程
    /rx_map: main                                     --主线程
    /rx_subscribe: main                              --主线程
    
     栗子2
    Observable.create(new Observable.OnSubscribe<String>() {
                       @Override
                       public void call(Subscriber<? super String> subscriber) {
                           Logger.v( "rx_call" , Thread.currentThread().getName()  );
     
                           subscriber.onNext( "dd");
                           subscriber.onCompleted();
                       }
                   })
                   .map(new Func1<String, String >() {
                       @Override
                       public String call(String s) {
                           Logger.v( "rx_map" , Thread.currentThread().getName()  );
                           return s + "88";
                       }
                   })
     
                   .subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
     
                   .subscribe(new Action1<String>() {
                       @Override
                       public void call(String s) {
                           Logger.v( "rx_subscribe" , Thread.currentThread().getName()  );
                       }
                   }) ; 
    结果
    /rx_call: RxCachedThreadScheduler-1     --io线程
    /rx_map: RxCachedThreadScheduler-1   --io线程
    /rx_subscribe: main                 --主线程
    
    

    分析上面两个栗子,结论

    • create() , just() , from() 等 --- 事件产生
    • map() , flapMap() , scan() , filter() 等 -- 事件加工
    • subscribe() -- 事件消费

    事件产生:默认运行在当前线程,可以由 subscribeOn() 自定义线程
    事件加工:默认跟事件产生的线程保持一致, 可以由 observeOn() 自定义线程
    事件消费:默认运行在当前线程,可以有observeOn() 自定义

    结论

    • subscribeOn 作用于该操作符之前的 Observable 的创建操符作以及 doOnSubscribe 操作符,换句话说就是 doOnSubscribe 以及 Observable 的创建操作符总是被其之后最近的 subscribeOn 控制。多次调用,创建操作符除了最近的外,其余无效。

    • observeOn() 设定事件加工和消费的线程,可以调用多次,每次调用只对后边的操作产生影响。或者这样说,observeOn作用于该操作符之后操作符直到出现新的observeOn操作符。

    subscribeOn

    image.png

    查看log,这里,模块3执行,它后边没有设置线程,是main。
    模块2执行,它后边设置了IO线程,所以领打印“doOnSubscribe1:RxIoScheduler-2”
    模块1执行,也就是创建create方法执行,它后边有两个subscribeOn(),只有离他最近 的才会起作用。

    image.png

    observeOn

    结论:observeOn作用于该操作符之后操作符直到出现新的observeOn操作符

    image.png image.png

    图片解释:
    图中有两个observeOn()方法,竖向的箭头方向可以认为是Rxjava内部变化方向,横向的箭头可以认为是程序执行方向,颜色可以认为是不同的线程。
    由图可以看出,每次竖直方向经过observeOn(),程序的执行颜色都会变化为observeOn()设定的颜色,直到再次执行observeOn()方法。
    也就是 observeOn作用于该操作符之后操作符直到出现新的observeOn操作符

    本文对应源码不做分析,只是讲述结论性的知识。


    欢迎继续收看:RxJava入门与提高-网络Retrofit 篇(5)
    作者:ZhangYushui
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    相关文章

      网友评论

        本文标题:RxJava入门与提高-线程控制Scheduler篇(4)

        本文链接:https://www.haomeiwen.com/subject/dempmxtx.html