RxJava源码分析基于RxJava1.3.8。
题外话:
在写这边博客之前曾犹豫过很久,因为RxJava的源码非常复杂,感觉不是在一两篇博客中就能讲解清楚。但是正式因为RxJava的源码非常复杂,我也想通过写博客的方式来加深自己对RxJava源码的认识,同时也算是做笔记了,能时不时的翻出来看看,以防止忘记。
概述
虽然RxJava2也发布了很长时间了,但是对于分析RxJava的工作原理,RxJava1的源码相对于RxJava2相对而言会容易很多(原因想必大家都很清楚,其实是因为RxJava2我也没有怎么用过,哈哈),所以对RxJava的源码分析依然选择使用RxJava1。
在本次RxJava的源码分析中,只针对开发中常用的也是必须掌握的操作符做具体分析,主要包括三方面:
- 基本使用
- 变换的原理
- 线程切换的原理
至于其他的一些不常用的操作符(RxJava中有大量的操作)就不过多分析了。
这里非常推荐扔物线的博客:给 Android 开发者的 RxJava 详解
基本使用
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("RxJava!");
subscriber.onCompleted();
}
}).subscribe(new Subscriber<String>() {
@Override
public void onNext(String s) {
System.out.println(s);
}
@Override
public void onCompleted() {
System.out.println("onCompleted");
}
@Override
public void onError(Throwable e) {
}
});
<code></code>
上述代码就是RxJava中的最基本使用,这个基本使用主要包括:
- 通过<code>Observable.create()</code>创建一个Observable(被观察者)对象。
- 创建一个Observer(观察者)对象,也就是Subscriber对象。
- 通过<code>Observable.subscribe()</code>实现Observable-Observer的订阅。
- 在<code>Observable.subscribe()</code>方法内部产生一个事件,通过<code>OnSubscribe.call</code>产生事件。
- 最终通过<code>Observer</code>对象处理事件。
基本订阅
源码分析
针对上文RxJava中的基本使用,这里会针对这个流程做源码分析。
subscribe方法
首先,在RxJava的调用流程中,<code>Observable.subscribe()</code>方法实现<code>Observable</code>和<code>Observer</code>的订阅,同时该方法也是整个RxJava链式调用的起点,所以首先看看<code>Observable.subscribe()</code>的内部逻辑。
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
在上面方法中,仅仅只是调用了<code>subscribe(Subscriber<? super T> subscriber, Observable<T> observable)</code>重载方法,对这个重载方法其实不仅在该方法中调用了该重载方法,subscribe()有很多的重载方法,最终都是通过调用<code>subscribe(Subscriber<? super T> subscriber, Observable<T> observable)</code>该方法完成整个订阅流程的。
接下来就看下这个最终被调用的重载方法,如下:
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// 省略部分代码
// 1、初始化方法,内部没有任何实现
subscriber.onStart();
// 2、将参数subscriber包装成SafeSubscriber
// SafeSubscriber主要保证subscriber完成调用后能被及时取消订阅,并且在onComplete()调用后无法在被调用
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
/**
* 核心流程:
*
* 1、首先, RxJavaHooks.onObservableStart()方法获取OnSubscribe对象
* 2、然后,根据获取的OnSubscribe,调用该对象的call()方法,这里可以看到在call()方法中传入的subscriber对象就是在subscribe()方法传入的subscriber对象
*
* 上面获取的OnSubscribe其实就是在创建Observable时通过create()方法创建的OnSubscribe对象
*/
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// 省略部分代码
}
}
在subscribe()方法中,基本就完成了Observable对象和Subscriber的订阅,并通过<code>OnSubscribe.call(Subscriber)</code>开始整个链式调用。
在代码中,注释非常清晰,这里就不重复了。
这里需要注意的是通过RxJavaHooks获取OnSubscribe对象的流程,这里对<code>RxJavaHooks.onObservableStart()</code>的调用流程做了下梳理,如下:
public final class RxJavaHooks {
// 1、定义一个Func2对象onObservableStart
static volatile Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> onObservableStart;
// 2、静态代码块调用,实现初始化
static {
init();
}
// 3、初始化onObservableStart对象
static void init() {
// 省略部分代码
onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
@Override
public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
// 在该代码中,默认返回方法参数的OnSubscribe对象
return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
}
};
}
// 4、调用onObservableStart(),获取具体的OnSubscribe
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
// 变量onObservableStart不会为空,所以会执行if判断语句
if (f != null) {
// 该方法会返回参数onSubscribe,也就是Observable.OnSubscribe对象
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
}
所以,通过上述流程发现最终获取的OnSubscribe对象,还是创建Observable对象时创建的那个OnSubscribe对象。
总结
这样关于RxJava的基本使用的源码分析就到这里了,这里面的核心流程也就是<code>Subscriber.subscribe()</code>方法,该方法是实现响应式编程的核心方法。
当然,在基本使用中对于<code>Observerable.OnSubscribe</code>类的作用并没有很突出,在下一章节中将主要讲解RxJava变换的原理。在变换的流程中就是要对<code>Subscriber.subscribe()</code>和<code>Observerable.OnSubscribe</code>两个方法的方法
网友评论