先上这两个对应的类的代码(仅看关键的方法):
Observable.subscribeOn
方法创建的类ObservableSubscribeOn
:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
Observable.observeOn
方法创建的类ObservableObserveOn
:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
Observable.create
创建的普通的ObservableCreate
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
}
}
可以看到,普通的没有线程切换的Observable,在subscribeActual方法里直接执行了source.subscribe(parent)方法。
在ObservableSubscribeOn
的subscribeActual
方法里,可以看到source. subscribe发生在指定的线程里,所以上游在指定线程执行subscribe方法
在ObservableObserveOn
的subscribeActual
方法里,可以看到source. subscribe仍然在当前线程,在run方法里有判断如果是异步的就把下游observer.onNext
,observer.onError
等方法在指定线程里执行。
由上面可以看出subscribeOn影响的是上游,observeOn影响的是下游。
还有一种情况,如果有多个subscribeOn或者多个observeOn的时候,每句代码生成的Observable执行在哪个线程呢?这里先说一下结论,每个Observable在下一句离它最近的subscribeOn指定的线程执行,每个Observable在上一句离它最近的observeOn指定的线程执行。
为什么会是这个结论呢?
因为订阅(Observable.subscribe)发生的顺序是从下游到上游,所以执行subscribeOn的时候
--未写完--
网友评论