1. 响应式编程
1.1 响应式编程概念
- 响应式编程是一种通过异步和数据流来构建事物关系的编程模型。
- 事物的关系 也可以说成是 业务逻辑 ,是响应式编程的核心理念。
- 数据流 和 异步 是实现这个核心理念的关键。异步和数据流都是为了正确的构建事物的关系而存在的。
1.2 响应式编程demo
int a=1;
int b=a+1;
System.out.print(“b=”+b) // b=2
a=10;
System.out.print(“b=”+b) // b=2
上面是一段很常见的代码,简单的赋值打印语句,但是这种代码有一个缺陷,那就是如果我们想表达的并不是一个赋值动作,而是b和a之间的关系,即无论a如何变化,b永远比a大1。那么可以想见,我们就需要花额外的精力去构建和维护一个b和a的关系。
而响应式编程的想法正是企图用某种操作符帮助你构建这种关系。
它的思想完全可以用下面的代码片段来表达:
int a=1;
int b <= a+1; // <= 符号只是表示a和b之间关系的操作符
System.out.print(“b=”+b) // b=2
a=10;
System.out.print(“b=”+b) // b=11
响应式编程的思想,它希望有某种方式能够构建关系,而不是执行某种赋值命令。
应用初始化.png比如在收单应用初始化逻辑中,先完成SDK初始化,数据库初始化,签到,才会跳转到交易菜单界面。
在响应式编程中,这一流程可以这样解读
应用初始化2.png
在初始化过程中,SDK初始化,数据库初始化,签到这些业务完成之后才会去安排页面跳转的操作,那么这些上游的业务在自己工作完成之后,就需要通知下游,通知下游的方式有很多种,响应式编程的方式就是通过数据(事件)流。
每一个业务完成后,都会有一条数据(一个事件)流向下游,下游的业务收到这条数据(这个事件),才会开始自己的工作。
我们能发现SDK初始化,数据库初始化,签到这三个业务本身相互独立,应当在不同的线程环境中执行,以保证他们不会相互阻塞。而假如没有异步编程,我们可能只能在一个线程中顺序调用这三个相对耗时较多的业务,最终再去做页面跳转,这样做不仅没有忠实反映业务本来的关系,而且会让你的程序“反应”更慢。
总的来说,异步和数据流都是为了正确的构建事务的关系而存在的。只不过,异步是为了区分出无关的事务,而数据流(事件流)是为了联系起有关的事务。
2. RxJava
Rx是响应式拓展,即支持响应式编程的一种拓展,用来处理事件和异步任务。
2.1 RxJava的优点
简洁。而且当业务越繁琐越复杂时这一点就越显出优势——它能够保持简洁。
2.2 RxJava的基本概念
我们都知道监听者模式,订阅模式这些概念。而Observable和Subscribers的英文意思就是如此。我们大概也知道差不多和监听者模式差不多。
- Observable事件源,被观察者。
- Observer / Subcriblers 观察者,事件订阅者
- subscribe() 方法,绑定Observable与Subcribler或者Observabler
- 事件 (包括 onNext,onComplete,onError 等事件)
以第一章的初始化应用为例:
Observable obserInitSDK=Observable.create((context)->{initSDK(context)}).subscribeOn(Schedulers.newThread())
Observable obserInitDB=Observable.create((context)->{initDatabase(context)}).subscribeOn(Schedulers.newThread())
Observable obserLogin=Observable.create((context)->{Login(context)})
.subscribeOn(Schedulers.newThread())
// 合并多个Observables的发射物
Observable observable = Observable.merge(obserInitSDK,obserInitDB,obserLogin)
// 订阅被观察者
observable.subscribe(()->{startActivity()})
当initSDK,initDB,Login都是耗时较长的操作时,遵照业务关系编写响应式代码可以极大的提高程序的执行效率,降低阻塞。
从上面代码中,可以看出,响应式编程有如下优点
- 在业务层面实现代码逻辑分离,方便后期维护和拓展
- 极大提高程序响应速度,充分发掘CPU的能力
- 帮助开发者提高代码的抽象能力和充分理解业务逻辑
- Rx丰富的操作符会帮助我们极大的简化代码逻辑
2.3 操作符决策树
RxJava的几种主要操作符:
- 创建操作:直接创建一个Observable
- 组合操作:组合多个Observable
- 变换操作:对Observable发射的数据执行变换操作
- 过滤操作:从Observable发射的数据中取特定的值
- 条件/布尔/过滤操作:转发Observable的部分值
- 算术/聚合操作:对Observable发射的数据序列求值
创建操作
用于创建Observable的操作符
-
Create
— 通过调用观察者的方法从头创建一个Observable -
Defer
— 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable -
Empty/Never/Throw
— 创建行为受限的特殊Observable -
From
— 将其它的对象或数据结构转换为Observable -
Interval
— 创建一个定时发射整数序列的Observable -
Just
— 将对象或者对象集合转换为一个会发射这些对象的Observable -
Range
— 创建发射指定范围的整数序列的Observable -
Repeat
— 创建重复发射特定的数据或数据序列的Observable -
Start
— 创建发射一个函数的返回值的Observable -
Timer
— 创建在一个指定的延迟之后发射单个数据的Observable
变换操作
用于对Observable发射的数据进行变换
-
Buffer
— 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个 -
FlatMap
— 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。 -
GroupBy
— 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据 -
Map
— 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项 -
Scan
— 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值 -
Window
— 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集
过滤操作
用于从Observable发射的数据中进行选择
-
Debounce
— 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作 -
Distinct
— 去重,过滤掉重复数据项 -
ElementAt
— 取值,取特定位置的数据项 -
Filter
— 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的 -
First
— 首项,只发射满足条件的第一条数据 -
IgnoreElements
— 忽略所有的数据,只保留终止通知(onError或onCompleted) -
Last
— 末项,只发射最后一条数据 -
Sample
— 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst -
Skip
— 跳过前面的若干项数据 -
SkipLast
— 跳过后面的若干项数据 -
Take
— 只保留前面的若干项数据 -
TakeLast
— 只保留后面的若干项数据
组合操作
用于将多个Observable组合成一个单一的Observable
-
And/Then/When
— 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集 -
CombineLatest
— 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果 -
Join
— 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射 -
Merge
— 将两个Observable发射的数据组合并成一个 -
StartWith
— 在发射原来的Observable的数据序列之前,先发射一个指定的数据序列或数据项 -
Switch
— 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据 -
Zip
— 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射
错误处理
用于从错误通知中恢复
辅助操作
用于处理Observable的操作符
-
Delay
— 延迟一段时间发射结果数据 -
Do
— 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作 -
Materialize/Dematerialize
— 将发射的数据和通知都当做数据发射,或者反过来 -
ObserveOn
— 指定观察者观察Observable的调度程序(工作线程) -
Serialize
— 强制Observable按次序发射数据并且功能是有效的 -
Subscribe
— 收到Observable发射的数据和通知后执行的操作 -
SubscribeOn
— 指定Observable应该在哪个调度程序上执行 -
TimeInterval
— 将一个Observable转换为发射两个数据之间所耗费时间的Observable -
Timeout
— 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知 -
Timestamp
— 给Observable发射的每个数据项添加一个时间戳 -
Using
— 创建一个只在Observable的生命周期内存在的一次性资源
条件和布尔操作
用于单个或多个数据项,也可用于Observable
-
All
— 判断Observable发射的所有的数据项是否都满足某个条件 -
Amb
— 给定多个Observable,只让第一个发射数据的Observable发射全部数据 -
Contains
— 判断Observable是否会发射一个指定的数据项 -
DefaultIfEmpty
— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据 -
SequenceEqual
— 判断两个Observable是否按相同的数据序列 -
SkipUntil
— 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据 -
SkipWhile
— 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据 -
TakeUntil
— 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知 -
TakeWhile
— 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据
算术和聚合操作
用于整个数据序列
-
Average
— 计算Observable发射的数据序列的平均值,然后发射这个结果 -
Concat
— 不交错的连接多个Observable的数据 -
Count
— 计算Observable发射的数据个数,然后发射这个结果 -
Max
— 计算并发射数据序列的最大值 -
Min
— 计算并发射数据序列的最小值 -
Reduce
— 按顺序对数据序列的每一个应用某个函数,然后返回这个值 -
Sum
— 计算并发射数据序列的和
另外还有连接操作、转换操作,可以通过文档查看使用方法。
2.4 RxJava 基础框架解析
- 先从比较常用的create方法看
public static Completable create(CompletableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new CompletableCreate(source));
}
在 create 方法中,其实很简单,只是对 source 进行判空处理,并将 source 用 ObservableCreate 包装起来,并返回回去。下面让我们一起来看一下 ObservableCreate方法
public final class CompletableCreate extends Completable {
final CompletableOnSubscribe source;
public CompletableCreate(CompletableOnSubscribe source) {
this.source = source;
}
// daizy -- 持有了上游 source 的引用,并重写 subscribeActual 方法
@Override
protected void subscribeActual(CompletableObserver observer) {
Emitter parent = new Emitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ObservableCreate 也很简单,它是 Observable 的子类,持有了上游 source 的引用,并重写 subscribeActual 方法,这个方法要结合订阅Subscribe源码看。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(CompletableObserver observer) {
// 检查 observer 是否为 null,为 null 抛出异常
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// RxJavaPlugins 插件的,暂时不管
observer = RxJavaPlugins.onSubscribe(this, observer);
// 检查 observer 是否为 null,为 null 抛出异常
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null CompletableObserver. Please check the handler provided to RxJavaPlugins.setOnCompletableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException ex) { // NOPMD
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
RxJavaPlugins.onError(ex);
throw toNpe(ex);
}
}
subscribe 方法也比较简单,大概可以分为以下两步:
- 第一步,对observer 进行判空,为空则抛出异常
- 第二步,调用 subscribeActual 方法,在Observable类 中,subscribeActual 是一个抽象方法,要关注的是其实现类的subscribeActual方法。从上面的分析,我们知道,当我们调用 Observable create(ObservableOnSubscribe source) 方法的时候,最终会返回 ObservableCreate 实例。因此,我们只需要关注 ObservableCreate 的 subscribeActual 方法。
protected void subscribeActual(Observer<? super T> observer) {
// CreateEmitter 是 ObservableCreate 的一个静态内部类
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
// source 是上游 ObservableOnSubscribe 的引用
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
继续看ObservableCreate的subscribeActual方法,在执行observer.onSubscribe 方法的时候,会将parent对象作为方法参数暴露出去,parent即是CreateEmitter,可以通过它的dispose方法取消订阅关系。
接着在调用source.subscribe(parent)的时候,会先调用ObservableOnSubscribe 的 subscribe 方法。
因此,我们可以得出,调用的顺序是:
Observable.subscrible -> Observable.subscribleActual -> Observable.subscribleActual -> observer.onSubscribe -> ObservableOnSubscribe.subscribe(emitter)
emitter是CreateEmitter的实例,包装了observe,调用emitter的方法,就会调用observe的 onNext 、onComolete/onError方法。
以上是RxJava基本原理,Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer,并且回调 Observer 的相应的方法。
2.5 RxJava 线程切换
Observable通过subscribeOn方法来指定线程
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
通过代码可以看出,先对scheduler进行判空,然后用ObservableSubscribeOn 将scheduler 包装起来,接下来研究看看ObservableSubscribeOn这个类的源码。
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
首先先来看他的构造函数 ,有两个参数 source ,scheduler。
- source 代表上游的引用,是 Observable 的一个实例
- scheduler 调度器可以通过 Schedulers.newThread() 或者 Schedulers.io() 创建相应的实例。
RxJava 可用的调度器大概有下面几种,根据需求选择:
我们主要看下这个方法
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
SubscribeTask 这个类,他是 ObservableSubscribeOn 的一个非静态内部类,可以看到 其实也比较简单,他实现了 Runnable 接口,并且持有 parent 引用。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
在 run 方法中,通过 source.subscribe(parent) 建立联系。因而,当我们的 SubscribeTask 的 run 方法运行在哪个线程,相应的 observer 的 subscribe 方法就运行在哪个线程。
接下来再看看scheduleDirect的实现
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
这个方法主要是将task包装成DisposeTask,然后通过Worker进行调度。再看看Worker 是在做啥。
Scheduler我们以NewThreadScheduler为例子
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
/**
* daizy -- 线程池
*/
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
// 通过线程池来调度
return new NewThreadWorker(threadFactory);
}
}
通过代码可以看出来,Worker里头封装了线程池,所以RxJava的线程切换,也是基于线程池来处理。
回过来看DisposeTask
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
@NonNull
final Runnable decoratedRun;
@NonNull
final Worker w;
@Nullable
Thread runner;
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
@Override
public Runnable getWrappedRunnable() {
return this.decoratedRun;
}
}
DisposeTask 实现了 Disposable,Runnable ,SchedulerRunnableIntrospection 接口,Disposable 接口主要是用来取消订阅关系的 Disposable。
从上面的分析,可以得出Observable.subscribeOn方法,控制Observable的执行线程是通过将 Observable.subscribe(Observer) 的操作放在了指定线程中,当我们调用 subcribe 的时候,它的过程是从下往上的,即下面的 Observable 调用上面的 Observable。
用流程图描述如下:
图片.png
网友评论