前言
我们经常看RxJava的文章,很多都是API性的介绍.今天我们就用一段来理解它吧,了解它的内幕
本文编译需要:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
public static void debug() {
1. Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
2. @Override
3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
4. emitter.onNext(1);
5. }
6. });
7. Consumer<Integer> consumer = new Consumer<Integer>() {
8. @Override
9. public void accept(Integer integer) throws Exception {
10. Log.d(TAG, "accept: " + integer);
11. }
12. };
13. observable.subscribe(consumer);
}
这篇文章的目的就是分析这段代码,去挖掘RxJava到底做了什么
让我们开始吧
我们都知道RxJava最重要的东西就是Obervable(被观察者)和Observer(观察者), 或者针对事件来说就是上游和下游, 上游发送事件下游处理事件. 为了便于分析,我把代码标了行号,便于引用.
粗略的分析上面这段代码就是一个Observable订阅了一个Consumer
弄清Obervable的来源
来看Observable.create() 方法的原型:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
它的参数是ObservableOnSubscribe类型,它只有一个方法待实现:
void subscribe(ObservableEmitter<T> e) throws Exception;
为什么需要ObservableEmitter形参呢.我们不用管,因为被new进ObservableCreate构造函数里去了,肯定是回调作用,这在我们平时写代码时见多了.实际上我们点进ObservableCreate里看到ObservableEmitter是一个内部类, 那问题是RxJavaPlugins.onAssembly()又是干嘛的,
public static <T> Observable<T> onAssembly(Observable<T> source) {
Function<Observable, Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
可以看到把onObservableAssembly赋给f, onObservableAssembly是什么 , 其实就是一个属性而已, 含有getter和setter.而我们也没看到调用set函数,所以该函数就直接返回了source,所以我们得到最终的Observable就是ObservableCreate对象.为什么用这个代替Observabel呢,看下类声明
public final class ObservableCreate<T> extends Observable<T> {
...
}
原来就是Observable的子类.
到了这里,终于清楚了Observable的来龙去脉.
弄清Observer的来源
我们看到这片代码用的是Consumer,它跟Observer有什么关系呢. 看下它的声明
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(T t) throws Exception;
}
这不就是一个普普通通的接口嘛,实现accept的方法就ok了,是的,consumer就是这么简单
是时候让Observable和Consumer发生关联了
只剩下observable.subscribe(consumer); 这句了, 顾名思义,被观察者订阅了消费者(观察者),我们跟进去看看代码怎么写的
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
它调用了subscribe方法,用了我们传进去的consumer对象即onNext,而后面三个参数是系统提供默认的实现,这也说明,肯定还能传进我们自己想要的参数,先不管.继续跟:
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
这片代码先是进行了一些非空判断,然后把我们的onNext转换成LambdaObserver对象了,LambdaObserver是什么呢:
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
}
它实现了Observer, 到了这里,终于明白了,原来我们传进来的Consumer被处理成LambdaObserver对象了,继续跟进subscribe(ls);
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
记住,我们手里现在握着的Observable对象是什么,Observer是什么, 当然是ObservableCreate和LambdaObserver的对象.
我们又看到了RxJavaPlugins的调用,跟我们上一步讲的onAssembly是类似的,直接返回了observer.往下就是一个非空的判断了.接着是subscribeActual方法的调用.
看下改方法的声明:
protected abstract void subscribeActual(Observer<? super T> observer);
它是Observable类的一个抽象方法,问题是谁来实现它,就是ObservableCreate了.因为是他实现了Observable.
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这是ObservableCreate的代码片段.看到它的构造函数没,参数source不就是我们开始传进来的嘛.
继续看subscribeActual方法, 第一行用我们的LambdaObserver对象为参,构造了CreateEmitter对象,继续跟进. 因为observer被LambdaObserver实现了,所以这里当然跳进它的onSubscribe方法
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
onError(ex);
}
}
}
这里先是用一个帮助类判断了一下,下一步的onSubscribe是什么呢,我们看到它的构造函数,就是我们之前讲的系统给我们提供的默认实现,默认实现是空实现.所以这个方法我们认为什么都没做.但是,如果onSubscribe不是系统提供的呢,那么它的执行时间点是比onNext早的.
我们继续跟进subscribeActual方法.现在要处理try块了
source.subscribe(parent);
注意这个source是ObservableOnSubscribe对象.这个对象又是啥呢?这就不是我们一开始new出来的嘛
... new ObservableOnSubscribe<Integer>() {
2. @Override
3. public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
4. emitter.onNext(1);
5. }
6. }
参数刚好也是ObservableEmitter, 第4行的emitter参数就是
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
传进去的. 紧跟着emitter.onNext(1);就是parent对象的事了
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
...
还记得吧,这里的observer就是LambdaObserver对象也就是一开始的Consumer了.
isDisposed()是什么,表示是否取消被观察者和观察者的关联,我们一路跟进来,没有关于它的操作,肯定返回true
然后就是observer.onNext(t);即回调了Consumer的onNext方法.
结束
到了这里一个简单的流程终于完成了,谢谢大家!
网友评论