前言
上一节中,我们找到了造成上游、下游发射事件速度不平衡问题的原因,就是是否有水缸,那么接下来我们就从两个方面来解决:分别是从数量上和速度上。
1. 从数量上解决
这里需要使用sample操作符,sample操作符意思是每隔指定事件就从上游取出事件发射给下游,我们每隔2秒取一个事件给下游,示例代码如下:
/**
* 从数量上解决
*/
public static void demo1(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()) // 让上游的 for循环在子线程中执行
.sample(2 , TimeUnit.SECONDS) // 每隔2秒从上游取出事件发射给下游
.observeOn(AndroidSchedulers.mainThread()) //下游切换到主线程中
.subscribe(new Consumer<Integer>() { // 建立连接后,new Consumer()然后复写 onNext()方法
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG" , "integer -> " + integer) ;
}
});
}
运行效果如下:

可以看到,上游在不停的发射数据,但我们只是每隔一定时间取一个放进水缸中,并没有全部放进水缸中,这次只是占用5M。
到这里,大家以后可以出去吹牛,我曾经通过技术手段去优化一个程序,使内存从 300多M降低到5M。
2. 从速度上解决
解决方式就是:可以给上游的 for循环语句之后,让其延迟2秒即可,具体代码如下:
/**
* 从速度上解决
*/
public static void demo2(){
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); // 在上游的for循环执行完之后,添加2秒延迟
}
}
}).subscribeOn(Schedulers.io()) // 表示上游的for循环操作在子线程中执行
.observeOn(AndroidSchedulers.mainThread()) // 切换到主线程中执行下边的操作
.subscribe(new Consumer<Integer>() { // 建立连接之后,new Consumer()之后,只是复写onNext()方法,不需要复写其他方法
@Override
public void accept(Integer integer) throws Exception {
Log.e("TAG" , "integer -> " + integer) ;
}
});
}
运行效果如下:

有效果图可知:内存也变为6M,而且内存线很平稳。
- 上游通过适当的延迟,既减缓了事件进入水缸的速度,也让下游有充足的时间从水缸中去事件去处理,如此一来,就不至于大量事件一下子涌进水缸,也不会OOM;
3. 从 数量、速度 上修改之前的Zip操作符的合并多个上游事件
我们之前有一个zip操作符,把多个上游的事件组合之后,然后把组合事件发送给下游,下边通过示例代码也来实现下:
从数量上修改,代码如下:
/**
* 从数量上解决:
* 取样:每隔2秒从上游取出事件发射给下游
*/
public static void demo3(){
// 创建第一个上游:Observable1
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}).subscribeOn(Schedulers.io()).sample(2 , TimeUnit.SECONDS) ; // 让上游的for循环在子线程中执行,并且是每隔2秒从上游发射事件给下游
// 创建第二个上游:Observable2
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}).subscribeOn(Schedulers.io()) ;
// 通过zip操作符:把上游1、上游2组合,然后把组合后的事件再发射给下游
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Consumer<String>() { // 建立连接之后,通过new Consumer()复写accept() -> 表示的是onNext()
@Override // 然后再new Consumer()复写 accept() -> 表示的是onError()
public void accept(String s) throws Exception {
Log.e("TAG" , "s -> " + s) ;
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("TAG" , "throwable -> " + throwable) ;
}
});
}
从速度上解决,代码如下:
/**
* 从速度上解决:
* 让上游的for循环执行完之后,延迟两秒发射数据给下游
*/
public static void demo4(){
// 创建第一个上游:Observable1
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
Thread.sleep(2000); // 让上游的for循环完之后,然后延迟2秒,让上游减慢发射事件的速度
}
}
}).subscribeOn(Schedulers.io()) ;
// 创建第二个上游:Observable2
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("A");
}
}) ;
// 通过zip操作符,把上游1、上游2组合,然后把组合后的事件发射给下游
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
}).subscribe(new Consumer<String>() { // 建立连接之后,然后new Consumer(),复写accept() -> 相当于 onNext()
@Override // 然后再次new Consumer(),复写accept() -> 相当于 onError()
public void accept(String s) throws Exception {
Log.e("TAG" , "s -> " + s) ;
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
Log.e("TAG" , "throwable -> " + throwable) ;
}
});
}
通过本节的学习,大家应该对如何处理上下游发射数据速度不平衡应该都知道如何解决了。下一节我们就来看下 Flowable,它的用法和我们这一节讲解的用法基本一样,只是对它稍微的封装了。
网友评论