美文网首页
Rxjava复杂访问 flatMap + form

Rxjava复杂访问 flatMap + form

作者: 小玉1991 | 来源:发表于2018-08-13 20:51 被阅读25次

先访问一个接口获取一个list<String>,在根据list中的每个对象,(遍历)访问另一接口。根据遍历返回的数据,聚合(增删)之前的list数据,最后输出list

先贴出第一次写的代码(代码有问题,后边有更好的code)

//查询全部大屏信息
    public void getScreenList() {
        if (!UtilMethod.checkNet(context)) return;

        mScreenList = new ArrayList<>();
        Subscription subscribe = serviceManager.queryAllScreen()
                .subscribeOn(Schedulers.io())
                .flatMap(new Func1<QueryAllScreenBean, Observable<String>>() {
                    @Override
                    public Observable<String> call(QueryAllScreenBean queryAllScreenBean) {
                        if (queryAllScreenBean.isSuccFlag()) {
                            if (queryAllScreenBean.getScreenBean() != null) {
                                List<String> screenBeanList = queryAllScreenBean.getScreenBean();
                                LogUtils.e("screenshare", screenBeanList.toString());
    //screenBeanList 是第一次要获取的list<String>。用from遍历发射Observable<String> 
                                return Observable.from(screenBeanList);
                            }
                        }

                        return Observable.empty();
                    }
                })
                .flatMap(new Func1<String, Observable<List<ScreenBean>>>() {

                    @Override
                    public Observable<List<ScreenBean>> call(String screenBeanStr) {
                        ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
                        String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";

                        return getUsefulList(url, screenBean);
                    }


                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<List<ScreenBean>>() {
                    @Override
                    public void onCompleted() {

                        LogUtils.e("获取大屏信息完成了");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        LogUtils.e("查询屏幕信息失败---" + throwable.toString());
                        mLessonView.onError("查询屏幕信息失败");
                    }

                    @Override
                    public void onNext(List<ScreenBean> list) {
                        if (list == null || list.size()==0) {
                            mLessonView.onError("查询屏幕信息失败");
                        } else {
                            mLessonView.onGotScreenList(list);
                        }

                    }
                });
        mCompositeSubscription.add(subscribe);
    }

    //获取屏幕状态。根据遍历到的地址,获取状态,如果状态不为空,就把这个对应的bean加到最后的list中。否则不加入。
    private Observable<List<ScreenBean>> getUsefulList(String url, final ScreenBean screenBean) {

        Observable<List<ScreenBean>> observable =
                serviceManager.getScreenState(url)
                        .map(new Func1<String, List<ScreenBean>>() {
                            @Override
                            public List<ScreenBean> call(String s) {
                                if (!s.isEmpty()) {
                                    mScreenList.add(screenBean);
                                }
                                return mScreenList;
                            }
                        });

        return observable;
    }

上边的代码是有bug的。

因为,根据一个String的集合List<String>,用from一个一个的发射这个集合List,在根据List中的各各String对象生成对应的多个Observable.每个Observable访问网络结束后都会回调Subscriber.onNext()。所有的Observable都访问完后,会调用Subscriber.onCompleted()。正常情况下是能达到功能的。但如果生成的某个Observable访问网络出现了异常,就直接调用Subscriber.onError(),会导致后边的Observable不再访问网络。也就是数据不完整,如果第一个Observable就出现了error,那后边就都不访问 了。这个问题需要解决。看下边的成功的代码。

private List<ScreenBean> mScreenList;
    //查询全部大屏信息
    public void getScreenList() {
        if (!UtilMethod.checkNet(context)) return;

        mScreenList = new ArrayList<>();

        Subscription subscribe = serviceManager.queryAllScreen()
                .subscribeOn(Schedulers.io())
                .flatMap(new Func1<QueryAllScreenBean, Observable<String>>() {
                    @Override
                    public Observable<String> call(QueryAllScreenBean queryAllScreenBean) {
                        if (queryAllScreenBean.isSuccFlag()) {
                            if (queryAllScreenBean.getScreenBean() != null) {
                                List<String> mScreenBeanStrList = queryAllScreenBean.getScreenBean();
                                LogUtils.e(TAG, mScreenBeanStrList.toString());


                                //test
                                mScreenBeanStrList.clear();
                                mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");
                                mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");
                                mScreenBeanStrList.add("http://192.168.15.241:9031/GetState");
                                mScreenBeanStrList.add("http://192.168.18.1:9031/GetState");

//                                mScreenBeanStrList.add("http://192.168.18.2:9031/GetState");
//                                mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");
//                                mScreenBeanStrList.add("http://192.168.15.241:9031/GetState");
//                                mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");
//                                mScreenBeanStrList.add("http://192.168.18.3:9031/GetState");

                                return Observable.from(mScreenBeanStrList);
                            }
                        }

                        return Observable.error(new Throwable("list is null"));
                    }
                })
                .flatMap(new Func1<String, Observable<ScreenBean>>() {

                    @Override
                    public Observable<ScreenBean> call(String screenBeanStr) {
//                        ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
//                        String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";

                        LogUtils.e(TAG, "getUsefulBean" );
                        ScreenBean screenBean = new ScreenBean();
                        screenBean.setScreenName(screenBeanStr);
               

                        return getUsefulBean(screenBeanStr, screenBean)
                                .onErrorResumeNext(Observable.<ScreenBean>empty());
                    }


                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<ScreenBean>() {
                    @Override
                    public void onCompleted() {
                        mLessonView.onGotScreenList(mScreenList);
                        LogUtils.e(TAG, "获取All大屏信息完成了");
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        LogUtils.e(TAG, "查询屏幕信息失败---" + throwable.getMessage());

                    }

                    @Override
                    public void onNext(ScreenBean bean) {

                        if (bean != null) {
                            mScreenList.add(bean);
                            LogUtils.e(TAG, "获取All大屏信息onNext success:" + mScreenList.size());
                        } else {
                            LogUtils.e(TAG, "获取All大屏信息onNext null bean:" + mScreenList.size());
                        }

                    }
                });
        mCompositeSubscription.add(subscribe);
    }
 //获取屏幕状态
    private Observable<ScreenBean> getUsefulBean(String url, final ScreenBean screenBean) {

        return serviceManager.getScreenState(url)
                .flatMap(new Func1<String, Observable<ScreenBean>>() {
                    @Override
                    public Observable<ScreenBean> call(String s) {

                        if (!s.isEmpty()) {
                            LogUtils.e(TAG, "获取单个大屏信息success");
                            return Observable.just(screenBean);
                        }
                        LogUtils.e(TAG, "获取单个大屏信息null");
                        return Observable.empty();
                    }
                });

    }

上边的代码是最终版的
解释: 因为每个Observable在访问失败后,会调用.onErrorResumeNext(Observable.<ScreenBean>empty());的方法,就避免了后边的Observable被中断。最后每次成功的回调都会走onNext(),在onNext()中将成功的ScreenBean加入到mScreenList中。最后会在onCompleted()中刷新页面的回调。

  • 走过的弯路
    通过查资料,onErrorResumeNext(),这个操作符当发生错误的时候,由另外一个Observable来代替当前的Observable并继续发射数据。于是我就想到了用递归的方法,遇到错误后,继续后边的发射。
private Observable<ScreenBean> getNextObservable(final List<String> list) {

        if (mIndex >= list.size()) {
            return Observable.<ScreenBean>empty();
        }
        List<String> subList = list.subList(mIndex, list.size());
        return Observable.from(subList)
                .flatMap(new Func1<String, Observable<ScreenBean>>() {

                    @Override
                    public Observable<ScreenBean> call(String screenBeanStr) {
//                        ScreenBean screenBean = new Gson().fromJson(screenBeanStr, ScreenBean.class);
//                        String url = "http://" + screenBean.getLocalIP() + ":9031/GetState";

                        LogUtils.e(TAG, "getUsefulBean" + mIndex);
                        ScreenBean screenBean = new ScreenBean();
                        screenBean.setScreenName(screenBeanStr);
                        mIndex++;

                        return getUsefulBean(screenBeanStr, screenBean)
                                .onErrorResumeNext(getNextObservable(list));
                    }
                });
 }

在onErrorResumeNext的时候,继续调用自己getNextObservable,只不过根据mIndex这个数值,舍弃掉了之前的有问题的Observable。开始新的from。
但是,该例子中,是产生了一系列的Observable。就是说,每次遇到一个失败的Observable,就舍弃这个Observable,取而代之的是产生的后边的一系列的几个Observable。比如,有0,0,1,0这样的三个Observable(0代表失败的,1代表成功的)。第一次0的时候,失败了,新产生了(0,1,0)这个序列,但是之前的序列(0,0,1,0)除了第一个0被消费,其他的都存在。以此类推,会产生越来越多的Observable。最后的访问的次数理论是11次,有4个合格的数据(下图中4个1。最后成功的1和最后一次的0不产生新的Observable)。实际是9次,3个合理的数据(可能有没考虑到的问题,或者Rxjava的机制)。总之逻辑是错误的,应该就访问4次才对。所以不应该是用递归迭代。


递归访问的次数分析图

可以参考后边的类似的问题,继续发射另外的数据7/8/9的例子。后边跟我项目中的例子的不同点,是因为后边的那个例子就是一个Observable。

下边补充一下知识:(可自己百度“rxjava错误处理”)

一、错误处理操作符列表
用于对Observable发射的 onError 通知做出响应或者从错误中恢复,例如,你
可以:

吞掉这个错误,切换到一个备用的Observable继续发射数据
吞掉这个错误然后发射默认值
吞掉这个错误并立即尝试重启这个Observable
吞掉这个错误,在一些回退间隔后重启这个Observable


名称解释

OnErrorReturn

当发生错误的时候,让Observable发射一个预先定义好的数据并正常地终止

onErrorReturn方法 返回一个镜像原有Observable行为的新Observable
会忽略前者的onError调用,不会将错误传递给观察者,而是发射一个特殊的项并调用观察者的onCompleted方法。

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext("onNext:" + i);
                    } else if (false) {
                        subscriber.onError(new Exception("Exception"));
                    } else {
                        subscriber.onError(new Throwable("Throw error"));
                    }
                }
            }
        }).onErrorReturn(throwable -> "onErrorReturn")
           .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        logger("onErrorReturn-onCompleted\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        logger("onErrorReturn-onError:" + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        logger("onErrorReturn-onNext:" + s);
                    }
                });

onErrorReturn-onNext:onNext:1
onErrorReturn-onNext:onNext:2
onErrorReturn-onNext:onErrorReturn
onErrorReturn-onCompleted

onErrorReturn在错误发生的时候继续发射了提前定义好的数据并正常结束Observable

onErrorResumeNext

当发生错误的时候,由另外一个Observable来代替当前的Observable并继续发射数据
onErrorResumeNext方法返回一个镜像原有Observable行为的新Observable

Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext("onNext:" + i);
                    } else if (false) {
                        subscriber.onError(new Exception("Exception"));
                    } else {
                        subscriber.onError(new Throwable("Throw error"));
                    }
                }
            }
        }).onErrorResumeNext(Observable.just("7", "8", "9"))
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        logger("onErrorResume-onCompleted\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        logger("onErrorResume-onError:" + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        logger("onErrorResume-onNext:" + s);
                    }
                });

onErrorResume-onNext:onNext:1
onErrorResume-onNext:onNext:2
onErrorResume-onNext:7
onErrorResume-onNext:8
onErrorResume-onNext:9
onErrorResume-onCompleted

onErrorResume在错误发生后,另一个 继续发射另外的数据7/8/9,然后正常结束了Observable。

OnExceptionResumeNext

类似于OnErrorResumeNext,不同之处在于其会对onError抛出的数据类型做判断,
如果是Exception,也会使用另外一个Observable代替原Observable继续发射数据,否则会将错误分发给Subscriber。

Retry

原始Observable遇到错误,重新订阅它期望它能正常终止.

Retry操作符不会将原始Observable的onError通知传递给观察者,它会订阅这个Observable,再给它一次机会无错误地完成它的数据序列。
Retry总是传递onNext通知给观察者,由于重新订阅,可能会造成数据项重复.
这个函数返回一个布尔值,如果返回true,retry应该再次订阅和镜像原始的Observable,
如果返回false,retry会将最新的一个onError通知传递给它的观察者。

retry(long):指定最多重新订阅的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。

Rxjava还实现了RetryWhen操作符。和repeatWhen类似,但是一个是重复订阅(触发onComplete),一个是错误后重复订阅(触发onError())。

当错误发生时,retryWhen会接收onError的throwable作为参数,
并根据定义好的函数返回一个Observable,如果这个Observable发射一个数据,就会重新订阅,
否则,就将这个通知传递给观察者然后终止。

//定义createObserver方法,跟前边代码类似
createObserver().retry(2)
                .subscribe(new Observer<String>() {
                    @Override
                    public void onCompleted() {
                        logger("retry-onCompleted\n");
                    }

                    @Override
                    public void onError(Throwable e) {
                        logger("retry-onError:" + e.getMessage());
                    }

                    @Override
                    public void onNext(String s) {
                        logger("retry-onNext:" + s);
                    }
                });

retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onNext:onNext:1
retry-onNext:onNext:2
retry-onError:Throw error

在尝试了几次还是产生错误后,retry会将错误分发给观察者

retryWhen

Observable.create((Subscriber<? super String> s) -> {
            logger("subscribing");
            s.onError(new RuntimeException("always fails"));
        }).retryWhen(attempts -> {
            return attempts.zipWith(Observable.range(1, 3), (n, i) -> i).flatMap(i -> {
                logger("delay retry by " + i + " second(s)");
                return Observable.timer(i, TimeUnit.SECONDS);
            });
        }).toBlocking().forEach(this::logger);

subscribing
delay retry by 1 second(s)
subscribing
delay retry by 2 second(s)
subscribing
delay retry by 3 second(s)
subscribing

在尝试了几次还是产生错误后,retryWhen会正常结束,并不会将错误分发出去。di

相关文章

网友评论

      本文标题:Rxjava复杂访问 flatMap + form

      本文链接:https://www.haomeiwen.com/subject/sfqpbftx.html