先访问一个接口获取一个list<String>,在根据list中的每个对象,(遍历)访问另一接口。根据遍历返回的数据,聚合(增删)之前的list数据,最后输出list
先贴出第一次写的代码(代码有问题,后边有更好的code)
//查询全部大屏信息
public void getScreenList() {
if (!UtilMethod.checkNet(context)) return;
mScreenList = new ArrayList<>();
Subscription subscribe = serviceManager.queryAllScreen()
.subscribeOn(Schedulers.io())
.flatMap(new Func1<QueryAllScreenBean, Observable<String>>() {
@Override
public Observable<String> call(QueryAllScreenBean queryAllScreenBean) {
if (queryAllScreenBean.isSuccFlag()) {
if (queryAllScreenBean.getScreenBean() != null) {
List<String> screenBeanList = queryAllScreenBean.getScreenBean();
LogUtils.e("screenshare", screenBeanList.toString());
//screenBeanList 是第一次要获取的list<String>。用from遍历发射Observable<String>
return Observable.from(screenBeanList);
}
}
return Observable.empty();
}
})
.flatMap(new Func1<String, Observable<List<ScreenBean>>>() {
@Override
public Observable<List<ScreenBean>> call(String screenBeanStr) {
ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";
return getUsefulList(url, screenBean);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<List<ScreenBean>>() {
@Override
public void onCompleted() {
LogUtils.e("获取大屏信息完成了");
}
@Override
public void onError(Throwable throwable) {
LogUtils.e("查询屏幕信息失败---" + throwable.toString());
mLessonView.onError("查询屏幕信息失败");
}
@Override
public void onNext(List<ScreenBean> list) {
if (list == null || list.size()==0) {
mLessonView.onError("查询屏幕信息失败");
} else {
mLessonView.onGotScreenList(list);
}
}
});
mCompositeSubscription.add(subscribe);
}
//获取屏幕状态。根据遍历到的地址,获取状态,如果状态不为空,就把这个对应的bean加到最后的list中。否则不加入。
private Observable<List<ScreenBean>> getUsefulList(String url, final ScreenBean screenBean) {
Observable<List<ScreenBean>> observable =
serviceManager.getScreenState(url)
.map(new Func1<String, List<ScreenBean>>() {
@Override
public List<ScreenBean> call(String s) {
if (!s.isEmpty()) {
mScreenList.add(screenBean);
}
return mScreenList;
}
});
return observable;
}
上边的代码是有bug的。
因为,根据一个String的集合List<String>,用from一个一个的发射这个集合List,在根据List中的各各String对象生成对应的多个Observable.每个Observable访问网络结束后都会回调Subscriber.onNext()。所有的Observable都访问完后,会调用Subscriber.onCompleted()。正常情况下是能达到功能的。但如果生成的某个Observable访问网络出现了异常,就直接调用Subscriber.onError(),会导致后边的Observable不再访问网络。也就是数据不完整,如果第一个Observable就出现了error,那后边就都不访问 了。这个问题需要解决。看下边的成功的代码。
private List<ScreenBean> mScreenList;
//查询全部大屏信息
public void getScreenList() {
if (!UtilMethod.checkNet(context)) return;
mScreenList = new ArrayList<>();
Subscription subscribe = serviceManager.queryAllScreen()
.subscribeOn(Schedulers.io())
.flatMap(new Func1<QueryAllScreenBean, Observable<String>>() {
@Override
public Observable<String> call(QueryAllScreenBean queryAllScreenBean) {
if (queryAllScreenBean.isSuccFlag()) {
if (queryAllScreenBean.getScreenBean() != null) {
List<String> mScreenBeanStrList = queryAllScreenBean.getScreenBean();
LogUtils.e(TAG, mScreenBeanStrList.toString());
//test
mScreenBeanStrList.clear();
mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");
mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");
mScreenBeanStrList.add("http://192.168.15.241:9031/GetState");
mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");
// mScreenBeanStrList.add("http://192.168.18.2:9031/GetState");
// mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");
// mScreenBeanStrList.add("http://192.168.15.241:9031/GetState");
// mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");
// mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");
return Observable.from(mScreenBeanStrList);
}
}
return Observable.error(new Throwable("list is null"));
}
})
.flatMap(new Func1<String, Observable<ScreenBean>>() {
@Override
public Observable<ScreenBean> call(String screenBeanStr) {
// ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
// String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";
LogUtils.e(TAG, "getUsefulBean" );
ScreenBean screenBean = new ScreenBean();
screenBean.setScreenName(screenBeanStr);
return getUsefulBean(screenBeanStr, screenBean)
.onErrorResumeNext(Observable.<ScreenBean>empty());
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ScreenBean>() {
@Override
public void onCompleted() {
mLessonView.onGotScreenList(mScreenList);
LogUtils.e(TAG, "获取All大屏信息完成了");
}
@Override
public void onError(Throwable throwable) {
LogUtils.e(TAG, "查询屏幕信息失败---" + throwable.getMessage());
}
@Override
public void onNext(ScreenBean bean) {
if (bean != null) {
mScreenList.add(bean);
LogUtils.e(TAG, "获取All大屏信息onNext success:" + mScreenList.size());
} else {
LogUtils.e(TAG, "获取All大屏信息onNext null bean:" + mScreenList.size());
}
}
});
mCompositeSubscription.add(subscribe);
}
//获取屏幕状态
private Observable<ScreenBean> getUsefulBean(String url, final ScreenBean screenBean) {
return serviceManager.getScreenState(url)
.flatMap(new Func1<String, Observable<ScreenBean>>() {
@Override
public Observable<ScreenBean> call(String s) {
if (!s.isEmpty()) {
LogUtils.e(TAG, "获取单个大屏信息success");
return Observable.just(screenBean);
}
LogUtils.e(TAG, "获取单个大屏信息null");
return Observable.empty();
}
});
}
上边的代码是最终版的
解释: 因为每个Observable在访问失败后,会调用.onErrorResumeNext(Observable.<ScreenBean>empty());的方法,就避免了后边的Observable被中断。最后每次成功的回调都会走onNext(),在onNext()中将成功的ScreenBean加入到mScreenList中。最后会在onCompleted()中刷新页面的回调。
- 走过的弯路
通过查资料,onErrorResumeNext(),这个操作符当发生错误的时候,由另外一个Observable来代替当前的Observable并继续发射数据。于是我就想到了用递归的方法,遇到错误后,继续后边的发射。
private Observable<ScreenBean> getNextObservable(final List<String> list) {
if (mIndex >= list.size()) {
return Observable.<ScreenBean>empty();
}
List<String> subList = list.subList(mIndex, list.size());
return Observable.from(subList)
.flatMap(new Func1<String, Observable<ScreenBean>>() {
@Override
public Observable<ScreenBean> call(String screenBeanStr) {
// ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
// String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";
LogUtils.e(TAG, "getUsefulBean" + mIndex);
ScreenBean screenBean = new ScreenBean();
screenBean.setScreenName(screenBeanStr);
mIndex++;
return getUsefulBean(screenBeanStr, screenBean)
.onErrorResumeNext(getNextObservable(list));
}
});
}
在onErrorResumeNext的时候,继续调用自己getNextObservable,只不过根据mIndex这个数值,舍弃掉了之前的有问题的Observable。开始新的from。
但是,该例子中,是产生了一系列的Observable。就是说,每次遇到一个失败的Observable,就舍弃这个Observable,取而代之的是产生的后边的一系列的几个Observable。比如,有0,0,1,0这样的三个Observable(0代表失败的,1代表成功的)。第一次0的时候,失败了,新产生了(0,1,0)这个序列,但是之前的序列(0,0,1,0)除了第一个0被消费,其他的都存在。以此类推,会产生越来越多的Observable。最后的访问的次数理论是11次,有4个合格的数据(下图中4个1。最后成功的1和最后一次的0不产生新的Observable)。实际是9次,3个合理的数据(可能有没考虑到的问题,或者Rxjava的机制)。总之逻辑是错误的,应该就访问4次才对。所以不应该是用递归迭代。
递归访问的次数分析图
可以参考后边的类似的问题,继续发射另外的数据7/8/9的例子。后边跟我项目中的例子的不同点,是因为后边的那个例子就是一个Observable。
下边补充一下知识:(可自己百度“rxjava错误处理”)
一、错误处理操作符列表
用于对Observable发射的 onError 通知做出响应或者从错误中恢复,例如,你
可以:
吞掉这个错误,切换到一个备用的Observable继续发射数据
吞掉这个错误然后发射默认值
吞掉这个错误并立即尝试重启这个Observable
吞掉这个错误,在一些回退间隔后重启这个Observable
名称解释
OnErrorReturn
当发生错误的时候,让Observable发射一个预先定义好的数据并正常地终止
onErrorReturn方法 返回一个镜像原有Observable行为的新Observable
会忽略前者的onError调用,不会将错误传递给观察者,而是发射一个特殊的项并调用观察者的onCompleted方法。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 1; i <= 6; i++) {
if (i < 3) {
subscriber.onNext("onNext:" + i);
} else if (false) {
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onError(new Throwable("Throw error"));
}
}
}
}).onErrorReturn(throwable -> "onErrorReturn")
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
logger("onErrorReturn-onCompleted\n");
}
@Override
public void onError(Throwable e) {
logger("onErrorReturn-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
logger("onErrorReturn-onNext:" + s);
}
});
onErrorReturn-onNext:onNext:1
onErrorReturn-onNext:onNext:2
onErrorReturn-onNext:onErrorReturn
onErrorReturn-onCompleted
onErrorReturn在错误发生的时候继续发射了提前定义好的数据并正常结束Observable
onErrorResumeNext
当发生错误的时候,由另外一个Observable来代替当前的Observable并继续发射数据
onErrorResumeNext方法返回一个镜像原有Observable行为的新Observable
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
for (int i = 1; i <= 6; i++) {
if (i < 3) {
subscriber.onNext("onNext:" + i);
} else if (false) {
subscriber.onError(new Exception("Exception"));
} else {
subscriber.onError(new Throwable("Throw error"));
}
}
}
}).onErrorResumeNext(Observable.just("7", "8", "9"))
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
logger("onErrorResume-onCompleted\n");
}
@Override
public void onError(Throwable e) {
logger("onErrorResume-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
logger("onErrorResume-onNext:" + s);
}
});
onErrorResume-onNext:onNext:1
onErrorResume-onNext:onNext:2
onErrorResume-onNext:7
onErrorResume-onNext:8
onErrorResume-onNext:9
onErrorResume-onCompleted
onErrorResume在错误发生后,另一个 继续发射另外的数据7/8/9,然后正常结束了Observable。
OnExceptionResumeNext
类似于OnErrorResumeNext,不同之处在于其会对onError抛出的数据类型做判断,
如果是Exception,也会使用另外一个Observable代替原Observable继续发射数据,否则会将错误分发给Subscriber。
Retry
原始Observable遇到错误,重新订阅它期望它能正常终止.
Retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。
Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复.
这个函数返回一个布尔值,如果返回true,retry应该再次订阅和镜像原始的Observable,
如果返回false,retry会将最新的一个onError通知传递给它的观察者。
retry(long):指定最多重新订阅的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。
Rxjava还实现了RetryWhen操作符。和repeatWhen类似,但是一个是重复订阅(触发onComplete),一个是错误后重复订阅(触发onError())。
当错误发生时,retryWhen会接收onError的throwable作为参数,
并根据定义好的函数返回一个Observable,如果这个Observable发射一个数据,就会重新订阅,
否则,就将这个通知传递给观察者然后终止。
//定义createObserver方法,跟前边代码类似
createObserver().retry(2)
.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
logger("retry-onCompleted\n");
}
@Override
public void onError(Throwable e) {
logger("retry-onError:" + e.getMessage());
}
@Override
public void onNext(String s) {
logger("retry-onNext:" + s);
}
});
retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onError:Throw error
在尝试了几次还是产生错误后,retry会将错误分发给观察者
retryWhen
Observable.create((Subscriber<? super String> s) -> {
logger("subscribing");
s.onError(new RuntimeException("always fails"));
}).retryWhen(attempts -> {
return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
logger("delay retry by " + i + " second(s)");
return Observable.timer(i, TimeUnit.SECONDS);
});
}).toBlocking().forEach(this::logger);
subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing
在尝试了几次还是产生错误后,retryWhen会正常结束,并不会将错误分发出去。di
网友评论