在移动端编程的时候基本上都会需要频繁地切换线程,因为复杂的耗时任务需要放到后台线程里去运行,UI
绘制工作又只能在主线程里执行。Android
里一般用Handler
实现,iOS
里通过系统提供的任务队列实现,但两者均不够优雅,无可避免地需要将代码包裹起来,Android
里需要用Runnable
传递,iOS
里则是闭包。如果遇到多次切换线程的情况,代码的缩进层级就会变深,可读性变差。
还好,ReactiveX
库优雅地解决了这个问题,而且还是跨平台的,RxJava
和 RxSwift
让移动端的线程切换工作变得优雅起来。
这篇文章主要试图说清楚RxJava2
线程切换的实现过程。
操作符
RxJava2
的线程操作符是 subscribeOn
和 observeOn
,我们先按调用顺序一步步看。以下源码均有简化。
subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
subscribeOn
操作符将之前产生的 Observable
和 传入的 Scheduler
封装成 ObservableSubscribeOn
。
observeOn
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
observeOn
操作符将之前产生的 Observable
和 传入的 Scheduler
封装成 ObservableObserveOn
。
subscribe
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
最后通过`subscribe`操作符设置观察者(`Observer`)来触发整个流程。
流程
subscribe
会调用上一个Observable
(即ObservableObserveOn
)的 subscribeActual()
方法。
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
此处将observeOn
操作符后面的观察者(包括后面所有的操作符和Observer
)和其将要运行的线程相关内容(Worker
)封装成ObserveOnObserver
。此处的source
为observeOn
操作符之前的内容(切换之前产生的Observable
)。
由此可见,每一次observeOn
的使用,都会将后面的观察者(同上)和将要切换的线程内容封装起来。
我们可以看看ObserveOnObserver
的执行过程。
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
......
该ObserveOnObserver
里的数据处理都将在指定的Scheduler
里执行,所以到这里可以证明,每次observeOn
都会切换后面内容的执行线程。
再继续往上调用,回到前面的内容
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
假设该source
是由subscribeOn
产生的, 此处将会调用ObservableSubscribeOn
里的subscribeActual()
方法。
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
上一个操作产生的source
(即Observable
)的subscribe
操作将在指定的Scheduler
里执行, 每一次subscribeOn
的调用都会在之前的Observable
上封装一层,但数据的发射由最里层的Observable
实现,即在第一个ObservableSubscribeOn
封装里执行。
我们以最简单的 ObservableJust
举例,subscribeActual()
必然在第一个subscribeOn
里的Scheduler
里执行。
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
......
public void run() {
if (get() == START && compareAndSet(START, ON_NEXT)) {
observer.onNext(value);
if (get() == ON_NEXT) {
lazySet(ON_COMPLETE);
observer.onComplete();
}
}
}
小结
文章从源码层面解释了RxJava2
线程切换的原理,以及 subscribeOn
和 observeOn
两个操作符生效的场景。
整个流程还是很复杂,我先是读了几遍源码,但是在记忆里找起来总是很混乱,便写了个简单的demo
,用断点调试工具一步步地走,才慢慢豁然开朗。如果看不明白,建议使用断点调试工具。
网友评论