美文网首页
RxJava2--多线程调度Scheduler

RxJava2--多线程调度Scheduler

作者: None_Ling | 来源:发表于2018-12-25 17:47 被阅读23次

    Scheduler背景

    前面介绍过RxJava的基本概念与使用,可以通过RxJava发射事件,而通过Observer来接收事件。

    然而我们大多数时候,会有耗时的操作,比如在子线程完成复杂的矩阵运算,文件的IO操作,网络请求,数据库读写等等,我们希望可以在子线程完成这些事情,而在主线程接收回调事件。

    这种情况,我们就需要用到Scheduler对象了。

    Scheduler家族

    所使用的Scheduler主要在Schedulers这个类中,RxJava仅仅提供了以下这些调度器:

    Schedulers.SINGLE
    Schedulers.COMPUTATION
    Schedulers.IO
    Schedulers.TRAMPOLINE
    Schedulers.NEW_THREAD
    AndroidSchedulers.MAIN_THREAD
    
    • Schedulers.io( ):
      用于IO密集型的操作,例如读写SD卡文件,查询数据库,访问网络等,具有线程缓存机制,CoreSize为1,在此调度器接收到任务后,先检查线程缓存池中,是否有空闲的线程,如果有,则复用,如果没有则创建新的线程,并加入到线程池中,如果每次都没有空闲线程使用,可以无上限的创建新线程。
    • Schedulers.newThread( ):
      在每执行一个任务时创建一个新的线程,不具有线程缓存机制,因为创建一个新的线程比复用一个线程更耗时耗力,虽然使用Schedulers.io( )的地方,都可以使用Schedulers.newThread( ),但是,Schedulers.newThread( )的效率没有Schedulers.io( )高。
    • Schedulers.computation():
      用于CPU 密集型计算任务,即不会被 I/O 等操作限制性能的耗时操作,例如xml,json文件的解析,Bitmap图片的压缩取样等,具有固定的线程池,大小为CPU的核数。不可以用于I/O操作,因为I/O操作的等待时间会浪费CPU。
    • Schedulers.trampoline():
      在当前线程立即执行任务,如果当前线程有任务在执行,则会将其暂停,等插入进来的任务执行完之后,再将未完成的任务接着执行。
    • Schedulers.single():
      拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
    • Scheduler.from(@NonNull Executor executor):
      指定一个线程调度器,由此调度器来控制任务的执行策略。
    • AndroidSchedulers.mainThread():
      在Android UI线程中执行任务,为Android开发定制。使用MainLooper实现

    Scheduler使用

    通过subscribeOn以及observerOn分别指定任务事件与监听事件所在的线程。

    还是之前的例子,只是增加了subscribeOn(Schedulers.io())以及observeOn(AndroidSchedulers.mainThread()),让事件在IO线程中发射,而在Android主线程接收。

    Observable.create<Int> { emitter ->
                Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
                emitter.onNext(1)
                Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
                emitter.onNext(2)
                Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
                emitter.onComplete()
            }.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe { data ->
                        Log.e(TAG, "onNext...$data...${Thread.currentThread().name}")
                    }
    

    看输出的日志:会发现有些不一样

    1. onNext发送的线程在子线程RxCachedThreadScheduler-1
    2. onNext事件在子线程发送后,并不会在主线程立即响应,而是会积攒后,等事件都发送完毕后,统一按顺序回调到主线程
     E/SelectImageActivity: Emitter onNext1...RxCachedThreadScheduler-1
     E/SelectImageActivity: Emitter onNext2...RxCachedThreadScheduler-1
     E/SelectImageActivity: Emitter onComplete...RxCachedThreadScheduler-1
     E/SelectImageActivity: onNext...1...main
     E/SelectImageActivity: onNext...2...main
    

    而如果将subscribeOnObserverOn都指定成同一个Scheduler都话,如Schedulers.computation(),则需要看这个Scheduler的调度策略了。

    • 如果没有指定Schedulers的话,则会发送一个事件,就接收一个事件
    • 如果指定了Schedulers的话,就会遵循线程调度了,如果没有阻塞的话,就会顺序调用,并且将事件传递到子线程接收

    测试结果

    1. 都使用Schedulers.single()
     E/SelectImageActivity: Emitter onNext1...RxSingleScheduler-1
     E/SelectImageActivity: Emitter onNext2...RxSingleScheduler-1
     E/SelectImageActivity: Emitter onComplete...RxSingleScheduler-1
     E/SelectImageActivity: onNext...1...RxSingleScheduler-1
     E/SelectImageActivity: onNext...2...RxSingleScheduler-1
    
    1. 都使用AndroidSchedulers.mainThread()
     E/SelectImageActivity: Emitter onNext1...main
     E/SelectImageActivity: Emitter onNext2...main
     E/SelectImageActivity: Emitter onComplete...main
     E/SelectImageActivity: onNext...1...main
     E/SelectImageActivity: onNext...2...main
    
    1. 都使用Schedulers.computation()
     E/SelectImageActivity: Emitter onNext1...RxComputationThreadPool-1
     E/SelectImageActivity: Emitter onNext2...RxComputationThreadPool-1
     E/SelectImageActivity: Emitter onComplete...RxComputationThreadPool-1
     E/SelectImageActivity: onNext...1...RxComputationThreadPool-2
     E/SelectImageActivity: onNext...2...RxComputationThreadPool-2
    
    1. 在发射事件后调用sleep模拟线程阻塞的操作,代码如下:
    Observable.create<Int> { emitter ->
                Log.e(TAG, "Emitter onNext1...${Thread.currentThread().name}")
                emitter.onNext(1)
                sleep()
                Log.e(TAG, "Emitter onNext2...${Thread.currentThread().name}")
                emitter.onNext(2)
                sleep()
                Log.e(TAG, "Emitter onComplete...${Thread.currentThread().name}")
                emitter.onComplete()
            }.subscribeOn(Schedulers.computation())
                    .observeOn(Schedulers.computation())
                    .subscribe { data ->
                        Log.e(TAG, "onNext...$data...${Thread.currentThread().name}")
                    }
    

    而打印结果如下:

     E/SelectImageActivity: Emitter onNext1...RxComputationThreadPool-1
     E/SelectImageActivity: onNext...1...RxComputationThreadPool-2
     E/SelectImageActivity: Emitter onNext2...RxComputationThreadPool-1
     E/SelectImageActivity: onNext...2...RxComputationThreadPool-2
     E/SelectImageActivity: Emitter onComplete...RxComputationThreadPool-1
    

    可以看到,只要使用了Scheduler后,在加入sleep的阻塞操作后,执行了线程的调度,就会打印出来事件的发射与接收的顺序。

    相关文章

      网友评论

          本文标题:RxJava2--多线程调度Scheduler

          本文链接:https://www.haomeiwen.com/subject/abiflqtx.html