美文网首页
写给自己的RxJava — 线程切换

写给自己的RxJava — 线程切换

作者: 董成鹏 | 来源:发表于2017-10-31 13:01 被阅读0次

线程切换

只要使用RxJava肯定对下面的代码特别熟悉

Observable.from(list)
  .subcribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(new Action1(){
      public void call(Object obj){
          
      }
  });

其中subscribeOn()可以把事件发生的线程切换到io线程,observeOn()可以把处理事件的线程切换到Android应用程序主线程。

那他是怎么做到这么简洁的切换呢?

Rxjava的所有变换都基于一个lift模型,我们接下来介绍一下这个模型。

首先回顾一下Observable通知Subscriber的原理。

在生成Observable的时候我们会传入一个OnSubscribe的实例,在发生订阅关系subscribe方法中,OnSubscribe实例的call(Subscriber)方法中就会调用传入的Subscriber的的相关方法,从而实现通知,消息发送。

流程大概如下

Observable—(实例化时)—>OnSubscribe—(订阅关系发生subscribe时) —>Subscriber

那首先考虑一下我们怎么实现在Observable中发送的是 圆形 事件,但是在Subscriber中接收到 方形事件并处理呢。

如果Observable和Subscriber的事件都不一样,这都不能发生订阅关系的,因为在编译检查的时候就无法通过。

RxJava采用的方法是提供一个变换方法lift(Operater),该方法返回一个Observable对象。该Observable对象会发送方形 事件,这样就可以用这个新的Observable对象来订阅原始的Subscriber了。

在新生成的Obaservable对象的时候,我们也会生成该Observable对应的OnSubscribe对象,并实现新的OnSubscribe对象的call(Subscriber)方法。由于我们用新生成的Observable对象去订阅原始的Subscriber,所以新生成的OnSubscribe的call方法中的参数就是原始的Subscriber了。

接下来就是lift(Operator)中Operator接口发挥作用的时候了,这个Operator接口的call方法实现把一个Subscriber变换成另一个Subscrber的功能。

在这里就是把原始的响应方形事件的Subscriber转换成响应圆形事件的Subscriber,这样就可以调用原始的OnSubscribe的call方法,把圆形事件变成发送给这个新的响应圆形事件的Subscriber.

这样新的Observable中就同时包含了:

  • 原始的OnSubscribe(它能发送圆形事件)
  • 新的Subscriber(它能接受圆形事件)
  • 原始的Subscriber(它能接受 方形事件)

这样,在发生订阅关系时,原始的Observable(原始的OnSubscribe)会发送圆形事件给新的Subscriber,新的Subscriber在处理的时候,就把这个事件转换一下传递给原始的Subscriber。

基本流程就是这样,Observable.Operator就是实现新老Subscriber关联的纽带。

借用扔物线的snippet

//这个lift的作用就是把只发送圆形事件的Observable转换成发送方形事件的Observable
public <Rectangle> Observable<Rectangle> lift(Operator<? extends Rectangle, ? super Round> operator) {
    return Observable.create(new OnSubscribe<Rectangle>() {
        @Override
       //这个参数subscriber是原始的subscriber,只接受方形事件
        public void call(Subscriber subscriber) {
          
          //这个newSubscriber是operator生成的新的Subscriber,它可以接收圆形事件
          //新的subscriber会调用原始的sunscriber的相关方法
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
          
          //这个onSubscribe是原始的OnSubscribe,它发送圆形事件
          //所以可以用新生成的newSubscriber来接收
            onSubscribe.call(newSubscriber);
        }
    });
}

记不住以上内容也没有关系,只要记住在变换中,会成成一个新的Observable和Subscriber就可以了,我们在变换完之后所进行的操作都是针对新生成的Observable和新Subscriber。

经过变换之后,我们拥有两个Observable(OnSubscribe),也拥有两个Subscriber.

那我们想要切换事件处理的线程怎么办呢?我们可以在Operator中生成新的Subscriber的时候进行处理,在newSubscriber和原始subscriber进行映射的时候进行切换,所以可以知道observeOn()切换的是所有它下游的线程。

所以如果我们想要切换事件发生的线程,会怎么办呢?根据上面的代码,可以知道,只需要让最后的onSubscribe.call(newSubscriber)运行在新线程就可以了。从这句代码也可以看出,subscribeOn()方法切换的是它的上游线程,这种线程切换一直会影响到最原始的observable。

但是如果在一条链式调用中出现了多个subscribeOn()方法,由于链式调用最上游的第一个subscribeOn方法会直接影响到最原始的observable,而在接下来的的链式调用中消息的发送是有newSubscriber来控制的,所以第二个subscribeOn方法不会影响线程的切换。

多说无益,直接上代码

        Observable.just("hello","world","rxjava","rxandroid")
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此处由于受到下面第一个subscribeOn的影响,输出RxComputationScheduler-1
                        Log.v("chicodong","thread 1 is: "+Thread.currentThread().getName());
                        return s.toUpperCase();
                    }
                })
                .subscribeOn(Schedulers.computation())
                .map(new Func1<String, Integer>() {
                    @Override
                    public Integer call(String s) {
                      //此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
                        Log.v("chicodong","thread 2 is: "+Thread.currentThread().getName());
                        return s.length();
                    }
                })
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                      //此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
                        Log.v("chicodong","thread 3 is: "+Thread.currentThread().getName());
                        return integer+100;
                    }
                })
                //此处的第二个subscribeOn相当于没有起作用
                .subscribeOn(Schedulers.io())
                .map(new Func1<Integer, String>() {

                    @Override
                    public String call(Integer integer) {
                      //此处由于上一个subscribeOn切换过线程,新生成的subscriber是在RxComputationScheduler-1发送事件的,所以仍然输出RxComputationScheduler-1
                        Log.v("chicodong","thread 4 is: "+Thread.currentThread().getName());
                        return integer.toString();
                    }
                })
                .observeOn(Schedulers.newThread())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此处受到obServeOn的影响,输出RxNewThreadScheduler-1
                        Log.v("chicodong","thread 5 is: "+Thread.currentThread().getName());
                        return s+" lalala";
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .map(new Func1<String, String>() {
                    @Override
                    public String call(String s) {
                      //此处受到第二个observeOn的影响,输出main
                        Log.v("chicodong","thread 6 is: "+Thread.currentThread().getName());
                        return s.toUpperCase();
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                      //此处受到第二个observeOn的影响,输出main
                        Log.v("chicodong","final result is "+s+" thread is "+Thread.currentThread().getName());
                    }
                });

虽然多余一个subscribeOn对于线程切换没有影响,但是它可以在事件还没有发生时起作用,最常见的就是doOnSubscribe()方法了

Observable.from(list)
  .subscribeOn(Schedulers.io())
  .doOnSubscribe(new Action0(){
      public void call(){
          
      }
  })
  .subscribeOn(AndroidSchedulers.mainThread())
  .observeOn(AndroidScheduler.mainThread())
  .subscribe(new Action1(){
      public void call(Object obj){
          
      }
  });

上面的代码中第二个subscribeOn对于线程切换没有影响,但是却可以使doOnSubscribe()运行在主线程中。

相关文章

  • 写给自己的RxJava — 线程切换

    线程切换 只要使用RxJava肯定对下面的代码特别熟悉 其中subscribeOn()可以把事件发生的线程切换到i...

  • RxJava源码分析-线程切换

    RxJava源码分析-线程切换 接着上篇分析,本篇我们来揭开RxJava线程切换的神秘面试,先上一段代码 这段代码...

  • RxJava的线程切换

    RxJava 线程切换 前言 在上篇文章对RxJava 的工作流程进行的简单的分析,今天来分享一下线程切换的流程。...

  • Rxjava2 操作符原理(2)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 线程切换(3)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 基本用法(1)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • Rxjava2 简析Flowable背压(4)

    Rxjava2 基本用法(1) Rxjava2 操作符原理(2) Rxjava2 线程切换(3) Rxjava2 ...

  • RxJava源码分析之线程调度(一)

    RxJava强大的地方之一是他的链式调用,轻松地在线程之间进行切换。这几天也大概分析了一下RxJava的线程切换的...

  • 安卓第三方组件收集

    要点:如果不是必须, 用系统控件 RxJava 线程切换需要注意的地方 RxJava 内置的线程调度器的确可以让我...

  • RxJava:线程切换

    上一篇:RxJava:基本订阅流程 我们在Rxjava中最常用的两个方法: subscribeOn(Schedul...

网友评论

      本文标题:写给自己的RxJava — 线程切换

      本文链接:https://www.haomeiwen.com/subject/erhbpxtx.html