![](https://img.haomeiwen.com/i1325359/4eec2fd1d27aed2b.png)
前言
相信很多做Android或是Java研发的同学对RxJava应该都早有耳闻了,尤其是在Android开发的圈子里,RxJava渐渐开始广为流行。同样有很多同学已经开始在自己的项目中使用RxJava。它能够帮助我们在处理异步事件时能够省去那些复杂而繁琐的代码,尤其是当某些场景逻辑中回调中嵌入回调时,使用RxJava依旧能够让我们的代码保持极高的可读性与简洁性。不仅如此,这种基于异步数据流概念的编程模式事实上同样也能广泛运用在移动端这种包括网络调用、用户触摸输入和系统弹框等在内的多种响应驱动的场景。那么现在,就让我们一起分析一下RxJava的响应流程吧。
(本文基于RxJava-1.1.3)
用法
首先来看一个简单的例子:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("onNext");
subscriber.onCompleted();
}
}).map(new Func1<String, String>() {
@Override
public String call(String s) {
return s + " -> Xepher";
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("--- onCompleted ---");
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onNext(String s) {
System.out.println("subscriber -> " + s);
}
});
运行结果为:
subscriber -> onNext -> Xepher
--- onCompleted ---
从结果中我们不难看出整体的调用流程:
首先通过调用Observable.create()
方法生成一个被观察者,紧接着在这里我们又调用了map()
方法对原被观察者进行数据流的变换操作,生成一个新的被观察者(为何是新的被观察者后文会讲),最后调用subscribe()
方法,传入我们的观察者,这里观察者订阅的则是调用map()
之后生成的新被观察者。
在整个过程中我们会注意到三个主角:Observable、OnSubscribe、Subscriber,所有的操作都是围绕它们进行的。不难看出这里三个角色的分工:
- Observable:被观察者的来源,亦或说是被观察者本身
- OnSubscribe:用来通知观察者的不同行为
- Subscriber:观察者,通过实现对应方法来产生具体的处理。
所以接下来我们以这三个角色为中心来分析具体的流程。
分析
一、订阅过程
首先我们进入Observable.create()
看看:
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(hook.onCreate(f));
}
这里调用构造函数生成了一个Observable对象并将传入的OnSubscribe赋给自己的成员变量onsubscribe
,等等,这个hook是从哪里冒出来的?我们向上找:
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
RxJavaObservableExecutionHook这个抽象Proxy类默认对OnSubscribe对象不做任何处理,不过通过继承该类并重写onCreate()
等方法我们可以对这些方法对应的时机做一些额外处理比如打Log或者一些数据收集方面的工作。
到目前最初始的被观察者已经生成了,我们再来看看观察者这边。我们知道通过调用observable.subscribe()
方法传入一个观察者即构成了观察者与被观察者之间的订阅关系,那么这内部又是如何实现的呢?看代码:
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
... ...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
... ...
return Subscriptions.unsubscribed();
}
}
这里我们略去部分无关代码看主要部分,subscribe.onStart()
默认空实现我们暂且不用管它,对于传进来的subscriber
要包装成SafeSubscriber,这个SafeSubscriber对原来的subscriber的一系列方法做了更完善的处理,包括:onError()
与onCompleted()
只会有一个被执行;保证一旦onError()
或者onCompleted()
被执行,将不再能再执onNext()
等情况。这里封装为SafeSubscriber之后,调用onSubscribe.call()
,并将subscriber
传入,这样就完成了一次订阅。
![](https://img.haomeiwen.com/i1325359/f22d00be66e44370.jpg)
以上就是一次
map()
变换的流程,事实上多次map()也是同样道理:最外层的目标Subscriber发生订阅行为后,onSubscribe.onNext()
会逐层嵌套调用,直至初始Observable被最底层的Subscriber订阅,通过Operator的一层层变化将消息传到目标Subscriber。再次祭出扔物线的图:![](https://img.haomeiwen.com/i1325359/a3b51139c159ee89.jpg)
至于其他的多种变化的实现流程也都很类似,借助于Operator的不同实现来达到变换数据流的目的。例如其中的
flatMap()
,它需要进行两次lift()
,其中第二次是OperationMerge,将转换成的每一个Observable数据流通过InnerSubscriber这个纽带订阅后,在InnerSubscriber的onNext()
中拿到R,再通过传入的parent
(也就是原MergeSubscriber)将它们全部发射(emit)出去,由最外层我们传入的Subscriber统一接收,这样就完成了 T
=> Observable<R>
=> R
的转化:![](https://img.haomeiwen.com/i1325359/76705c707c3f5274.png)
在subscribeOn()时,我们会新生成一个Observable,它的成员
onSubscribe
会在目标Subscriber订阅时使用传入的Scheduler的worker
作为线程调度执行者,在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber,这个Subscriber又会通知到目标Subscriber,这样就完成了subscribeOn()
的过程。
2、observeOn()
![](https://img.haomeiwen.com/i1325359/627e0dcbca0fc134.png)
对比
subscribeOn()
和observeOn()
这两个过程,我们不难发现两者的区别:subscribeOn()
将初始Observable的订阅事件整体都切换到了另一个线程;而observeOn()
则是将初始Observable发送的消息切换到另一个线程通知到目标Subscriber。前者把** “订阅 + 发送” 的切换了一个线程,后者把 “发送” ** 切换了一个线程。所以,我们的代码中所实现的功能其实是:
... ...
.subscribeOn(Schedulers.io())//将“订阅”、“发送”都切换到Schedulers.io()对应的线程
... ...
.observeOn(AndroidSchedulers.mainThread())////将“发送”再切换回到AndroidSchedulers.mainThread()对应的线程
这样就能很容易实现耗时任务在子线程操作,在主线程作更新操作等这些常见场景的功能啦。
四、其他角色
Subject
Subject在Rx系列是一个比较特殊的角色,它继承了Observable的同时也实现了Observer接口,也就是说它既可作为观察者,也可作为被观察者,他一般被用来作为连接多个不同Observable、Observer之间的纽带。可能你会奇怪,我们不是已经有了像map()
、flatMap()
这类的操作符去变化 Observable数据流了吗,为什么还要引入Subject这个东西呢?这是因为Subject所承担的工作并非是针对Observable数据流内容的转换连接,而是数据流本身在Observable、Observer之间的调度。光这么说可能还是很模糊,我们举个《RxJava Essentials》中的例子:
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
我们通过create()
创建了一个PublishSubject
,观察者成功订阅了这个subject
,然而这个subject
却没有任何数据要发送,我们只是知道他未来会发送的会是String
值而已。之后,当我们调用subject.onNext()
时,消息才被发送,Observer
的onNext()
被触发调用,输出了"Hello World"
。
这里我们注意到,当订阅事件发生时,我们的subject
是没有产生数据流的,直到它发射了"Hello World"
,数据流才开始运转,试想我们如果将订阅过程和subject.onNext()
调换一下位置,那么Observer
就一定不会接受到"Hello World"
了(这不是废话吗- -|||),因而这也在根本上反映了Observable
的冷热区别。
一般而言,我们的Observable都属于Cold Observables,就像看视频,每次点开新视频我们都要从头开始播放;而Subject则默认属于Hot Observables,就像看直播,视频数据永远都是新的。
基于这种属性,Subject
自然拥有了对接收到的数据流进行选择调度等的能力了,因此,我们对于Subject
的使用也就通常基于如下的思路:
//接收数据流
Observable.create(...).subscribe(mSubject);
//选择调度并发射数据给观察者
mSubject.subscribe(...);
在前面的例子里我们用到的是PublishSubject
,它只会把在订阅发生的时间点之后来自原始Observable的数据发射给观察者。等一下,这功能听起来是不是有些似曾相识呢?
![](https://img.haomeiwen.com/i1325359/a6037d25f1ab33bd.png)
- 首先
Repositories.repositoryWithInitialValue()
生成原点REventSource
。 - 配置完
Observable
之后进入RFrequency
状态,接着配置数据流的流数。 - 前面配置完成后,数据流
RFlow
生成,之后通过getFrom()
、mergeIn()
、transform()
等方法可进一步进行流式调用;也可以使用attemptXXX()
方法代替原方法,后面接着调用orSkip()
、orEnd()
进行error handling处理。当使用attemptXXX()
方法时,数据流状态会变为RTermination
,它代表此时的状态已具有终结数据流的能力,是否终结数据流要根据failed check触发,结合后面跟着调用的orSkip()
、orEnd()
,我们的数据流会从RTermination
再次切换为RFlow
,以便进行后面的流式调用。
- 经过前面一系列的流式处理,我们需要结束数据流时,可以选择调用
thenXXX()
方法,对数据流进行最终的处理,处理之后,数据流状态会变为RConfig
;也可以为此行为添加error handling处理,选择thenAttemptXXX()
方法,后面同样接上orSkip()
、orEnd()
即可,最终数据流也会转为Rconfig
状态。 - 此时,我们可以在结束前按需要选择对数据流进行最后的配置,例如:调用
onDeactivation()
配置从“订阅”到“取消订阅”的过程是否需要继续执行数据流等等。 - 一切都部署完毕后,我们
compile()
这个RConfig
,得到最终的成型的Repository
,它具有添加Updatable
、发送数据通知Receiver
的能力。 - 我们根据需要添加
Updatable
,repository
在数据流处理完成后会通过update()
发送event通知Updatable
。 -
Updatable
收到通知后则会拉取repository
的成果数据,并将数据通过accept()
发送给Receiver
。完成 Push event, pull data 的流程。
以上就是一次Agera的流式调用的内部基本流程。可以看到,除了 Push event, pull data 这一特点、goLazy的加载模式(本文未介绍)等,依托于较为精简的方法,Agera的流式调用过程同样也能够做到过程清晰,并且上手难度相较于RxJava也要简单一些,开源作者是Google的团队也让一些G粉对其好感度提升不少。不过Agera目前版本则是 agera-1.0.0-rc2,未来的版本还有很多不确定因素,相比之下Rx系列发展了这么久,框架已经相对成熟。究竟用Agera还是RxJava,大家按自己的喜好选择吧。
新人处女作,文章中难免会有错误遗漏以及表述不清晰的地方,希望大家多多批评指正,谢谢!
参考&拓展:
RxJava Wiki
Agera Wiki
给 Android 开发者的 RxJava 详解
Google Agera vs. ReactiveX
When Iron Man becomes reactive
Top 7 Tips for RxJava on Android
How to Keep your RxJava Subscribers from Leaking
RxJava – the production line
网友评论