观察者设计模式
提到RxJava 有点了解的就知道这个框架是基于观察者模式的,先来温习下观察者模式。
观察者模式 UML被观察者(Observable)持有对观察者(Observer) 的引用,当被观察者发生某些变化,调用 观察者的 API 就可以了。
RxJava 正是基于观察者模式的响应式编程框架。
基于事件流编程
啥是响应式?其实就是事件驱动,事件一发生,就会被相应接收处理。就这么简单,一张图就能表达。有一个起点和一个终点,从起点发送事件,终点接收处理事件。他们之间与一个订阅关系。
Rxjava 使用示例
先看下 Rxjava 的基本使用
在使用的时候需要引入 rxjava 包
// 依赖 RxAndroid 2x 的依赖库,完整的支持在Android 中使用,比如一些线程调度
api 'io.reactivex.rxjava2:rxandroid:2.1.0'
// 增加 Rxjava 2x 的依赖库
api io.reactivex.rxjava:1.3.0"
使用时,注意要引入 io 包中的类
// 起点 可以先不用管,用 create 创建了一个 Observable
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
}
})
// 订阅
.subscribe(
// 终点 也可以不用管,new 了一个 Observer
new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Rxjava 的上游与下游
事件流向分层:事件流可以分成上游与下游,事件的起点也就是上游,而终点就是事件的下游。
上游可以发送多个消息给下游,像下面这样
拆分实现时,分别创建观察者和被观察者,再订阅
// 上游 Observable 被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
// 发射器 发射事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
log("1: 上游发射事件");
// 发射事件
emitter.onNext(1);
log("2: 上游发射完成");
}
});
// 下游 Observer 观察者 处理事件
Observer<Integer> observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
log("3 下游处理事件 onNext " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
// 上游 订阅 下游
observable.subscribe(observer);
当然也可以像最初那样,链式调用
RxJava 调用流程
// 上游 Observable 被观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
// 发射器 发射事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
log("2: 开始发射");
// 发射事件
emitter.onNext(1);
emitter.onComplete(); // 4
log("5:发射完成");
}
})
//订阅
.subscribe(new Observer<Integer>() {
// 下游 Observer 观察者 处理事件
@Override
public void onSubscribe(Disposable d) {
// 弹出加载框 loading……
log("1: 订阅成功");
}
@Override
public void onNext(Integer integer) {
log("3: 下游接收 onNext " + integer);
}
@Override
public void onError(Throwable e) {
log("4: 下游接收 onError");
}
@Override
public void onComplete() {
// 隐藏加载 loading……
// 只有接收完成之后,上游的 log才会打印
log("4: 下游接收完成 onComplete");
}
});
//-----------
结果
1: 订阅成功
2: 开始发射
3: 下游接收 onNext 1
4: 下游接收完成 onComplete
5:发射完成
发射器事件
RxJava 中有个事件发射器,用来发送事件。发射器有Emitter 有三个方法,onNext、onComplete、onError,调用后会对应下游观察者的同名方法。
使用中有下面几个需要注意的地方
- onNext/onComplete/onError
emitter.onNext(1);
emitter.onComplete();
// onComplete 之后,以下收不到
emitter.onNext(2)
↑执行 emitter.onComplete 之后,继续发射,下游不再接受上游事件
emitter.onNext(1);
emitter.onError(new IllegalAccessError("error"));
// onError 之后,以下收不到
emitter.onNext(2);
↑执行 emitter.onError 之后,继续发射,下游不再接受上游事件
- onError 和 onComplete 顺序测试
emitter.onNext(1);
emitter.onComplete();
emitter.onError(new IllegalAccessError("error"));
↑ onComplete 后再调用 onError 会报错
emitter.onError(new IllegalAccessError("error"));
emitter.onComplete();
↑ 先发射 onError 再发射 onComplete 不会报错,但 onComplete也不会接收。
订阅关系
上游和下游可以有订阅关系,这种关系也可以被切断
// 上游 Observable 被观察者
Observable.create(new ObservableOnSubscribe<Integer>() {
// 发射器 发射事件
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
log("上游发射事件");
// 发射事件
emitter.onNext(1);
emitter.onNext(2);
log("上游发射完成");
}
}).subscribe(new Observer<Integer>() {
// 下游 Observer 观察者 处理事件
Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
// 事件被订阅
disposable = d;
}
@Override
public void onNext(Integer integer) {
log("下游处理事件 onNext " + integer);
// 接收上游的一个时间之后,就切断下游,让下游不再接收,但上游可以继续发
disposable.dispose();
// 实际用法,可在 onDestroy 中 使用上面方法,切断下游
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
//---------
结果
上游发射事件
下游处理事件 onNext 1
上游发射完成
可以看到事件2 没有被接收了,但还是会发送的,只是不接收。
总结 Part
- RxJava 是基于事件流思想的,有起点和终点,所以一旦满足起点和终点这样的需求,都可以使用 RxJava 来实现。
- 标准中的观察者设计模式,一个被观察者对应多个观察者,多次注册。RxJava 是改装后的观察者设计模式,一个被观察者,一个订阅,一个观察者。
- RxJava中上游 Observable 被观察者,下游 Observer 观察者
- ObservableEmitter<emitter> emitter 发射器 发射事件
- 可以拆分写,也可以链式调用
- RxJava 的大致流程
- RxJava 发射器方法的使用注意点
- onComplete/onError 之后不会再接收后面发射的内容;
- onComplete 后再 onError 会报错;先发射 onError 再发射 onComplete 不会报错,但 onComplete 也不会接收了
- Disposable 切断下游,不再接收上游发射的事件
写在后面
其实使用 RxJava 比较久了,是配合 Retrofit 使用也好,还是用它来处理线程等等,都没有系统的去学习其架构思想,最近重新学习一下搞一搞。
网友评论