前言
上文我们介绍了过滤操作符和条件操作符。这里我们接着介绍合并操作符和异常处理操作符。
合并型操作符
合并操作符,是将两个或多个被观察者合并为一个被观察者,并向观察者传递事件。常见的合并操作符有startWith,concatWith,concat,merge,zip。
startWith操作符
Observable.just(1, 2, 3)
.startWith(Observable.just(10, 20, 30))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "accept: " + integer);
}
});
accept: 10
accept: 20
accept: 30
accept: 1
accept: 2
accept: 3
startWith可以将两个被观察者合并为一个被观察者,这里需要注意的是,startWith里面的被观察者是先发送的。
concatWith操作符
concatWith的功能和startWith相同,只是concatWith里的面被观察者是后发送的。
concat操作符
Observable.concat(Observable.just(1), Observable.just("23"), Observable.just(45))
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: " + o);
}
});
accept: 1
accept: 23
accept: 45
concat操作符是将至多四个被观察者合并为一个被观察者。
merge操作符
Observable observable1 = Observable.intervalRange(1, 3, 1,2, TimeUnit.SECONDS);
Observable observable2 = Observable.intervalRange(6, 3, 1,2, TimeUnit.SECONDS); // 6 7 8 9 10
Observable observable3 = Observable.intervalRange(11, 3, 1,2, TimeUnit.SECONDS); // 11 12 13 14 15
Observable.merge(observable1,observable2,observable3)
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: " + o);
}
});
accept: 1
accept: 6
accept: 11
accept: 2
accept: 7
accept: 12
accept: 3
accept: 8
accept: 13
同样merge操作符还是至多能让四个被观察者合并,这里我之所以用intervalRange的方式创建就是为了证明每个被合并的观察者都是并列执行的。
zip操作符
Observable.zip(Observable.just("语文","数学","英语"),Observable.just(90,99,82,95), new BiFunction<String, Integer, StringBuffer>() {
@Override
public StringBuffer apply(String s, Integer integer) throws Exception {
return new StringBuffer().append("课程" + s).append("==").append(integer+"");
}
})
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
Log.d(TAG, "accept: " + o);
}
});
accept: 课程语文==90
accept: 课程数学==99
accept: 课程英语==82
zip操作符是将两个或多个被观察者发送的数据一一对应组合起来成为一个新的数据发送给观察者。
这里需要注意的是,根据代码和日志我们可以看到,被观察者二多了一个95的数据,但是日志并没有打印出来,这是因为该数据在被观察者一中不存在对应关系,所以就被舍弃了。
异常处理型操作符
异常处理操作符在Rxjava中的功能十分强大,可以对异常进行不同程度的处理和转换。常见的异常处理操作符有onErrorReturn,onErrorResumeNext,onExceptionResumeNext,retry。。
onErrorReturn操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
if (i == 5) {
e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
}
e.onNext(i);
}
e.onComplete();
}
})
.onErrorReturn(new Function<Throwable, Integer>() {
@Override
public Integer apply(Throwable throwable) throws Exception {
Log.d(TAG, "onErrorReturn: " + throwable.getMessage());
return 400; // 400代表有错误,给下一层,目前 下游 观察者
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer); // 400
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onErrorReturn: 我要报错了
onNext: 400
onComplete:
onErrorReturn操作符,可以拦截到被观察者使用onError发送的异常,并且给观察者发送一个标识。
onErrorResumeNext操作符
与onErrorReturn操作符类似,不同的地方在于他返回的是一个被观察者,而我们可以在这个被观察者内多次发送事件。
onExceptionResumeNext操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
if (i == 5) {
// e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
throw new Exception("出错了");
}
e.onNext(i);
}
e.onComplete();
}
})
.onExceptionResumeNext(new ObservableSource<Integer>() {
@Override
public void subscribe(Observer<? super Integer> observer) {
observer.onNext(404); // 可以让程序 不崩溃的
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer); // 400
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 404
onExceptionResumeNext操作符非常强大,他同样可以处理onError事件,但它与上面两个操作符不同的是,它可以处理throw抛出的异常,使程序不崩溃。
retry操作符
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; i < 100; i++) {
if (i == 3) {
e.onError(new IllegalAccessError("我要报错了")); // 发射异常事件
}
e.onNext(i);
}
e.onComplete();
}
})
//.retry(2, new Predicate<Throwable>() {
// @Override
// public boolean test(Throwable throwable) throws Exception {
// return true;
// }
//
.retry(new BiPredicate<Integer, Throwable>() {
@Override
public boolean test(Integer integer, Throwable throwable) throws Exception {
if (integer>=3){
return false;
}
return true;
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer); // 400
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: ");
}
});
onNext: 0
onNext: 1
onNext: 2
onNext: 0
onNext: 1
onNext: 2
onNext: 0
onNext: 1
onNext: 2
onError: 我要报错了
retry操作符是发生异常时的重试操作符,上面两种的结果都是一样的,第二种写法可以得到当前发送的次数。
这里需要注意的是,第一种写法数量是重试的数量,并没有计算第一次发送的。
至此,Rxjava中常见的操作符已经全部讲解完成。
网友评论