今天记录下在使用RxJava时关于线程调度方面的知识。
以下是今天的学习目录
- 基本使用
- 线程调度的整体流程
- 关键类及方法说明
- 总结
基本使用
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
Log.e(TAG, "被观察者---发送事件'Hello World!'-- Thread:" + Thread.currentThread().getName());
e.onNext("Hello World!");
e.onComplete();
}
}).subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.single())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(String value) {
Log.e(TAG, "观察者---onNext---Value:" + value + "---Thread:" + Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {
Log.e(TAG, "观察者---onComplete---Thread:" + Thread.currentThread().getName());
}
});
}
log:
被观察者---发送事件'Hello World!'-- Thread:RxNewThreadScheduler-1
观察者---onNext---Value:Hello World!---Thread:RxSingleScheduler-1
观察者---onComplete---Thread:RxSingleScheduler-1
通过log我们发现:
- 通过“消息发射器-Emitter”发送消息时,当前所处的线程是以“RxNewThreadScheduler”开头的工作线程,而非 UI线程。
- 在“Observer”接受消息时,当前所处的线程是以“RxSingleScheduler”开头的工作线程,也是非 UI线程。
那么问题来了
1:RxJava是如何实现线程调度的?
2:这样做有什么好处?
带着以上问题,我们来分析下“RxJava中线程调度的内部实现”吧。
线程调度的整体流程
创建“Observable”的过程
1:通过Observable.create()创建类型为“ObservableCreate”的“被观察者”并把“ObservableOnSubscribe(事件容器)”保存至变量“source”中。
2:通过Observabl.subscribeOn()创建类型为“ObservableSubscribeOn”的“被观察者”。把“ObservableCreate”保存至变量“source”中。把“Scheduler”保存至“scheduler”变量中。
3:通过Observabl.observeOn()创建类型为“ObservableObserveOn”的“被观察者”。把“ObservableSubscribeOn”保存至变量“source”中。把“Scheduler”保存至“scheduler”变量中。
通过上述3步之后,到此“Observable”的创建过程完毕,此时可能有人会问“Scheduler”是什么?它的作用又是什么呢?先不要着急,接来下我们通过分析“Observable的事件传递过程”来继续分析吧。
“Observable”的事件传递过程
说明:关于Observable的subscribeActual()方法在上一节已经介绍过了。该方法是Obaservable实际订阅动作的发生地,并且此方法为Obaservable的抽象方法每个子类必须实现,再者调用Observable的subscribe()其内部实际就是调用了subscribeActual()去完成“事件”发送之前的种种事情,例如:调用Observer的onSubscribe()询问是否需要通过事件控制开关关闭事件的传递,然后再回调ObservableOnSubscribe的subscribe()回调至事件产生处。所以我们必须要了解该方法究竟做了什么。
通过上面的分析,我们知道最后创建的“Observable”是“ObservableObserveOn”,我们先看看它的“subscribeActual()”方法都做了什么吧。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
通过查看此方法我们发现,映入眼帘的是一个对“scheduler”的if--else判断。此时我们就有必要分析下:Scheduler是什么?它的作用是什么?
Scheduler:线程调度者
image.png以上是关于Scheduler的部分类结构图。结合“第一节-基本使用”我们了解到,要是使用Scheduler基本都是通过“Schedulers”来完成。
Schedulers:创建线程调度者实例的静态工厂
从“Schedulers”的类说明“Static factory methods for returning standard Scheduler instances.”以及其中相应方法的实现,可以知道,“Schedulers是专门生产Scheduler实例的”。例如:通过 io()可以返回IoScheduler实例,通过newThread()可以获得NewThreadScheduler实例等等。
接下来我们就debug下代码来看看,Scheduler到底是什么?它的运行机制又是怎样的吧。
1:进入ObservableSubscribeOn.subscribeActual()查看运行时信息
image.png
此时,会调用Scheduler的抽象方法createWorker()返回一个Worker,因为我们已经通过observeOn()指定的Scheduler是“SingleScheduler”,接下来看看“SingleScheduler”的createWorker()的实现吧。
2:SingleScheduler.createWorker()
image.png
image.png
通过createWorker()返回了SingleScheduler中实现了Scheduler.Worker该抽象类的ScheduledWorker的实例。
3:创建完Worker,执行“source.subscribe()”
此时会执行“ObservableSubscribeOn.subscribe()”(线程调度的整体流程 介绍了为什么会执行ObservableSubscribeOn.subscribe()),并把“ObserveOnObserver”该observer传递至ObservableSubscribeOn中。
4:执行ObservableSubscribeOn.subscribeActual()
image.png
4.1:首先,回调第3步中创建的ObserveOnObserver.onSubscribe(),在该方法中会回调外部创建的Observer(实际接受事件的观察者)的onSubscribe()
4.2:再次,继续执行ObservableSubscribeOn.subscribeActual()剩下的代码,此时就真正开始 在为被观察者设置的线程中(此处是在通过NewThreadWorker创建相应线程执行)执行其相应的方法了
5:执行Scheduler.scheduleDirect()
image.png
image.png
5.1:首先,执行Scheduler的抽象方法createWorker()
因为我们为该Scheduler设置的是NewThreadScheduler,我们先去看看NewThreadScheduler的createWorker()做了什么。
image.png
从图中我们可以知道,NewThreadScheduler.createWorker()返回了一个NewThreadWorker实例,并且在执行该类实例的时候,初始化了一个相应的 线程池(该线程池是为了后续要在其中执行异步任务准备的)。
5.2:其次,在上一步创建的Worker中,执行给定的任务(也就是在上一步创建的Worker中的线程池中执行相应的任务)
image.png
image.png
5.3:再次,回到ObservableSubscribeOn.subscribeActual() 5.2执行的Runnable(任务)就是 调用source的subscribe()。该source是谁?它就是我们在 创建“Observable”的过程 中通过调用Observable.create()创建的 ObservableCreate实例。
5.4:最后,调用ObservableOnSubscribe.subscribe()回调至 消息准备处。 此时我们发现 当前线程是以“RxNewThreadScheduler”开头的工作线程,这也就解释了,在 基本使用中我们发现为什么通过 subscribeOn(Schedulers.newThread())设置后,执行的事件发送所在的线程不是UI线程而是以“RxNewThreadScheduler”开头的工作线程了。
image.png
6:发送事件,此时会通过CreateEmitter(消息发射器)的OnNext(),onComplete(),onError()来向“观察者”发送事件
7:调用ObservableObserveOn中ObserveOnObserver的onNext(),onComplete(),onError()
image.png
image.png
8:通过阅读源码发现,ObserveOnObserver实现了Runnable接口,所以再执行Scheduler的schedule()方法时执行的是该类型的run()方法。
image.png image.png image.png
当执行drainNormal(),先通过checkTerminated()判断线程是否为终止了,如果没终止,则调用 Observer.onNext()接受事件。此时我们发现当前线程是通过observeOn(Schedulers.single())指定的线程名为“RxSingleScheduler”的线程。
到此我们分析了通过Observable.subscribeOn()给 被观察者调用方法设置所在的线程 以及通过Observable.observeOn()给 观察者调用方法设置所在的线程 内部实现。
网友评论