1.retry
当出现错误事件,即当被观察者发送错误事件时。被观察者重新订阅并发送事件。并且是无条件重新订阅。
//当出现错误的时候,无条件重新发送
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
}
})
//3 重新订阅发送的次数
.retry(3,new Predicate<Throwable>() {
@Override
public boolean test(Throwable throwable) throws Exception {
//返回true表示重新订阅发送事件
return true;
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
2.retryWhen
当出现错误事件,即当被观察者发送错误事件时。被观察者重新订阅并发送事件。并且是有条件重新订阅。
条件为:
1.当apply方法返回的是Observable.empty(),Observable.error()。
则不发送事件。
2.throwableObservable.delay(1,TimeUnit.SECONDS);为重新订阅发送事件的条件,
每隔1秒重新订阅发送事件
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onError(new NullPointerException());
e.onNext(3);
}
}).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
return Observable.empty();
// return throwableObservable.delay(1,TimeUnit.SECONDS);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
3.repeat
无条件重新订阅发送事件
//无条件重新订阅
Observable.just(2).repeat().subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
4.repeatWhen
有条件重新订阅发送事件
条件和retryWhen原理一样
//有条件重新订阅
Observable.just(2).repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
//传递被观察者的事件
//制定轮询时间,每一秒钟轮询一次
//当发送error或者empty时间,轮询被终止
// Observable.empty();
// Observable.error(new NullPointerException());
return objectObservable.delay(1,TimeUnit.SECONDS);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: "+integer );
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " );
}
@Override
public void onComplete() {
}
});
网友评论