Rxjava由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎。
image.png
定义
RxJava在GitHub的介绍:
RxJava:a library for composing asynchronous and event-based programs using observable sequences for the Java VM
// 翻译:RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
RxJava是一个 基于事件流、实现异步操作的库
作用
实现异步操作
类似于
Android
中的AsyncTask
、Handler
作用
特点
由于 RxJava
的使用方式是:基于事件流的链式调用,所以使得 RxJava
:
- 逻辑简洁
- 实现优雅
- 使用简单
原理流程总览
在为RxJava删繁就简的能力惊叹之余,好奇的我们肯定控制不住探索其原理的欲望。“为通过链式操作符就可以一路走到底?为啥线程可以链式切换呢?等等......”
我最近春节在家,终于有机会可以好好系统探索下RxJava的原理源码,下载了RxJava 1.x的源码仔细斟酌一番,拨开代码的重重迷雾,慢慢地抓住了它简约而不简单的设计原理。
为什么说是简约而不简单呢?简约是因为它的原理并不复杂,不高深,不简单是因为它能把一切复杂都隐藏在流畅的链式中。
话不多说,进入主题。
经过分析,我觉得用手机包装流水线来形容RxJava总的工作流程还是比较恰当的(大家觉得有不妥的欢迎指出讨论~):
1.先搭建生产流水线
2.启动流水线包装
3.用户一层层拆开包装最后拿到手机(最后的结果)。
这里各个比喻对应的代码对象:
Observable:流水线的某一道工序
OnSubscribe:一道工序中的工人
OnSubscribe的call方法:包装Subscriber
subscribe方法:启动流水线
Subscriber:一层包装盒
Subscriber的onNext:用户拆开包装
具体阐述下以上比喻的意思:
1. 先搭建生产流水线
其实大部分操作符,都是新建一个Observable对象,然后将上游的Observable对象包装起来,传入一个新的OnSubscribe,比如:
publicfinal<R> Observable<R> map(Func1<? superT, ? extends R> func) {
returnunsafeCreate( newOnSubscribeMap<T, R>( this, func));
}
publicfinalObservable<T> filter(Func1<? superT, Boolean> predicate) {
returnunsafeCreate( newOnSubscribeFilter<T>( this, predicate));
}
publicfinal<R> Observable<R> lift( finalOperator<? extends R, ? superT> operator) {
returnunsafeCreate( newOnSubscribeLift<T, R>(onSubscribe, operator));
}
publicfinalObservable<T> subscribeOn(Scheduler scheduler, booleanrequestOn) {
if( thisinstanceofScalarSynchronousObservable) {
return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler);
}
returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn));
}
最后都是调用了create方法创建Observable, 把当前Observable传入给新的Observable持有,以保持链式(有点类似链表持有上一个节点的指针)。为什么要这样呢, 因为RxJava是的链式是基于代理模式做的,也就是说基于一层一层Observable的包装。
那包装的是什么呢?就是OnSubscribe,那OnSubscribe包装的意义是什么呢?其实就是包装如何包装Subscriber的逻辑。
比如map,传入的是OnSubscribeMap
(OnSubscribe的基类),它的call代码如下,
@Override
publicvoidcall( finalSubscriber<? superR> o) {
MapSubscriber<T, R> parent = newMapSubscriber<T, R>(o, transformer);
o.add(parent);
source.unsafeSubscribe(parent);
}
当map这个操作符创建出来的Observable被调用subscribe被调用的时候,就会该OnSubscribeMap的call方法,可以看代码发现这里创建了一个MapSubscriber对象,然后调用上游的Observable的unsafeSubscribe方法,传入该MapSubscriber对象作为参数。
所以当你开心地用RxJava一个个操作符把链写得老长的时候,里面的逻辑就是不断一层层包装Observable,每个Observable持有一个自己的OnSubscribe,具体类型由对应的操作符确定。
这就是我说的第一个流程 搭建流水线,总的来说就是从上往下不断创建Observable,并连成链,即后一个Observable持有上游Observable的引用。
Observable之所以说是流水线的某一道工序,是因为它是这条链最基本的串联元素,而OnSubscribe之所以说是一道工序中的工人,是因为它决定了Subscriber是如何被包装的。
2. 启动流水线包装
启动的开关正是链尾的subscribe方法。看下Observable的subscribe方法:
publicfinalSubion subscribe(Subscriber<? superT> subscriber) {
returnObservable.subscribe(subscriber, this);
}
subscribe(Subscriber<? super T> subscriber, Observable observable)方法,方法比较长,最重要的就是
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
RxJava 1.x中,RxJavaHooks.onObservableStart其实没有做什么操作,返回的就是原来的observable对象的onSubscribe,所以这里就是调用observable对象的onSubscribe的call方法,传入subscriber对象。
publicinterfaceOnSubscribe< T> extendsAction1< Subscriber<? superT>> {
// cover for generics insanity
}
OnSubscribe是一个继承Action1的接口,Action1是一个只有call方法的接口,所以这里call的逻辑由具体的OnSubscribe对象确定。
还记得还是那个面说的map操作生成的OnSubscribeMap对象的call方法逻辑么?它的call方法中创建了一个MapSubscriber对象,然后调用上游的Observable的unsafeSubscribe方法,并传入该MapSubscriber对象作为参数。
这里要注意的是,在创建了一个MapSubscriber对象的时候, 会传入当前Observable调用的subscribe方法的参数Subscriber对象,保存该对象的引用actual ,以保持链式:
publicMapSubscriber(Subscriber<? superR> actual, Func1<? superT, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
所以假如链尾的Observable是map操作符创建的,则subscribe执行的时候,会执行Observable对象中的OnSubscribeMap对象的call方法,生成一个MapSubscriber对象并持有我们代码中链的最后传入的 Subscriber对象,然后让上游的Observable对象调用subscribe方法,并传入这个MapSubscriber对象。
所以这里就是从下往上递归调用Observable对象的subscribe方法,从而生成一条Subscriber对象的链(也可以理解为一层层包装)。
在这里,经过subscribe方法的启动,已经开始加工包装,最后生产出了一条Subscriber对象的链,即我们的手机包装盒。
3. 用户拆开手机包装盒
这个流程,可以用杨宗纬的《洋葱》一段经典歌词来阐述:“一层一层一层地剥开我的心~~。”
上一步操作从下到上生成了Subscriber对象的链,链的尾部就是最上游的Observable中的:
Observable.create(object : OnSubscribe<Int> {
override fun call(t: Subscriber<in Int>){
t.onStart
t.onNext( 1)
}
})
这里第二行的t: Subscriber,现在第四行调用了 t.onNext(1),又以之前的map操作符生成的MapSubscriber对象为例:
publicvoidonNext(T t){
R result;
try{
result = mapper.call(t);
} catch(Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe;
(Throwable.addValueAsLastCause(ex, t));
return;
}
actual.onNext(result);
}
这里先使用mapper.call(t)对传进去的Subscriber对象进行了变换,即map操作中指定的变换方法,这个下一节再谈。
先注意这里最后调用了 actual.onNext(result);, 而actual就是Subscriber链的下一个Subscriber对象,而除了map以外,大部分的Subscriber对象的onNext方法也有这样的逻辑。
所以可以知道,这里Subscriber链在递归调用,也可以看作一层一层一层地打开,就仿佛是拆开手机包装盒。
流程总结
看前面的叙述各位可能还是有点雾里看花,总结一下前面三个小节各对应一个流程,从RxJava调用代码来说 :
就是先从上到下把各个变换的Observable连成链(拼装流水线),然后在最后subscribe的时候,又从下到上通过每个Observable的OnSubscribe从最下的Subscriber对象开始连成链(流水线开始工作包装Subscriber),直到顶端,当顶端的Subscriber对象调用了onNext方法的时候,又从上往下调用Subscriber链的onNext(用户一层层拆开包装盒),里面执行了每个操作的变换逻辑。
举个例子进一步说明以上流程:
Observable.create(object : OnSubscribe<Int> {
override fun call(t: Subscriber<in Int>){
t.onStart
t.onNext( 1)
}
})
.map(object : Func1<Int, String> {
override fun call(t: Int): String {
Log.d(TAG, Thread.currentThread.name)
returnt.toString
}
})
.map(object : Func1<String, Book> {
override fun call(t: String): Book {
Log.d(TAG, Thread.currentThread.name)
returnBook(t)
}
})
.subscribe(object : Subscriber<Book> {
override fun onStart{
}
override fun onNext(t: Book){
Log.d(TAG, Thread.currentThread.name)
Log.d(TAG, t.toString)
}
override fun onComplete{
}
override fun (t: Throwable){
Log.d(TAG, t.message)
}
})
为了简单,这里只使用了map操作符。以下是一个简单的流程图:
![](https://img.haomeiwen.com/i28055132/cda763fe766da23b.png)
操作符原理解析
如果上面的总流程分析能理解的话,那么下面的操作符的理解就不难了。
普通的变换操作符
这里举map的例子。这里的变换处于从上往下递归执行Subscriber链onNext阶段(用户拆手机包装盒)
前面提到map中生成的MapSubscriber对象的onNext方法:
publicvoidonNext(T t){
R result;
try{
result = mapper.call(t);
} catch(Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe;
(Throwable.addValueAsLastCause(ex, t));
return;
}
//调用下游的Subscriber的onNext方法
actual.onNext(result);
}
注意到第四行代码 result = mapper.call(t);,这里的mapper其实就是我们写的map操作的变换方法:
.map(object : Func1<String, Book> {
override fun call(t: String): Book {
Log.d(TAG, Thread.currentThread.name)
returnBook(t)
}
})
这里面的Func1回调接口,所以经过这样call方法的调用,就实现了map的操作变换,然后执行 actual.onNext(result);,即将变换后的结果交给下游的Subscriber的onNext方法。
如果理解了上面的流程图,是不是理解map易如反掌呢?
线程切换操作符
线程切换主要两个操作符: subscribeOn和observeOn
线程切换是我觉得RxJava最牛逼的地方,不过了解了原理之后觉得也不复杂高深,主要还是在上面的总流程中的对应节点使用了常见的切换线程方式。
subscribeOn
作用:将subscribe Observer的执行放在对应的线程。
subscribeOn最终会执行到:
publicfinalObservable<T> subscribeOn(Scheduler scheduler, booleanrequestOn) {
if( thisinstanceofScalarSynchronousObservable) {
return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler);
}
returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn));
}
注意最后执行了:
returnunsafeCreate( newOperatorSubscribeOn<T>( this, scheduler, requestOn));
根据前面的分析,这里就是创建新的Observable对象,并传入一个OnSubscribe实例对象,这里是OperatorSubscribeOn对象。
根据上面的说明,这里要看call方法:
publicvoidcall( finalSubscriber<? superT> subscriber) {
finalWorker inner = scheduler.createWorker;
SubscribeOnSubscriber<T> parent = newSubscribeOnSubscriber<T>(subscriber, requestOn, inner, source);
subscriber.add(parent);
subscriber.add(inner);
inner.schedule(parent);
}
可以看到第四行SubscribeOnSubscriber parent = new SubscribeOnSubscriber(subscriber, requestOn, inner, source);,所以它创建的就是Subscriber对象就是SubscribeOnSubscriber,注意这里第二行final Worker inner = scheduler.createWorker;和最后一行 inner.schedule(parent);,
这里的方法调用栈比较长就不赘述,直接说下,这里worker里面就是执行线程切换的,里面封装线程池或者Handler,通过schedule方法就可以将SubscribeOnSubscriber包装成一个Runnable放入线程池中执行,执行的方法是SubscribeOnSubscriber的call方法。
而SubscribeOnSubscriber的call方法:
publicvoidcall{
Observable<T> src = source;
source = null;
t = Thread.currentThread;
src.unsafeSubscribe( this);
}
和其他的Subscriber一样,也是传入上游Observable的subscribe方法中。
回忆上面讲的总流程, 在第二个流程从下往上包装Subscriber链(加工包装)的时候,subscribeOn就是将从它当前这个节点开始将后面的一系列的Subscriber的成链以及从上往下执行各个Subscriber对象的onNext放到指定的线程执行。
常见的一种描述subscribeOn作用的说法:****“将该subscribeOn语句的上游放在对应的线程中。”其实并不准确,因为如果只使用了subscribeOn而没有使用observeOn的话, 整条链的变换过程都会执行在subscribeOn指定的线程的。****RxJava官方的解释才是准确的:
Asynchronously subscribes Observers to thisObservable on the specified { @linkScheduler}.
在刚才的示例代码中加入subscribeOn:
Observable.create(object : OnSubscribe<Int> {
override fun call(t: Subscriber<in Int>){
t.onStart
t.onNext( 1)
}
})
.map(object : Func1<Int, String> {
override fun call(t: Int): String {
Log.d(TAG, Thread.currentThread.name)
returnt.toString
}
})
//这里切换线程
.subscribeOn(Schedulers.io)
.map(object : Func1<String, Book> {
override fun call(t: String): Book {
Log.d(TAG, Thread.currentThread.name)
returnBook(t)
}
})
.subscribe(object : Subscriber<Book> {
override fun onStart{
}
override fun onNext(t: Book){
Log.d(TAG, Thread.currentThread.name)
Log.d(TAG, t.toString)
}
override fun onComplete{
}
override fun (t: Throwable){
Log.d(TAG, t.message)
}
})
用刚才的流程图来表示的话,subscribeOn切换线程差不多是这样子的:
![](https://img.haomeiwen.com/i28055132/01f705eb3926935c.png)
红色部分为放入指定线程的逻辑。
observeOn
observeOn最终会走到:
publicfinalObservable<T> observeOn(Scheduler scheduler, booleandelayError, intbufferSize) {
if( thisinstanceofScalarSynchronousObservable) {
return((ScalarSynchronousObservable<T>) this).scalarScheduleOn(scheduler);
}
returnlift( newOperatorObserveOn<T>(scheduler, delayError, bufferSize));
}
这里使用了lift方法:
publicfinal<R> Observable<R> lift( finalOperator<? extends R, ? superT> operator) {
returnunsafeCreate( newOnSubscribeLift<T, R>(onSubscribe, operator));
}
这里和map本质还是一样的,创建一个新的额Observable并传入一个新的OnSubscribe对象(OnSubscribeLift),那主要就是要看这个OnSubscribeLift的call做了什么:
Subscriber<? superT> st = RxJavaHooks.onObservableLift(operator).call(o);
try{
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart;
parent.call(st);
call最主要的就是这几行,和map基本差不多,就是使用operator对传入从下游传入的Subscribeder进行转换,所以关键看OperatorObserveOn的call做了什么转换:
ObserveOnSubscriber<T> parent = newObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
parent.init;
returnparent;
主要看这里,OperatorObserveOn中创建了一个ObserveOnSubscriber,最后返回。
注意这里和OperatorSubscribeOn的不同,OperatorSubscribeOn是在call方法就把新建的Subscriber对象包装为Runnbale放入线程池中执行, 将上游Observable对他的subscribe调用放到了指定线程。
而OperatorObserveOn是将ObserveOnSubscriber对象作为参数传入了上游的OnSubscribe的call方法,然后整个从下往上的包装Subscribe还是在原来的线程中执行,那这里关键点就是看ObserveOnSubscriber的onNext做了什么操作:
if(!queue.offer(NotificationLite.next(t))) {
( newMissingBackpressureException);
return;
}
schedule;
重点就是这几行。第一行的t是onNext返回的Subscriber对象,NotificationLite.next这里正常情况下返回的还是t,而queue是一个队列,这里将t入列, 然后执行了schedule,该方法是将当前的ObserveOnSubscriber对象包装为Runnable,放入线程池中,然后在指定线程执行其call方法主要代码如下:
...
finalQueue<Object> q = this.queue;
finalSubscriber<? superT> localChild = this.child;
...
for(;;) {
...
Object v = q.poll;
booleanempty = v == null;
if(checkTerminated(done, empty, localChild, q)) {
return;
}
if(empty) {
break;
}
localChild.onNext(NotificationLite.<T>getValue(v));
...
}
可以看到这里就是循环从队列中取出元素,然后再传入下游的Subscriber的onNext方法。
总结obs****erveOn的操作:
在从下往上包装Subscriber链的时候(用户拆开手机包装盒),在调用observeOn的地方插入一个ObserveOnSubscriber对象,该对象可以在之后从上往下递归调用Subscriber链的时候,将该ObserveOnSubscriber对象之后的所有的onNext方法放到指定线程中执行。
现在在实例中添加observeOn:
Observable.create(object : OnSubscribe<Int> {
override fun call(t: Subscriber<in Int>){
t.onStart
t.onNext( 1)
}
})
.map(object : Func1<Int, String> {
override fun call(t: Int): String {
Log.d(TAG, Thread.currentThread.name)
returnt.toString
}
})
.subscribeOn(Schedulers.io)
//将当前调用之后的onNext放入指定线程
.observeOn(Schedulers.main)
.map(object : Func1<String, Book> {
override fun call(t: String): Book {
Log.d(TAG, Thread.currentThread.name)
returnBook(t)
}
})
.subscribe(object : Subscriber<Book> {
override fun onStart{
}
override fun onNext(t: Book){
Log.d(TAG, Thread.currentThread.name)
Log.d(TAG, t.toString)
}
override fun onComplete{
}
override fun (t: Throwable){
Log.d(TAG, t.message)
}
})
用流程图来表示:
![](https://img.haomeiwen.com/i28055132/a3fcfd05a9ac810a.png)
红色为subscribeOn指定线程执行部分,绿色为observeOn指定线程执行部分。
文末
除了满足自己的探索欲望之外,通过学习RxJava源码,就可以学习到如何运用RxJava的设计思想,通过封装代码编写自己的响应式编程框架,以后我们就可以写出自己的RxPay、RxLogin之类的了。更多Android架构师进阶技术,尽在一键获取【源码解读】
网友评论