前言
我们在上面的篇幅讲了Rxjava的使用、操作符、背压模式,这篇文章,我们将对Rxjava主线流程进行分析。
Rxjava的实现代码
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "subscribe");
e.onNext(1);
e.onComplete();
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
我们经过前面的分析,会发现方法执行的顺序是:
1.Observer里面的onSubscribe()方法。
2.ObservableOnSubscribe里面的subscribe方法。
3.Observer里面的onNext()方法。
4.Observer里面的onComplete()方法。
下面我们分析Rxjava源码:
首先我们调用的是Observable的create方法。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这里很简单,跟进去后发现主要调用的是RxJavaPlugins里面的onAssembly()方法,然后创建了一个ObservableCreate对象。
再跟进去,然后由于我们并没有调用setOnObservableAssembly()代码设置onObservableAssembly,所以f是空,直接返回了source。至此,create()方法结束。
接着,我们看订阅方法subscribe():
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
protected abstract void subscribeActual(Observer<? super T> observer);
同样的由于没能调用setOnObservableAssembly()所以RxJavaPlugins.onSubscribe(this, observer)方法直接返回的就是observer。
接着我们看subscribeActual(observer)方法,发现他是一个抽象的方法,留给子类实现。我们从上面的分析可以知道前面创建的是ObservableCreate对象,那么我们去ObservableCreate类中找这个方法。
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
从上面代码可以看到的是我们在subscribeActual()方法中调用了observer的onSubscribe()方法,这就是为什么onSubscribe()方法在最先执行。
由于ObservableCreate是Observable的子类,然后再调用了source.subscribe()方法。就执行到了第二步。
最后就是我们外面看到的自己调用的onNext()和onComplete()方法。
这里的CreateEmitter是一个内部类实现了ObservableEmitter<T>, Disposable的接口。
至此,Rxjava的主要流程就分析完毕。
map操作符的流程
分析玩主线流程,下面我们分析下map的操作流程。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onComplete();
}
})
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return null;
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
当我们创建完ObservableCreate对象后会调用Observable类中的map()方法,同样的,由于没能调用setOnObservableAssembly()所以返回的就是ObservableMap对象。
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
接下来就不同了,由于返回的是ObservableMap,所以我们调用的是ObservableMap类中的subscribeActual方法,进而进入到了Observable中的subscribe方法。
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
但是这时map起到了一个中间桥梁的作用,当我们在ObservableOnSubscribe中的subscribe()方法中调用onNext()方法是,实际上持有的是ObservableMap对象。
这时候我们就会调用到ObservableMap中的onNext()方法,然后我们就可以看到我们执行了mapper.apply(t),然后将返回的值再用actual.onNext(v)传递给了被观察者。
所以总结就是,map起了桥梁作用,同时持有被观察者和观察者,通过得到被观察者传递的事件,然后在内部调用重写的apply()方法转换事件,再调用观察者的onNext方法传递给观察者。
网友评论