准确来讲,RxAndroid 是隶属于 ReactiveX 组织的,Jake Wharton 作为参与者,贡献了大量的代码(从 git 提交历史记录可查询到),而且这个框架短小精悍,不至于像 RxJava 那么庞大,让人望而却步,非常值得一读,因此将她归为【拆 Jake Wharton 系列】之一,这系列陆续创作中,欢迎关注。
(一) 你将获得什么
每个框架有每个框架的使命,阅读源码,可以挖掘相应的技术点,阅读源码的乐趣便在于此。通过阅读 RxAndroid 源码和本文,你将获得:
- RxJava、RxAndroid 和 Android 的连接
- Rx 世界里钩子的实现套路
- 高覆盖率的单元测试
- Robolectric 对主线程的操纵
- 使用
CountDownLatch
来测试线程
(二)RxAndroid 简介
RxJava 中线程的变换和函数式编程与 Android 相得益彰,但是 RxJava 并非为 Android 量身打造。在线程变换的过程中,Android 有独特的 UI 主线程的概念,因此,需要一个框架来连接 Android 和 RxJava。RxAndroid充当了该角色。
所以很明确,RxAndroid 的使命在于提供 Android 主线程的变换,代码如下:
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
其中, AndroidSchedulers.mainThread()
便是这个框架提供的能力。
说起 Android 主线程通讯问题,必不可少的关联到 Handler、Looper、Message、HandlerThread等(可以查看笔者的另一系列文章《Handler 和他的小伙伴们》)。
因此 RxAndroid 提供了更通用的能力,可以指定任意的 Looper 来进行任意线程之间的通讯:public static Scheduler from(Looper looper)
,代码举例如下:
mHandlerThread = new HandlerThread("HandlerThread");
mHandlerThread.start();
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.from(mHandlerThread.getLooper())
.subscribe(/* an Observer */);
本文基于 RxJava 和 RxAndroid 2.0.1 进行源码分析,Github地址如下:
https://github.com/ReactiveX/RxAndroid/releases/tag/2.0.1
(三)源码概览
RxAndroid 非常简洁,只有四个类,为了增加趣味性,对于其中的三个核心类,笔者称之为面子、里子和钩子。
面子和里子,互为表里,面子是外在,靠里子支撑;里子是内涵,靠面子表现。
关于面子和里子,软件世界里有个更专业的称呼,叫门面模式。表里如一,是值得尊敬的品格,现实世界如此,软件世界也是如此,而 RxAndroid 更是如此。
以下是这个框架的精华部分:
- AndroidSchedulers:面子,即框架的门面,它的作用在简介部分已经说明。
- HandlerScheduler:里子,这个类框架的核心,由它处理与 Android 主线程通讯的逻辑。
- RxAndroidPlugins:钩子,提供了行为的扩展。
- MainThreadDisposable:这个类是抽象类,提供了资源释放的生命周期供重写,它的作用是确保
onDispose()
在主线程执行,这个类的解析非本文的重点。 - 大量的单元测试:UT 同样是框架的精华部分,这个框架 UT 非常完善,值得学习。
(四)面子 —— AndroidSchedulers
在上文的简介中,我们已经领略到 AndroidSchedulers 作为门面的简洁,它仅对外暴露了两个方法。这里重点分析一下 AndroidSchedulers.mainThread()
。
在此之前,我们先看下命名。RxJava 关于线程的取值,同样也有个门面类,相关的代码如下:
- Schedulers.io()
- Schedulers.computation()
- Schedulers.newThread()
- Schedulers.single()
Schedulers
和 AndroidSchedulers
,以及 mainThread()
和上述方法在命名和实现上保持了高度的一致性。
接下来回到源码解析。mainThread
的实现并不复杂,但由于埋伏了两个钩子(RxAndroidPlugins),代码便显得莫名其妙了,所以我们先忽视所有关于钩子的逻辑,将代码精简如下:
public static Scheduler mainThread() {
return new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
如此一来,这个门面类的逻辑就十分简单了,同时也呈现出了这个框架的里子——HandlerScheduler,并且持有了主线程的 Looper 对象。
(五)里子 —— HandlerScheduler
见名知意,这是一个与 Handler 有关的 Scheduler。与上文一样,我们先来讨论下命名的事情。在 RxJava 中,提供的默认线程我们都可以找到对应的实现,分别是:SingleScheduler、ComputationScheduler、IoScheduler 和 NewThreadScheduler,因此,这仍然是一个固定套路。
以上所有的 XxxxxScheduler
都有个共同的抽象父类,代码精简如下,
public abstract class Scheduler {
public abstract Worker createWorker();
public abstract static class Worker implements Disposable {
public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
}
}
因此 HandlerScheduler 只要根据父类的规范,做相应的抽象方法实现即可,其中 Worker.schedule()
的重写是关键,由于涉及到 Android 主线程通讯,该方法的实现中将使用到 Handler 机制。所以我们先来简单回顾下 Handler 怎么进行主线程的通讯。
当我们需要与主线程通讯时,发起的最终实现都是一致的:
handler.sendMessageAtTime(Message msg, long uptimeMillis)
那么主线程如何处理消息呢?共有两种方式:
- 重写 Handler 的 handleMessage()
- 为 Message 指定 callback 属性(Runnable 类型),消息发出后,callback将会在主线程中回调。
以上两种方式,通过阅读 Handler 的 dispatchMessage()
源码可获知:
/**
* Handle system messages here.
*/
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
}
HandlerScheduler 使用了第2种方式与主线程通讯,源码精简如下:
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
//省略部分代码
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
// 对run对象进行代理,增加异常处理和释放资源的逻辑
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 内部将执行message.callback = scheduled
Message message = Message.obtain(handler, scheduled);
message.obj = this;
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
}
行文至此,还有一个很重要的问题未解决:** Message 对象的回调函数 callback(Runnable 类型)的具体实现是什么? **
通过代码 debug 可以轻易的获取到答案,但在此之前,我们先大胆预测一下:由于主线程执行的是 Observer
实例中的 onNext()
、onCompleted()
和 onError()
,因此 Runnable 便是由 Observer
实例封装而来,并且在合适的时机执行上述三个方法。
事实如我们预测的一样,Runnable 的具体实现为 ObservableObserveOn
类中的 ObserveOnObserver
内部类对象,它是 Runnable 的实现类。
HandlerScheduler
实现了 RxJava 、RxAndroid 和 Android 之间的连接。
(六)钩子 —— RxAndroidPlugins
何为钩子
首先解释下钩子的概念。一面墙壁光滑明亮,自然是赏心悦目,但是具备的功能性就弱了些,于是我们在墙壁上安置了一些钩子,通过钩子,我们可以挂上一些性感衣物或者一副世界名画,这面墙壁的功能性便大大增强了。
墙壁便是 RxAndroid,并且内置了不少的钩子,找到这些钩子后,可以做很多扩展的事情,比如输出日志、异常处理和单元测试的辅助等。
纵观 RxJava 和 RxAndroid 源码,埋伏了大量的钩子,这也是造成一些源码阅读起来比较费解的原因所在。所有的钩子的读写的逻辑都内聚在 Plugins 中,RxJava 中是 RxJavaPlugins
,RxAndroid 中则是 RxAndroidPlugins
,同样也保持了命名的一致性,而两个 Plugins 类,也可以认为是所有钩子的门面类。
发现钩子
在讲面子这一节的时候,笔者对 AndroidSchedulers.mainThread()
源码做了精简,实际上这里埋伏了个钩子 onMainThreadHandler
:
//MAIN_THREAD 同样埋伏了钩子,此处不做介绍,最终将返回HandlerScheduler对象
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
// 以下为 RxAndroidPlugins.java
public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
// 钩子:onMainThreadHandler
Function<Scheduler, Scheduler> f = onMainThreadHandler;
if (f == null) {
return scheduler;
}
return apply(f, scheduler);
}
// 为钩子赋值
public static void setMainThreadSchedulerHandler(Function<Scheduler, Scheduler> handler) {
onMainThreadHandler = handler;
}
为了加深理解,可以对照下面的流程图查看,大部分的钩子都是基于同样的套路来实现的。
钩子的实现套路.pngRxAndroid 和 RxJava 2.x 内置了大量的钩子,而他们都以 get
和 set
的形式对外部提供读写。如下图:
对于钩子,我们需要一些具体的实例来加深理解,并且希望从框架源码本身来寻找实例,此时,单元测试将大展身手。
(七)单元测试是框架最好的说明书
钩子的 UT 解读
结合上一节,我们来解析下这个框架的单元测试,挖掘源码本身的更多信息量。
针对钩子的逻辑,我们一起来看下其中的一个测试方法:AndroidSchedulersTest
中的 mainThreadCallsThroughToHook()
,方法名其实已经表明的 UT 的测试意图,即对该场景进行测试:通过钩子(Hook)来执行 AndroidSchedulers.mainThread()
方法。
这个例子告诉我们:
- 如何为钩子赋值,并定义扩展的行为
- 钩子中扩展的行为触发的时机
所以说,单元测试是框架最好的说明书。
高覆盖率
通过 AS 的 Run Tests with Coverage,数据表明:这个框架的单元测试行覆盖率达到 91%,如下图:
高覆盖率
Robolectric 对主线程的操纵
RxAndroid 使用 Robolectric 对 Android 相关的逻辑进行测试。通过 ShadowLooper
可以操纵主线程,如下图所示,此 UT 位于 HandlerSchedulerTest
的 directScheduleOnceWithDelayPostsWithDelay()
。
这个例子告诉我们:
-
ShadowLooper.runUiThreadTasks()
可以模拟主线程执行 -
ShadowLooper.idleMainLooper()
可以指定时间来阻塞主线程 - 除此之外
ShadowLooper
还提供了很多好用的 api 来操纵主线程,可以通过 ShadowLooper 的源码去了解这些 api 的用途。
另外,MainThreadDisposableTest
中的 UT 向我们展示了如何使用 CountDownLatch
来测试线程,有兴趣的同学可以阅读这部分源码。
总而言之,一个优秀的框架中的单元测试,既能帮助我们更好的了解框架本身,也能帮助我们提高单元测试的技巧。
(八)总结
面子、里子和钩子组成了 RxAndroid 的全部,而单元测试在此基础上起到了锦上添花的作用,这依然是一个麻雀虽小五脏俱全的优秀开源框架,每一个优秀的框架,都是一本书一部电影,值得反复揣摩,用心研究。
参考资料
https://github.com/ReactiveX/RxJava/wiki/What%27s-different-in-2.0
https://github.com/ReactiveX/RxAndroid
网友评论