美文网首页我爱编程
RxJava学习笔记(二)线程调度介绍和源码分析

RxJava学习笔记(二)线程调度介绍和源码分析

作者: sjandroid | 来源:发表于2018-05-28 15:40 被阅读0次

    今天记录下在使用RxJava时关于线程调度方面的知识。

    以下是今天的学习目录

    • 基本使用
    • 线程调度的整体流程
    • 关键类及方法说明
    • 总结

    基本使用
    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.e(TAG, "被观察者---发送事件'Hello World!'-- Thread:" + Thread.currentThread().getName());
    
            e.onNext("Hello World!");
            e.onComplete();
        }
    }).subscribeOn(Schedulers.newThread())
      .observeOn(Schedulers.single())
      .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {}
    
          @Override
          public void onNext(String value) {
              Log.e(TAG, "观察者---onNext---Value:" + value + "---Thread:" + Thread.currentThread().getName());
          }
    
          @Override
          public void onError(Throwable e) {}
    
          @Override
          public void onComplete() {
              Log.e(TAG, "观察者---onComplete---Thread:" + Thread.currentThread().getName());
          }
      });
    }
    

    log:

    被观察者---发送事件'Hello World!'-- Thread:RxNewThreadScheduler-1
    观察者---onNext---Value:Hello World!---Thread:RxSingleScheduler-1
    观察者---onComplete---Thread:RxSingleScheduler-1
    

    通过log我们发现:

    • 通过“消息发射器-Emitter”发送消息时,当前所处的线程是以“RxNewThreadScheduler”开头的工作线程,而非 UI线程。
    • 在“Observer”接受消息时,当前所处的线程是以“RxSingleScheduler”开头的工作线程,也是非 UI线程。

    那么问题来了
    1:RxJava是如何实现线程调度的?
    2:这样做有什么好处?

    带着以上问题,我们来分析下“RxJava中线程调度的内部实现”吧。


    线程调度的整体流程
    创建“Observable”的过程

    1:通过Observable.create()创建类型为“ObservableCreate”“被观察者”并把“ObservableOnSubscribe(事件容器)”保存至变量“source”中
    2:通过Observabl.subscribeOn()创建类型为“ObservableSubscribeOn”“被观察者”。把“ObservableCreate”保存至变量“source”中。把“Scheduler”保存至“scheduler”变量中。
    3:通过Observabl.observeOn()创建类型为“ObservableObserveOn”“被观察者”。把“ObservableSubscribeOn”保存至变量“source”中。把“Scheduler”保存至“scheduler”变量中。
    通过上述3步之后,到此“Observable”的创建过程完毕,此时可能有人会问“Scheduler”是什么?它的作用又是什么呢?先不要着急,接来下我们通过分析“Observable的事件传递过程”来继续分析吧。

    “Observable”的事件传递过程

    说明:关于Observable的subscribeActual()方法在上一节已经介绍过了。该方法是Obaservable实际订阅动作的发生地,并且此方法为Obaservable的抽象方法每个子类必须实现,再者调用Observable的subscribe()其内部实际就是调用了subscribeActual()去完成“事件”发送之前的种种事情,例如:调用Observer的onSubscribe()询问是否需要通过事件控制开关关闭事件的传递,然后再回调ObservableOnSubscribe的subscribe()回调至事件产生处。所以我们必须要了解该方法究竟做了什么。

    通过上面的分析,我们知道最后创建的“Observable”是“ObservableObserveOn”,我们先看看它的“subscribeActual()”方法都做了什么吧。

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    通过查看此方法我们发现,映入眼帘的是一个对“scheduler”的if--else判断。此时我们就有必要分析下:Scheduler是什么?它的作用是什么?

    Scheduler:线程调度者
    image.png

    以上是关于Scheduler的部分类结构图。结合“第一节-基本使用”我们了解到,要是使用Scheduler基本都是通过“Schedulers”来完成。

    Schedulers:创建线程调度者实例的静态工厂

    从“Schedulers”的类说明“Static factory methods for returning standard Scheduler instances.”以及其中相应方法的实现,可以知道,“Schedulers是专门生产Scheduler实例的”。例如:通过 io()可以返回IoScheduler实例,通过newThread()可以获得NewThreadScheduler实例等等。
    接下来我们就debug下代码来看看,Scheduler到底是什么?它的运行机制又是怎样的吧。

    1:进入ObservableSubscribeOn.subscribeActual()查看运行时信息


    image.png

    此时,会调用Scheduler的抽象方法createWorker()返回一个Worker,因为我们已经通过observeOn()指定的Scheduler是“SingleScheduler”,接下来看看“SingleScheduler”的createWorker()的实现吧。

    2:SingleScheduler.createWorker()


    image.png
    image.png

    通过createWorker()返回了SingleScheduler中实现了Scheduler.Worker该抽象类的ScheduledWorker的实例。

    3:创建完Worker,执行“source.subscribe()”

    image.png
    此时会执行“ObservableSubscribeOn.subscribe()”(线程调度的整体流程 介绍了为什么会执行ObservableSubscribeOn.subscribe()),并把“ObserveOnObserver”该observer传递至ObservableSubscribeOn中。

    4:执行ObservableSubscribeOn.subscribeActual()


    image.png

    4.1:首先,回调第3步中创建的ObserveOnObserver.onSubscribe(),在该方法中会回调外部创建的Observer(实际接受事件的观察者)的onSubscribe()

    image.png

    4.2:再次,继续执行ObservableSubscribeOn.subscribeActual()剩下的代码,此时就真正开始 在为被观察者设置的线程中(此处是在通过NewThreadWorker创建相应线程执行)执行其相应的方法了

    image.png

    5:执行Scheduler.scheduleDirect()


    image.png
    image.png

    5.1:首先,执行Scheduler的抽象方法createWorker()
    因为我们为该Scheduler设置的是NewThreadScheduler,我们先去看看NewThreadScheduler的createWorker()做了什么。

    image.png
    image.png
    从图中我们可以知道,NewThreadScheduler.createWorker()返回了一个NewThreadWorker实例,并且在执行该类实例的时候,初始化了一个相应的 线程池(该线程池是为了后续要在其中执行异步任务准备的)

    5.2:其次,在上一步创建的Worker中,执行给定的任务(也就是在上一步创建的Worker中的线程池中执行相应的任务)


    image.png
    image.png

    5.3:再次,回到ObservableSubscribeOn.subscribeActual() 5.2执行的Runnable(任务)就是 调用source的subscribe()。该source是谁?它就是我们在 创建“Observable”的过程 中通过调用Observable.create()创建的 ObservableCreate实例。

    image.png

    5.4:最后,调用ObservableOnSubscribe.subscribe()回调至 消息准备处。 此时我们发现 当前线程是以“RxNewThreadScheduler”开头的工作线程,这也就解释了,在 基本使用中我们发现为什么通过 subscribeOn(Schedulers.newThread())设置后,执行的事件发送所在的线程不是UI线程而是以“RxNewThreadScheduler”开头的工作线程了。

    image.png
    image.png

    6:发送事件,此时会通过CreateEmitter(消息发射器)的OnNext(),onComplete(),onError()来向“观察者”发送事件

    image.png image.png

    7:调用ObservableObserveOn中ObserveOnObserver的onNext(),onComplete(),onError()


    image.png
    image.png

    8:通过阅读源码发现,ObserveOnObserver实现了Runnable接口,所以再执行Scheduler的schedule()方法时执行的是该类型的run()方法。


    image.png image.png image.png

    当执行drainNormal(),先通过checkTerminated()判断线程是否为终止了,如果没终止,则调用 Observer.onNext()接受事件。此时我们发现当前线程是通过observeOn(Schedulers.single())指定的线程名为“RxSingleScheduler”的线程。

    到此我们分析了通过Observable.subscribeOn()给 被观察者调用方法设置所在的线程 以及通过Observable.observeOn()给 观察者调用方法设置所在的线程 内部实现。


    关键类及方法说明

    相关文章

      网友评论

        本文标题:RxJava学习笔记(二)线程调度介绍和源码分析

        本文链接:https://www.haomeiwen.com/subject/htzljftx.html