美文网首页
【Android】RxJava的使用(二)线程切换

【Android】RxJava的使用(二)线程切换

作者: 买辣条也想用券 | 来源:发表于2020-08-29 17:28 被阅读0次

    理论
    总所周知 RxJava 在切换线程时用到了两个方法 subscribeOn() 和 observeOn() 下面来分别解释一下这两个方法

    • subscribeOn() : 影响的是最开始的被观察者所在的线程。当使用多个 subscribeOn() 的时候,只有第一个 subscribeOn() 起作用;
    • observeOn() : 影响的是跟在后面的操作(指定观察者运行的线程)。所以如果想要多次改变线程,可以多次使用 observeOn;

    为什么 subscribeOn() 只有第一次切换有效
    因为 RxJava 最终能影响 ObservableOnSubscribe 这个匿名实现接口的运行环境的只能是最后一次运行的 subscribeOn() ,又因为 RxJava 订阅的时候是从下往上订阅,所以从上往下第一个 subscribeOn() 就是最后运行的,这就造成了写多个 subscribeOn() 并没有什么乱用的现象。

    线程控制 —— Scheduler (一)
    在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

    Scheduler 的 API (一)
    正常情况下, 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.
    在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:

    在RxJava中, 已经内置了很多线程选项供我们选择, 例如有

    • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。

    • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

    • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和
      newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

    • Schedulers.computation(): 代表CPU计算密集型的操作, 例如需要大量计算的操作 ,计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

    • 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

    这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

    有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。

    回到RxJava中, 当我们在主线程中去创建一个上游Observable来发送事件, 则这个上游默认就在主线程发送事件.
    当我们在主线程去创建一个下游Observer来接收事件, 则这个下游默认就在主线程中接收事件。
    但是实际情况是 ,我们更多想要的是这么一种情况, 在子线程中做耗时的操作, 然后回到主线程中来操作UI,

    要达到这个目的, 我们需要先改变上游发送事件的线程, 让它去子线程中发送事件, 然后再改变下游的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码

    //创建一个下游 观察者Observer

    Observer<Integer> observer = new Observer<Integer>() {
           @Override
           public void onSubscribe(Disposable d) {
    
           }
    
           @Override
           public void onNext(Integer value) {
               Log.w(TAG, "" + value);
               Log.d(TAG, "observer thread is : " + Thread.currentThread().getName());
    
           }
    
           @Override
           public void onError(Throwable e) {
           }
    
           @Override
           public void onComplete() {
           }
       };
    
    
       //创建一个上游  被观察者 Observable:
       Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
           @Override
           public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
               Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
               emitter.onNext(1);
           }
       });
    
    //建立连接
    observable.subscribeOn(Schedulers.newThread())
                      .observeOn(AndroidSchedulers.mainThread())
                      .subscribe(observer);
    
    打印结果
    08-29 18:53:38.313 5128-5155/tongxunlu.com.myapplication D/MainActivity: Observable thread is : RxNewThreadScheduler-1
    08-29 18:53:38.314 5128-5128/tongxunlu.com.myapplication W/MainActivity: 1
    08-29 18:53:38.314 5128-5128/tongxunlu.com.myapplication D/MainActivity: observer thread is : main
    

    可以看到, 上游发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-2的线程中发送的事件, 而下游仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

    和上一段代码相比,这段代码只不过是增加了两行代码:

    .subscribeOn(Schedulers.newThread())                                              
    .observeOn(AndroidSchedulers.mainThread())
    

    简单的来说, subscribeOn() 指定的是上游发送事件的线程, observeOn() 指定的是下游接收事件的线程.

    需要注意的是

    • 多次指定上游(Observable)的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
    • 多次指定下游(Observer)的线程是可以的, 也就是说每调用一次observeOn() , 下游的线程就会切换一次.
      举个栗子:
    //建立连接
                   observable.subscribeOn(Schedulers.newThread())
                           .subscribeOn(Schedulers.io())
                           .observeOn(AndroidSchedulers.mainThread())
                           .observeOn(Schedulers.io())
                           .subscribe(observer);
    

    ** doOnNext官方介绍:**
    The doOnNext operator is much like doOnEach(Action1) except that the Action that you pass it as a parameter does not accept a Notification but instead simply accepts the emitted item.

    可以这么理解:

    • do系列的作用是side effect,当onNext发生时,它被调用,不改变数据流。
    • doOnNext()允许我们在每次输出一个元素之前做一些额外的事情。

    实践
    对于我们Android开发人员来说, 经常会将一些耗时的操作放在后台, 比如网络请求或者读写文件,操作数据库等等,等到操作完成之后回到主线程去更新UI, 有了上面的这些基础, 那么现在我们就可以轻松的去做到这样一些操作.

    读写数据库

    上面说了网络请求的例子, 接下来再看看读写数据库, 读写数据库也算一个耗时的操作, 因此我们也最好放在IO线程里去进行, 这个例子就比较简单, 直接上代码:

    public Observable<List<Record>> readAllRecords() {
          return Observable.create(new ObservableOnSubscribe<List<Record>>() {
              @Override
              public void subscribe(ObservableEmitter<List<Record>> emitter) throws Exception {
                  Cursor cursor = null;
                  try {
                      cursor = getReadableDatabase().rawQuery("select * from " + TABLE_NAME, new String[]{});
                      List<Record> result = new ArrayList<>();
                      while (cursor.moveToNext()) {
                          result.add(Db.Record.read(cursor));
                      }
                      emitter.onNext(result);
                      emitter.onComplete();
                  } finally {
                      if (cursor != null) {
                          cursor.close();
                      }
                  }
              }
          }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
      }
    

    相关文章

      网友评论

          本文标题:【Android】RxJava的使用(二)线程切换

          本文链接:https://www.haomeiwen.com/subject/nlkjsktx.html