观察者模式的运用
传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析
RxJava 是基于Java的观察者模式开展的。构建被观察者(Observable/Flowable)、观察者(Observer/Subscriber),并通过建立两者的订阅关系实现观察,在事件的传递过程中可以对事件进行各种处理。
在rxjava 1.x、rxjava 2.x里,Observable是被观察者,Observer是观察者,正常逻辑是观察者通过subscribe订阅Observable的事件处理,当Observable发射事件时Observer接收数据。但为了保持流式API风格,观察者订阅被观察者的代码顺序设计有一些调整。
如:
Observable.subscribe(Observer);
RxJava 1.x 和RxJava 2.x的主要区别
在RxJava 2.x中的观察者模式有两种。而Flowable作为被观察者是专门支持背压的。这也是RxJava 1.x 和RxJava 2.x的主要区别。当然还有一些区别是操作符、接口的不兼容更新。
- Observable ( 被观察者 ) / Observer ( 观察者 )
-
Flowable (被观察者)/ Subscriber (观察者)
image.png
RxJava1.x 平滑升级到RxJava2.x
由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。
RxJava2Interop
经典流式API调用风格
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
})
// ...省略很多在发射过程中的流式处理代码
.subscribe(new Observer<Integer>() {
private Disposable mDisposable;
@Override
public void onSubscribe(Disposable d) {
mDisposable = d;
}
@Override
public void onNext(Integer integer) {
Log.d("onNext", "" + integer);
//新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。
if (integer == 3) {
mDisposable.dispose();
Log.d("onNext", "已停止接收事件");
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印结果:
11-28 13:00:42.195 29930-29930/? D/onNext: 1
11-28 13:00:42.195 29930-29930/? D/onNext: 2
11-28 13:00:42.195 29930-29930/? D/onNext: 3
11-28 13:00:42.195 29930-29930/? D/onNext: 已停止接收事件
Rxjava 线程调度
subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。
多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。
Rxjava 2.x常用操作符
1,Function<T, R> ——将输入的value类型T转换成输出的value类型R。通常结合Map操作符。
/**
* A functional interface that takes a value and returns another value, possibly with a
* different type and allows throwing a checked exception.
*
* @param <T> the input value type
* @param <R> the output value type
*/
public interface Function<T, R> {
/**
* Apply some calculation to the input value and return some other value.
* @param t the input value
* @return the output value
* @throws Exception on error
*/
@NonNull
R apply(@NonNull T t) throws Exception;
}
2,Map——将一个Observable被观察者通过特定函数的执行,转换成另一种Observable被观察者。
在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1 和 Func2 改为了 Function 和 BiFunction。
/**
* Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and emits the results of these function applications.
*
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
3,Consumer——接收一个单独的数据,类似于一个简化版的观察者observer。
/**
* A functional interface (callback) that accepts a single value.
* @param <T> the value type
*/
public interface Consumer<T> {
/**
* Consume the given value.
* @param t the value
* @throws Exception on error
*/
void accept(@NonNull T t) throws Exception;
}
4,distinct——去重操作符。即先有的数字保留,重复的数字去除并保留原先顺序的方式输出。
Observable.just(2, 1, 2, 3, 4, 2, 3)
.distinct()
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("accept", "" + integer);
}
});
输出
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 2
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 1
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 3
11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 4
5,concat—— 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable。比如可以采用 concat 操作符先读取缓存再通过网络请求获取数据。
案例说明:
Observable observable = Observable.just(1, 2, 3, 4, 5, 6)
.map(new Function<Integer, Integer>() {
@Override
public Integer apply(@NonNull Integer integer) throws Exception {
return integer + 1;
}
});
Observable.concat(Observable.just(-1, -2, -3, -4, -5, -6), observable)
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.d("accept", "" + integer);
}
});
打印输出:看吧,两个Observable是按照顺序依次无交错执行的。
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -1
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -5
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -6
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 2
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 3
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 4
11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 5
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 6
11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 7
注:熟悉操作符的目的在于,不同场景中都能随时想到有对应的工具可用。
背压
背压:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
因为事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。这就是背压产生的必要性。
RxJava2.0中,Flowable是能够支持Backpressure的Observable,是对Observable的补充(而不是替代)。所以Observable被观察者支持的API,Flowable也都支持,并且Flowable的API里也都强制支持背压。
背压经典代码
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
if (e.requested() != 0) {
for (int i = 0; i < 10; i++) {
e.onNext(i + 1);
Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
}
e.onComplete();
}
}
}, BackpressureStrategy.LATEST)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "接收 ——" + s);
if (s == 9){
subscription.cancel();
}
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "接收错误——" + t);
}
@Override
public void onComplete() {
}
});
外部调用subscription请求配合配额11个。
if (subscription != null) {
subscription.request(11);
}
在BackpressureStrategy.LATEST背压策略下,上游发射10个事件,下游由外部调用请求发布配额指令,当下游接收到第9个事件时暂停上游发布(此操作会清空上游事件源)。
背压策略
- BackpressureStrategy.MISSING
此策略下,上游发射的数据不做缓存也不丢弃,下游处理溢出的问题。简单说就是没有背压。 - BackpressureStrategy.ERROR
此策略下,在上游发射速度过快并超出下游接收速度时,抛出MissingBackpressureException异常。 - BackpressureStrategy.BUFFER
此策略下,把上游发射过来的所有数据全部缓存在缓存区,不做丢弃,待下游接收。 - BackpressureStrategy.DROP
此策略下,相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。BackpressureStrategy.LATEST的策略和此类似。 - BackpressureStrategy.LATEST
此策略和BackpressureStrategy.DROP的策略类似,但在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。
/**
* Represents the options for applying backpressure to a source sequence.
*/
public enum BackpressureStrategy {
/**
* OnNext events are written without any buffering or dropping.
* Downstream has to deal with any overflow.
* <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
*/
MISSING,
/**
* Signals a MissingBackpressureException in case the downstream can't keep up.
*/
ERROR,
/**
* Buffers <em>all</em> onNext values until the downstream consumes it.
*/
BUFFER,
/**
* Drops the most recent onNext value if the downstream can't keep up.
*/
DROP,
/**
* Keeps only the latest onNext value, overwriting any previous value if the
* downstream can't keep up.
*/
LATEST
}
Flowable案例代码
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
e.onNext(4);
}
}, BackpressureStrategy.ERROR)
//下面两行代码执行线程切换,达到异步效果
// .subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
// subscription.request(1);
}
@Override
public void onNext(Integer s) {
// subscription.request(1);
Log.d(TAG, "接收——" + s);
// subscription.cancel();
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "接收错误——" + t);
}
@Override
public void onComplete() {
}
});
这里指定背压策略是BackpressureStrategy.ERROR,这种策略下执行此段代码会报如下错误。
D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
因为,上下游是同步的。上游发射了事件但是下游没有接收,就会造成阻塞(即便上游的事件队列长度只有3个 < 128)。为了避免ANR,就要提示MissingBackpressureException异常。
如果恢复第12、13行处理线程切换的代码,表示上下游位于不同线程,是异步状态。此种情形下,上游发射数据后就不会报MissingBackpressureException异常,但虽然上游能正常发射数据,下游同样接收不到数据。
这里涉及到一个知识点:
Flowable默认事件队列大小为128。BackpressureStrategy.BUFFER策略下事件队列无限大,和没有采取背压的Observable ( 被观察者 ) / Observer ( 观察者 )类似了。
注:在处理同一组数据时,Observable ( 被观察者 ) / Observer ( 观察者 )比BackpressureStrategy.BUFFER策略下的Flowable (被观察者)/ Subscriber (观察者)性能更优,内存消耗更少。
在上下游异步的情况下,上游会先把事件发送到长度为128的事件队列中,待下游发送请求数据指令后从事件队列中拉取数据。这种“响应式拉取”的思想用于解决上下游流速不均衡的情况。
上述代码中,第19、24行代码是表示下游接收前、接收后发送请求配额指令给上游。也可以通过subscription.request(n);在外围调用发送n个请求配额给上游以获取数据。
API自带的被观察者的背压策略
API内的其他被观察者,API也为我们提供了背压策略方法:
- onBackpressureBuffer()
- onBackpressureDrop()
- onBackpressureLatest()
示例代码如下:
Flowable.interval(1, TimeUnit.MICROSECONDS)
.onBackpressureDrop() //加上背压策略
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
subscription = s;
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + aLong);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
});
如果不加背压策略,则会报错:
D/rxjava2: onSubscribe
W/rxjava2: onError:
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
at java.lang.Thread.run(Thread.java:761)
响应式编程
当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,上游若继续发送事件便会抛异常了。
案例情景一:同步环境下,在下游发出10个请求配额情况下,上游发射130个事件。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 130; i++) {
e.onNext(i + 1);
Log.d(TAG, "requested—left—" + e.requested());
}
// e.onComplete();
}
}, BackpressureStrategy.ERROR)
//下面两行代码执行线程切换,达到异步效果
// .subscribeOn(Schedulers.io())
// .observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
subscription.request(10);
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "接收 ——" + s);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "接收错误——" + t);
}
@Override
public void onComplete() {
}
});
打印LOG日志:
D/rxjava2: 接收 ——1
D/rxjava2: requested—left—9
D/rxjava2: 接收 ——2
D/rxjava2: requested—left—8
D/rxjava2: 接收 ——3
D/rxjava2: requested—left—7
D/rxjava2: 接收 ——4
D/rxjava2: requested—left—6
D/rxjava2: 接收 ——5
D/rxjava2: requested—left—5
D/rxjava2: 接收 ——6
D/rxjava2: requested—left—4
D/rxjava2: 接收 ——7
D/rxjava2: requested—left—3
D/rxjava2: 接收 ——8
D/rxjava2: requested—left—2
D/rxjava2: 接收 ——9
D/rxjava2: requested—left—1
D/rxjava2: 接收 ——10
D/rxjava2: requested—left—0
D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
D/rxjava2: requested—left—0
下游不再发送请求配额时,上游的配额令牌就为0。此时上游还有事件强行发个的话,就会出现异常。这里可以验证上面说的结论。
案例情景二:异步环境,在下游发出10个请求配额情况下,上游发射128个事件。
只是多了切换线程的代码:
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
打印LOG日志:
D/rxjava2: requested—left—127
D/rxjava2: requested—left—126
D/rxjava2: requested—left—125
...
D/rxjava2: requested—left—2
D/rxjava2: requested—left—1
D/rxjava2: requested—left—0
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
D/rxjava2: 接收 ——3
D/rxjava2: 接收 ——4
D/rxjava2: 接收 ——5
D/rxjava2: 接收 ——6
D/rxjava2: 接收 ——7
D/rxjava2: 接收 ——8
D/rxjava2: 接收 ——9
D/rxjava2: 接收 ——10
异步的情况下,上游先把事件序列内的事件发射完毕,下游才开始接收。如果上游的时间序列超过默认的128个,则上游事件发射到第129个就会报MissingBackpressureException异常,下游就接收不到事件了。这就涉及到下一个知识点了。
当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们下游调用request(1000)时,实际上改变的是下游线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。
何时自动触发呢?我们一起看下:
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
Log.d(TAG, "e.requested() == "+e.requested());
for (int i = 0; ; i++) {
boolean flag = false;
//这里做了一个循环,使发射循环处于保活状态,并在适当时机继续发射事件
while (e.requested() == 0){
if (!flag){
Log.d(TAG, "e.requested() == "+e.requested());
flag = true;
}
}
e.onNext(i + 1);
Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
}
}
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
subscription = s;
}
@Override
public void onNext(Integer s) {
Log.d(TAG, "接收 ——" + s);
}
@Override
public void onError(Throwable t) {
Log.d(TAG, "接收错误——" + t);
}
@Override
public void onComplete() {
}
});
外部手动调用request向上游请求发射配额
if (subscription != null) {
subscription.request(96);
Log.d(TAG,"下游请求了96个");
}
打印Log日志:
D/rxjava2: e.requested() == 128
D/rxjava2: 已发送1个——剩下127
D/rxjava2: 已发送2个——剩下126
...
D/rxjava2: 已发送127个——剩下1
D/rxjava2: 已发送128个——剩下0
D/rxjava2: e.requested() == 0
D/rxjava2: 下游请求了96个
D/rxjava2: 接收 ——1
D/rxjava2: 接收 ——2
...
D/rxjava2: 接收 ——95
D/rxjava2: 接收 ——96
D/rxjava2: 已发送129个——剩下95
...
D/rxjava2: 已发送223个——剩下1
D/rxjava2: 已发送224个——剩下0
D/rxjava2: e.requested() == 0
1,2,3,4行表示启动之后,上游自动想上游的事件序列缓存区发射128个事件。
8,9...13行表示下游手动请求96个发射配额时接收的事件。此时会自动触发上游继续发送事件,如14,15...17,18行,上游会自动再次发射96个事件(95-0+1=96)。
如果你下游请求95个发射配额的话,上游不会自动触发事件发射的(这个应该是底层设置的触发阀吧)。
因此得出结论:当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值。
在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待,
注意:是在onNext事件里,onComplete和onError事件不会消耗requested。
本章完~
网友评论