组合操作符
组合操作符一共主要有以下几个:
- CombineLatest
- Join
- Merge
- StartWith
- Switch
- Zip
zip
zip操作符一般可以用在如下场景,
- 一个页面有多个接口,我希望等数据全部返回出来之后一并显示
- 用户发图片,可能有多张,我希望用户上传完图片之后一并提示
......
看图说话,当我们有2个或多个Observables发射的数据项时,zip操作符将严格的按照发射的顺序去将结合这些数据项,并且最终他发射出的数据数量与发射数据项最少的那个Observable的数据数量一样多。
zip操作符可以接受一到九个参数:一个Observable序列,或者一些发射Observable的Observables
例子:
我这边有三个数据源,他们分别在2s\4s\6s吐数据,然后我打算将他们整合在一起
Observable o1=Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("first");
}
});
Observable o2=Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("second");
}
});
Observable o3=Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext("third");
}
}).subscribeOn(Schedulers.io());
Observable.zip(o1, o2, o3, new Func3() {
@Override
public Object call(Object o, Object o2, Object o3) {
Log.d("MainActivity", o.toString());
Log.d("MainActivity", o2.toString());
Log.d("MainActivity", o3.toString());
return o.toString()+o2.toString()+o3.toString();
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.d("MainActivity", o.toString());
}
});
最终,这个call将会在三条数据都返回过来后再执行
06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: first
06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: second
06-21 05:19:04.276 6306-6416/com.example.clevo.rxjavademo D/MainActivity: third
06-21 05:19:04.276 6306-6306/com.example.clevo.rxjavademo D/MainActivity: firstsecondthird
请看时间点是一致的
如果有多对匹配,那么则顺序打印出匹配项
Observable o4=Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
for (int i=0;i<3;i++) {
try {
Thread.sleep(i*3000);
subscriber.onNext("four");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}}).subscribeOn(Schedulers.io());
Observable.zip(Observable.just("1", "2", "3", "4"), o4, new Func2() {
@Override
public Object call(Object o, Object o2) {
//每结合成功一组就触发
Log.d("MainActivity", o.toString());
Log.d("MainActivity", o2.toString());
return o.toString()+o2.toString();
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.d("MainActivity", o.toString());
}
});
看看结果
06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 1
06-21 05:33:11.689 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:11.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 1four
06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 2
06-21 05:33:17.690 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:17.690 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 2four
06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: 3
06-21 05:33:26.691 19242-19264/com.example.clevo.rxjavademo D/MainActivity: four
06-21 05:33:26.691 19242-19242/com.example.clevo.rxjavademo D/MainActivity: 3four
与zip操作符功能差不多的还有zipWith操作符,他与zip操作符的区别在于zipWith操作符总是接受两个参数,第一个参数是一个Observable或者一个Iterable
o4.zipWith(Observable.just("1", "2", "3", "4"), new Func2() {
@Override
public Object call(Object o, Object o2) {
Log.d("MainActivity", o.toString());
Log.d("MainActivity", o2.toString());
return o.toString()+o2.toString();
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.d("MainActivity", o.toString());
}
});
结果与之前一致
CombineLatest
CombineLatest操作符一般可以用在如下场景
- 登录页面用户名密码需要同时校验,两个EditText的TextWatcher分别触发不同的Observal,以达到共同校验的效果
看图说话,CombineLatest操作符与Zip操作符类似,但是区别很明显,ZIp操作符是每一个Observal都发射数据了,才会被结合成一个新的Observal,而CombineLatest是只要之前的Observal被发射过了,那么他会用这条Observal最后的那条数据,重新结合成一个新的Observal。这个从图中应该能够清晰的展现出来。
我把之前的代码改动一下
Observable.combineLatest(o1, Observable.just("1", "2", "3", "4"), new Func2() {
@Override
public Object call(Object o, Object o2) {
Log.d("MainActivity", o.toString());
Log.d("MainActivity", o2.toString());
return o.toString()+o2.toString();
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.d("MainActivity", o.toString());
}
});
来看看结果
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 1
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 2
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 3
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: first
06-21 20:44:32.084 2068-2089/com.example.clevo.rxjavademo D/MainActivity: 4
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first1
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first2
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first3
06-21 20:44:32.084 2068-2068/com.example.clevo.rxjavademo D/MainActivity: first4
虽然o1只发射了一条数据,但是另外一个Observal有4条数据出来,最终得到4条结果
Join
Join恕我个人愚笨,这张图我看了好久才知道是什么意思。这个跟之前所说的结合是一个很大的区别,它带上了有效期。你可以简单的理解是,我输入了手机号,然后等待验证码,验证码有效期是1分钟,如果在1分钟内你发射数据,咱俩就可以结合了,1分钟后,你照死发,我都不会理你。
看图说话,源Observal就是第一行数据,目标Observal是第二行数据。源Observal的有效期是蓝色箭头跟黑色箭头之间部分,目标Observal有效期是黑色部分跟粉红色部分之间。所以我们看,第一个菱形过来之后,粉红色球还处于有效期内,他们结合了;土黄色球来了,菱形还在他有效期内,他们也结合了;但是青色球来了之后,菱形的有效期过了,就不能结合了。
o1.join(o3, new Func1() {
@Override
public Object call(Object o) {
return Observable.timer(2, TimeUnit.SECONDS);
}}, new Func1() {
@Override
public Object call(Object o) {
return Observable.timer(2, TimeUnit.SECONDS);
}}, new Func2() {
@Override
public Object call(Object o, Object o2) {
return o.toString()+o2.toString();
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Object o) {
Log.d("MainActivity", o.toString());
}
});
四个参数是什么意思,o3是目标Observal,第二个参数是源Observal的有效时间,第三个参数是目标Observal的有效时间,第四个参数是如果有合并,自行处理合并后的逻辑。这里,第二个参数的有效期只有2秒,那么算上之前的延时时间,源Observal过期2s之后目标Observal才发射数据,那么这里就不会有结合操作了
跟他类似的还有GroupJoin
GroupJoin大体上是一致的,区别在与他是等到有效期到了,才响应,而且第四个参数入参有区别,join是你对象Observal发射出来的那个值,而GroupJoin是对象Observal发射出的Observal
o1.groupJoin(o3, new Func1() {
@Override
public Object call(Object o) {
return Observable.timer(7, TimeUnit.SECONDS);
}}, new Func1() {
@Override
public Object call(Object o) {
return Observable.timer(2, TimeUnit.SECONDS);
}}, new Func2() {
@Override
public Object call(final Object o, Object o2) {
return ((Observable<String>) o2).flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String s) {
return Observable.just(s+o.toString());
}
});
}}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Object o) {
((Observable) o).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Object o) {
Log.d("MainActivity", o.toString());
}
});
}
});
算上之前的启动信息
06-22 02:25:39.927 730-20553/system_process I/ActivityManager: START u0 {act=android.intent.action.MAIN cat=[android.intent.category.LAUNCHER] flg=0x10200000 cmp=com.example.clevo.rxjavademo/.MainActivity (has extras)} from uid 10039 on display 0
06-22 02:25:39.927 730-20553/system_process V/WindowManager: addAppToken: AppWindowToken{1b4d4776 token=Token{ad42a11 ActivityRecord{3912da38 u0 com.example.clevo.rxjavademo/.MainActivity t236}}} to stack=1 task=236 at 0
06-22 02:25:39.954 730-1035/system_process V/WindowManager: Adding window Window{cb3386f u0 com.example.clevo.rxjavademo/com.example.clevo.rxjavademo.MainActivity} at 3 of 9 (before Window{12c90a49 u0 Starting com.example.clevo.rxjavademo})
06-22 02:25:40.117 730-756/system_process I/ActivityManager: Displayed com.example.clevo.rxjavademo/.MainActivity: +186ms
06-22 02:25:47.938 29412-30378/com.example.clevo.rxjavademo D/MainActivity: thirdfirst
可以看到到了有效期结束的时候,顺利打印
Merge
我们在请求列表数据的时候,一般有这个需求,将本地保存的数据与网络请求到的数据合并成一条数据显示出来,这个时候,我们就需要用到Merge
Merge从图中可以看出来,Merge只是单纯的把多条Observal合并成1条
Observable.merge(o4, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1() {
@Override
public void call(Object o) {
Log.d("MainActivity", o.toString());
}
});
看看结果
06-22 02:42:18.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
06-22 02:42:21.161 13441-13441/com.example.clevo.rxjavademo D/MainActivity: third
06-22 02:42:24.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
06-22 02:42:33.162 13441-13441/com.example.clevo.rxjavademo D/MainActivity: four
还有一个操作符与他类似,MergeDelayError,他与Merge的区别在于,Merge在数据发射过程中如果遇到错误,会立即终止,MergeDelayError则会继续发射,直到数据发射完,才将Error传递给观察者
Observable o5=Observable.create(new Observable.OnSubscribe<Object>() {
@Override
public void call(Subscriber<? super Object> subscriber) {
for (int i=0;i<3;i++) {
try {
Thread.sleep((i+1)*3000);
if (i==1) {
subscriber.onError(new Exception("ERROR"));
}
subscriber.onNext("five");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}}).subscribeOn(Schedulers.io());
Observable.mergeDelayError(o5, o3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Subscriber() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Object o) {
Log.d("MainActivity", o.toString());
}
});
对比下结果
06-22 02:52:15.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:52:18.416 22830-22830/com.example.clevo.rxjavademo D/MainActivity: third
06-22 02:52:21.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:52:30.418 22830-22830/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:57:21.805 27254-27254/com.example.clevo.rxjavademo D/MainActivity: five
06-22 02:57:22.295 27254-27254/com.example.clevo.rxjavademo D/MainActivity: third
StartWith
这个其实没啥好说的,就是往既定的Observal里面直接加入一个Observal的内容
startWith.c.pngObservable.just("1", "2", "3").startWith("5", "6", "7").subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.d("MainActivity", s);
}
});
看看结果
06-22 03:36:55.080 31825-31825/? D/MainActivity: 5
06-22 03:36:55.080 31825-31825/? D/MainActivity: 6
06-22 03:36:55.080 31825-31825/? D/MainActivity: 7
06-22 03:36:55.080 31825-31825/? D/MainActivity: 1
06-22 03:36:55.080 31825-31825/? D/MainActivity: 2
06-22 03:36:55.080 31825-31825/? D/MainActivity: 3
SwitchOnNext
SwitchOnNext如图,有一个Observable,他可以自己狂发射不同的Observal,这个时候,如果在同一个时间内存在两个或多个Observable提交结果,那么只取最后一个Observable提交的结果给观察者,这里面第二个Observal发射数据了,那么第一个Observal从黄球开始就被关闭了
private Observable<String> createObserver(final int index) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 1; i < index; i++) {
subscriber.onNext(index + "-" + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).subscribeOn(Schedulers.newThread());
}
每隔1s发一条数据
Observable.switchOnNext(Observable.create(
new Observable.OnSubscribe<Observable<String>>() {
@Override
public void call(Subscriber<? super Observable<String>> subscriber) {
for (int i = 1; i < 3; i++) {
if (i==1) {
subscriber.onNext(createObserver(10));
}
else if (i==2) {
subscriber.onNext(createObserver(5));
}
try {
Thread.sleep(2100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
})).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.d("MainActivity", s);
}
});
这样,分析中我们可知,在过了4200s之后,第一条Observal就会被关闭
06-22 04:25:01.155 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-1
06-22 04:25:02.156 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-2
06-22 04:25:03.157 11678-11709/com.example.clevo.rxjavademo D/MainActivity: 10-3
06-22 04:25:03.255 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-1
06-22 04:25:04.259 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-2
06-22 04:25:05.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-3
06-22 04:25:06.263 11678-11744/com.example.clevo.rxjavademo D/MainActivity: 5-4
组合操作符我们就学习到这里了,下一篇我们学一下简单的错误处理
主要参考文章
- 给 Android 开发者的 RxJava 详解 扔物线大神的文章,我的RXJava启蒙文章
- 木水川的博客 对RXJava操作符做了很详细的说明以及提供详细的示例
- **ReactiveX文档中文翻译 mcxiaoke翻译的RXJava中文文档 **
- Android RxJava使用介绍 job_hesc对RXJava操作符介绍以及简单的示例
网友评论