刚参加工作那会,Rxjava很火,所谓的链式编程,很多人都说链式的风格看起来代码逻辑清晰,精简,容易看懂。但我好像不太以为然,毕竟,各种各样的操作符变换,什么map,flatMap,observeOn,subscribeOn分分钟把你绕晕。当然,如果从业务代码的角度,通过操作符将它们链式组装起来,确实会清晰很多。但当时我更关注的是其背后的东西,即,Rxjava是怎么组装我们的业务代码成为链式的,调用链是如何形成的,调用链的代码又是如何执行的,它的变换原理究竟是怎样的。后来,秉着知其然,也要知其所以然的原则就去研究了它的源码,记得当时,看flatMap的时候,总没法不看代码就能在大脑里想出其调用链的关系,总感觉饶不过弯,不看,就没法想明白,后来想啊想,终于在饭堂排队吃饭的时候突然给想懂了。记得当时得意又开心~
转眼间,工作已3年多。Rxjava出了2.0,近期好像在准备出3.0。2.0的代码看了一些,原理没有变,多了一些api,诸如disposable等等。
在近期的电话面试里,也经常比较喜欢问到:Rxjava是如何做到线程变换或者Observable变换的,它的变换原理是怎样的?但结果总是差强人意。。懂原理的人真的很少。
...
上篇文章讲了Rxjava的Observable变换原理,这次讲其线程变换原理。其实,线程变换也是基于Observable的变换。
我们知道,Android中我们一般是通过Handler机制去实现线程的变换的。无论是变换到主线程,或者是变换到拥有Looper的子线程,Handler都可以搞定。那么Rxjava中的线程变换,也是使用到了Handler吗?其实,除了使用Handler之外,其也用到了Executor,线程池。但是,对外暴露的不是Handler,也不是Executor,而是一个叫Scheduler的抽象类,还有它内部的一个叫Worker的抽象类。由这两个类,再最后delegate到Executor或者Handler来实现变换。Scheduler叫调度器,一般Scheduler的执行会delegate到Worker来执行。我们在创建Observable的时候,就会通过操作符创建Observable所对应的Scheduler,当调用链执行到对应的Observable节点的时候,就会将后续任务或代码交给其对应的Scheduler来执行,从而达到切换线程的目的。
这里,调用链代码的执行可区分为两种,一种是向上产生订阅,即调用链向上游传递订阅任务;一种是订阅完成后,在上游执行完成任务之后向下游发射执行结果。
第一种,一般是将执行代码内置于Runnable中,再将Runnable丢给Scheduler来执行,从而达到切换线程的目的。
第二种,当然了,也是需要有一个Runnable来给Scheduler或者Worker执行,区别在于它会有一个队列来存放执行结果,然后在切换线程之后再从队列里获取。
举个例子🌰,看看源码
首先第一种是向上游传递订阅任务时的线程变换
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
--------
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
subscribeActual里边,通过scheduler.scheduleDirect(task)实现了线程切换;在切换之后继续向上游订阅,source.subscribe(parent)。
第二种:向下游发射任务结果的线程变换,一般应用于切换到主线程获取然后刷新界面。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
-----
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
}
....
a.onNext(v);
}
先在subscribeActual中初始化worker,worker内置于Observer,然后继续向上游订阅传递Observer。接收订阅时,也就是在onNext,先将结果t存放于队列中,然后启用worker的任务,在worker执行的时候,将其从队列中取出,再继续向下游发射结果,完成线程的切换。
你看,线程切换原理就是这么简单~
网友评论