线程切换
Rx数据发射器和观察者在同一个线程,未发生线程切换,串行工作,Rx是一个异步框架,主要功能是提供异步操作,让数据源和观察者工作在不同线程。
线程调度器Scheduler,它指定每一段代码应该运行的线程,线程切换两个重要方法。
subscribeOn方法,指定发射数据工作线程。
observeOn方法,指定观察者工作线程。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(2); //发送
e.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
//插入一个被观察者
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//新线程执行
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
//主线程
}
});
创建ObservableCreate被观察者,调用subscribeOn方法,数据发射器切换Scheduler新建线程工作。
public final Observable<T> subscribeOn(Scheduler scheduler) {
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
创建ObservableSubscribeOn被观察者,Observable子类,使用委托者设计模式,内部封装ObservableCreate,同时,封装Scheduler,发射线程。
doNext方法,创建一个Consumer,返回一个ObservableDoOnEach被观察者,封装ObservableSubscribeOn,内部Consumer,在订阅观察者时将封装到观察者对象,作为观察者onNext方法的消费对象。
observeOn方法,设置线程在主线程。
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
创建ObservableObserveOn被观察者,封装ObservableDoOnEach被观察者和Scheduler线程。

ObservableSubscribeOn和ObservableObserveOn都持有线程Scheduler切换器。
订阅时,调用链条中第一个被观察者ObservableObserveOn的subscribe方法,**被观察者基类Observable都实现ObservableSource接口#subscribe方法,订阅方法都来自这个接口,每个被观察者的subscribeActual方法自己实现。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//ObservableDoOnEach被观察者
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
调用内部source的subscribe方法,即ObservableSource类型,其实是链条中下一个被观察者ObservableDoOnEach。
创建一个观察者ObserveOnObserver,封装用户自己的观察者,创建一个Worker,一起带到观察者中。
ObservableDoOnEach的#subscribeActual方法,创建一个DoOnEachObserver观察者,封装ObserveOnObserver,同时封装外部Consumer。
继续沿着链条走,ObservableSubscribeOn的#subscribeActual方法。
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
创建一个SubscribeOnObserver观察者,创建一个SubscribeTask任务,scheduler是发射器绑定的工作线程,它的scheduleDirect方法在指定的线程中执行SubscribeTask。

subscribeOn方法,设置线程是新线程,因此,将在新线程执行任务。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);//线程执行,source是ObservableCreate。
}
}
线程run方法,此时,被观察者是ObservableSubscribeOn,在新线程中,调用内部source的subscribe方法,ObservableCreate,订阅时,创建发射器,数据源外部调用发射器发射流程和Rx异步文章的一致。
区别是在subscribeOn方法指定新线程执行发射方法。
新线程发射数据时,将找到观察者,观察者也是链式节点,第一个节点是最后创建的SubscribeOnObserver,当执行DoOnEachObserver的onNext方法时,内部Consumer的accept方法。
@Override
public void onNext(T t) {
if (done) {
return;
}
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
s.dispose();
onError(e);
return;
}
actual.onNext(t);//下一个观察者onNext方法
}
Consumer类型包括onNext,onError,onNext变量即doOnNext方法设置的Consumer,在新线程执行,上图黄色部分都在新线程执行。
到达ObserveOnObserver观察者时,它内部封装工作线程Work。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
观察者实现了Runnable,通过Worker,schedule方法,将观察者本身作为任务,在observeOn方法指定主线程运行。后面的观察者,onNext方法,都会切换线程,在主线程中运行。
每调用一次observeOn方法,都会创建一个ObserveOnObserver绑定执行线程,它后面的观察者都会在指定线程运行,因此,线程便会切换一次。
如果调用两次subscribeOn方法。
.subscribeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.newThread())
创建两个ObservableSubscribeOn被观察者节点,根据链表,首先调用的节点在后面,第二次调用的节点在前面,从头部节点开始subscribe订阅时,即使前面(第二次)的切换了线程,在后面(第一次)的节点subscribe订阅由会转换到另外的线程,最终发射数据将以后面的节点指定线程运行,即第一次。
因此,subscribeOn方法设置多次时,以第一次设置的发射数据线程有效。

任重而道远
网友评论