使用过RxJava的人都知道,方便的Api链式调用外,还有一个非常好用的地方,就是线程调度和操作符的使用,我们来举个例子
Observable.just("test")
//指定了被观察者执行的线程环境为io线程
.subscribeOn(Schedulers.io())
//使用map操作来完成类型转换
.map(new Function<String,String>(){
@Override
public String apply(String s) throws Exception {
//这里执行一个耗时操作,比如IO操作
return s + "map";
}
})
//将后面执行的线程环境切换为主线程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
Log.i("xiaosanye","s: " + s);
}
});
- 实际上在使用map操作时,new Function() 就对应了类型的转变的方向。在apply()方法中,输入的是字符串s,返回的是字符串s+“map”,当然我们也可以进行类型转化,比如传入原类型String返回转换后的类型Bitmap。
- 如果在map操作时,执行的是耗时操作的化,比如网络请求、IO操作,这时候我们就要把这个操作切换到子线程去执行,执行完成后,再切换回UI主线程,如果没有使用RxJava肯定得使用各种callback函数,而在RxJava中,通过observeOn和subscribeOn链式操作很方便的实现了线程切换。
1、源码分析observable.subscribeOn和observable.observeOn的内部实现
Observable#observable.subscribeOn
=>return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
=> ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
observable.subscribeOn通过return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler))方法,将this也就是observable本身传进去,最终返回一个ObservableSubscribeOn对象,ObservableSubscribeOn继承自AbstractObservableWithUpstream
AbstractObservableWithUpstream继承自Observable并且实现HasUpstreamObservableSource接口,
所以ObservableSubscribeOn对象本身是一个被观察者,
我们我们可以得出结论,调用observable.subscribeOn最后实际上是返回了一个新的被观察者ObservableSubscribeOn
我们再看observable.observeOn
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Observable#observable.observeOn
=>return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))
=>ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
=>AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
同理调用observable.observeOn通过return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize))方法,也将this也就是observable本身传进去,最终返回的也是一个新的被观察者ObservableObserveOn
由上一篇文章我们知道,我们在Observable.create的时候返回一个被观察者ObservableCreate对象,所以我们创建observable的时候实际上返回的是ObservableCreate对象。
-
综上代码分析,其实不管是Observable#observable.subscribeOn还是Observable#observable.observeOn最终都是将observable传进去(也就是ObservableCreate对象),经过装饰,返回了一个新的被观察者对象而已,如下图
image.png
所以最后,我们Observable.subscribe订阅的时候实际上调用的就是经过装饰的最外层ObservableObserveOn
细心的小伙伴肯定也看到了,其实在它们各自的subscribeActual方法里也会对观察者对象进行一次装饰
ObservableObserveOn#subscribeActual
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObservableSubscribeOn#subscribeActual
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
最后返回对应的观察者对象ObserveOnObserver、SubscribeOnObserver
最后还有一点要注意的是:
-
Observable.subscribeOn():只能指定一次,如果指定多次则以第一次为准
-
Observable.observeOn():可指定多次,每次指定完都在下一步生效。
那为什么会这样呢?我们看到它们各种的subscribeActual方法里,observeOn是每次都会new一个ObserveOnObserver,ObserveOnObserver里都会传入一个observer和Scheduler.Worker,而subscribeOn只是调用自身的scheduler.scheduleDirect方法。
- 结论:SubscribeOn这个操作符指定的是Observable自身在哪个调度器上执行,而且跟调用的位置没有关系。而ObservableOn则是指定一个观察者在哪个调度器上观察这个Observable,当每次调用了ObservableOn这个操作符时,之后都会在选择的调度器上进行观察,直到再次调用ObservableOn切换了调度器。
2、源码分析操作符Map的内部实现
Observable#map
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
经过上面分析,这段代码很熟悉了把,把this也就是Observable对象和mappr(我们自定义的函数fuction)传进去,最终肯定是返回一个ObservableMap被观察者对象的,我们再点进去ObservableMap看看,
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
....省略部分代码
}
我们看到在subscribeActual方法里有一句代码source.subscribe(new MapObserver<T, U>(t, function));这句就是RxJava2.x所有操作符的核心代码了。
source就是我们上游传进来的被观察者,然后调用source.subscribe把function传进去,我们知道在被观察者subscribe完,肯定是走下面的onNext方法了,我们看到onNext里的一句关键代码mapper.apply(t),这句就是我们实际转变的地方,也就是我们在外层实现的apply方法。
网友评论