美文网首页Android-RxJavaAndroid-Rxjava&retrofit&dagger奔跑吧,JAVA!
这可能是最好的RxJava 2.x 入门教程(五)

这可能是最好的RxJava 2.x 入门教程(五)

作者: nanchen2251 | 来源:发表于2017-07-03 12:12 被阅读12132次

    这可能是最好的 RxJava 2.x入门教程系列专栏
    文章链接:
    这可能是最好的 RxJava 2.x 入门教程(完结版)【重磅推出】
    这可能是最好的 RxJava 2.x 入门教程(一)
    这可能是最好的 RxJava 2.x 入门教程(二)
    这可能是最好的 RxJava 2.x 入门教程(三)
    这可能是最好的 RxJava 2.x 入门教程(四)
    这可能是最好的 RxJava 2.x 入门教程(五)
    GitHub 代码同步更新:https://github.com/nanchen2251/RxJava2Examples
    为了满足大家的饥渴难耐,GitHub 将同步更新代码,主要包含基本的代码封装,RxJava 2.x 所有操作符应用场景介绍和实际应用场景,后期除了 RxJava 可能还会增添其他东西,总之,GitHub 上的 Demo 专为大家倾心打造。传送门:https://github.com/nanchen2251/RxJava2Examples

    前言

    终于如愿来到让我小伙伴们亢奋的 RxJava 2 使用场景举例了,前面几章中我们讲解完了 RxJava 1.x 到 RxJava 2.x 的异同以及 RxJava 2.x 的各种操作符使用,如有疑问,欢迎点击上方的链接进入你想要的环节。

    正题

    简单的网络请求

    想必大家都知道,很多时候我们在使用 RxJava 的时候总是和 Retrofit 进行结合使用,而为了方便演示,这里我们就暂且采用 OkHttp3 进行演示,配合 mapdoOnNext ,线程切换进行简单的网络请求:

    • 1)通过 Observable.create() 方法,调用 OkHttp 网络请求;
    • 2)通过 map 操作符集合 gson,将 Response 转换为 bean 类;
    • 3)通过 doOnNext() 方法,解析 bean 中的数据,并进行数据库存储等操作;
    • 4)调度线程,在子线程中进行耗时操作任务,在主线程中更新 UI ;
    • 5)通过 subscribe(),根据请求成功或者失败来更新 UI 。
    Observable.create(new ObservableOnSubscribe<Response>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Response> e) throws Exception {
                    Builder builder = new Builder()
                            .url("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                            .get();
                    Request request = builder.build();
                    Call call = new OkHttpClient().newCall(request);
                    Response response = call.execute();
                    e.onNext(response);
                }
            }).map(new Function<Response, MobileAddress>() {
                        @Override
                        public MobileAddress apply(@NonNull Response response) throws Exception {
    
                            Log.e(TAG, "map 线程:" + Thread.currentThread().getName() + "\n");
                            if (response.isSuccessful()) {
                                ResponseBody body = response.body();
                                if (body != null) {
                                    Log.e(TAG, "map:转换前:" + response.body());
                                    return new Gson().fromJson(body.string(), MobileAddress.class);
                                }
                            }
                            return null;
                        }
                    }).observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<MobileAddress>() {
                        @Override
                        public void accept(@NonNull MobileAddress s) throws Exception {
                            Log.e(TAG, "doOnNext 线程:" + Thread.currentThread().getName() + "\n");
                            mRxOperatorsText.append("\ndoOnNext 线程:" + Thread.currentThread().getName() + "\n");
                            Log.e(TAG, "doOnNext: 保存成功:" + s.toString() + "\n");
                            mRxOperatorsText.append("doOnNext: 保存成功:" + s.toString() + "\n");
    
                        }
                    }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<MobileAddress>() {
                        @Override
                        public void accept(@NonNull MobileAddress data) throws Exception {
                            Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                            mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n");
                            Log.e(TAG, "成功:" + data.toString() + "\n");
                            mRxOperatorsText.append("成功:" + data.toString() + "\n");
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e(TAG, "subscribe 线程:" + Thread.currentThread().getName() + "\n");
                            mRxOperatorsText.append("\nsubscribe 线程:" + Thread.currentThread().getName() + "\n");
    
                            Log.e(TAG, "失败:" + throwable.getMessage() + "\n");
                            mRxOperatorsText.append("失败:" + throwable.getMessage() + "\n");
                        }
                    });
    

    为了方便,我们后面的讲解大部分采用开源的 Rx2AndroidNetworking 来处理,数据来源于天狗网等多个公共API接口。

    mRxOperatorsText.append("RxNetworkActivity\n");
            Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                    .build()
                    .getObjectObservable(MobileAddress.class)
                    .observeOn(AndroidSchedulers.mainThread()) // 为doOnNext() 指定在主线程,否则报错
                    .doOnNext(new Consumer<MobileAddress>() {
                        @Override
                        public void accept(@NonNull MobileAddress data) throws Exception {
                            Log.e(TAG, "doOnNext:"+Thread.currentThread().getName()+"\n" );
                            mRxOperatorsText.append("\ndoOnNext:"+Thread.currentThread().getName()+"\n" );
                            Log.e(TAG,"doOnNext:"+data.toString()+"\n");
                            mRxOperatorsText.append("doOnNext:"+data.toString()+"\n");
                        }
                    })
                    .map(new Function<MobileAddress, ResultBean>() {
                        @Override
                        public ResultBean apply(@NonNull MobileAddress mobileAddress) throws Exception {
                            Log.e(TAG, "\n" );
                            mRxOperatorsText.append("\n");
                            Log.e(TAG, "map:"+Thread.currentThread().getName()+"\n" );
                            mRxOperatorsText.append("map:"+Thread.currentThread().getName()+"\n" );
                            return mobileAddress.getResult();
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<ResultBean>() {
                        @Override
                        public void accept(@NonNull ResultBean data) throws Exception {
                            Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName()+"\n" );
                            mRxOperatorsText.append("\nsubscribe 成功:"+Thread.currentThread().getName()+"\n" );
                            Log.e(TAG, "成功:" + data.toString() + "\n");
                            mRxOperatorsText.append("成功:" + data.toString() + "\n");
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName()+"\n" );
                            mRxOperatorsText.append("\nsubscribe 失败:"+Thread.currentThread().getName()+"\n" );
                            Log.e(TAG, "失败:"+ throwable.getMessage()+"\n" );
                            mRxOperatorsText.append("失败:"+ throwable.getMessage()+"\n");
                        }
                    });
    

    先读取缓存,如果缓存没数据再通过网络请求获取数据后更新UI

    想必在实际应用中,很多时候(对数据操作不敏感时)都需要我们先读取缓存的数据,如果缓存没有数据,再通过网络请求获取,随后在主线程更新我们的 UI。

    concat 操作符简直就是为我们这种需求量身定做。

    concat 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止( onComplete() ) 后才会定义下一个 Observable

    利用这个特性,我们就可以先读取缓存数据,倘若获取到的缓存数据不是我们想要的,再调用 onComplete() 以执行获取网络数据的 Observable,如果缓存数据能应我们所需,则直接调用 onNext() ,防止过度的网络请求,浪费用户的流量。

    Observable<FoodList> cache = Observable.create(new ObservableOnSubscribe<FoodList>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<FoodList> e) throws Exception {
                    Log.e(TAG, "create当前线程:"+Thread.currentThread().getName() );
                    FoodList data = CacheManager.getInstance().getFoodListData();
    
                    // 在操作符 concat 中,只有调用 onComplete 之后才会执行下一个 Observable
                    if (data != null){ // 如果缓存数据不为空,则直接读取缓存数据,而不读取网络数据
                        isFromNet = false;
                        Log.e(TAG, "\nsubscribe: 读取缓存数据:" );
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                mRxOperatorsText.append("\nsubscribe: 读取缓存数据:\n");
                            }
                        });
    
                        e.onNext(data);
                    }else {
                        isFromNet = true;
                        runOnUiThread(new Runnable() {
                            @Override
                            public void run() {
                                mRxOperatorsText.append("\nsubscribe: 读取网络数据:\n");
                            }
                        });
                        Log.e(TAG, "\nsubscribe: 读取网络数据:" );
                        e.onComplete();
                    }
    
    
                }
            });
    
            Observable<FoodList> network = Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                    .addQueryParameter("rows",10+"")
                    .build()
                    .getObjectObservable(FoodList.class);
    
    
            // 两个 Observable 的泛型应当保持一致
    
            Observable.concat(cache,network)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<FoodList>() {
                        @Override
                        public void accept(@NonNull FoodList tngouBeen) throws Exception {
                            Log.e(TAG, "subscribe 成功:"+Thread.currentThread().getName() );
                            if (isFromNet){
                                mRxOperatorsText.append("accept : 网络获取数据设置缓存: \n");
                                Log.e(TAG, "accept : 网络获取数据设置缓存: \n"+tngouBeen.toString() );
                                CacheManager.getInstance().setFoodListData(tngouBeen);
                            }
    
                            mRxOperatorsText.append("accept: 读取数据成功:" + tngouBeen.toString()+"\n");
                            Log.e(TAG, "accept: 读取数据成功:" + tngouBeen.toString());
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e(TAG, "subscribe 失败:"+Thread.currentThread().getName() );
                            Log.e(TAG, "accept: 读取数据失败:"+throwable.getMessage() );
                            mRxOperatorsText.append("accept: 读取数据失败:"+throwable.getMessage()+"\n");
                        }
                    });
    

    有时候我们的缓存可能还会分为 memory 和 disk ,实际上都差不多,无非是多写点 Observable ,然后通过 concat 合并即可。

    多个网络请求依次依赖

    想必这种情况也在实际情况中比比皆是,例如用户注册成功后需要自动登录,我们只需要先通过注册接口注册用户信息,注册成功后马上调用登录接口进行自动登录即可。

    我们的 flatMap 恰好解决了这种应用场景,flatMap 操作符可以将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放到一个单独的 Observable,利用这个特性,我们很轻松地达到了我们的需求。

    Rx2AndroidNetworking.get("http://www.tngou.net/api/food/list")
                    .addQueryParameter("rows", 1 + "")
                    .build()
                    .getObjectObservable(FoodList.class) // 发起获取食品列表的请求,并解析到FootList
                    .subscribeOn(Schedulers.io())        // 在io线程进行网络请求
                    .observeOn(AndroidSchedulers.mainThread()) // 在主线程处理获取食品列表的请求结果
                    .doOnNext(new Consumer<FoodList>() {
                        @Override
                        public void accept(@NonNull FoodList foodList) throws Exception {
                            // 先根据获取食品列表的响应结果做一些操作
                            Log.e(TAG, "accept: doOnNext :" + foodList.toString());
                            mRxOperatorsText.append("accept: doOnNext :" + foodList.toString()+"\n");
                        }
                    })
                    .observeOn(Schedulers.io()) // 回到 io 线程去处理获取食品详情的请求
                    .flatMap(new Function<FoodList, ObservableSource<FoodDetail>>() {
                        @Override
                        public ObservableSource<FoodDetail> apply(@NonNull FoodList foodList) throws Exception {
                            if (foodList != null && foodList.getTngou() != null && foodList.getTngou().size() > 0) {
                                return Rx2AndroidNetworking.post("http://www.tngou.net/api/food/show")
                                        .addBodyParameter("id", foodList.getTngou().get(0).getId() + "")
                                        .build()
                                        .getObjectObservable(FoodDetail.class);
                            }
                            return null;
    
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<FoodDetail>() {
                        @Override
                        public void accept(@NonNull FoodDetail foodDetail) throws Exception {
                            Log.e(TAG, "accept: success :" + foodDetail.toString());
                            mRxOperatorsText.append("accept: success :" + foodDetail.toString()+"\n");
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: error :" + throwable.getMessage());
                            mRxOperatorsText.append("accept: error :" + throwable.getMessage()+"\n");
                        }
                    });
    

    结合多个接口的数据更新 UI

    在实际应用中,我们极有可能会在一个页面显示的数据来源于多个接口,这时候我们的 zip 操作符为我们排忧解难。

    zip 操作符可以将多个 Observable 的数据结合为一个数据源再发射出去。

    Observable<MobileAddress> observable1 = Rx2AndroidNetworking.get("http://api.avatardata.cn/MobilePlace/LookUp?key=ec47b85086be4dc8b5d941f5abd37a4e&mobileNumber=13021671512")
                    .build()
                    .getObjectObservable(MobileAddress.class);
    
            Observable<CategoryResult> observable2 = Network.getGankApi()
                    .getCategoryData("Android",1,1);
    
            Observable.zip(observable1, observable2, new BiFunction<MobileAddress, CategoryResult, String>() {
                @Override
                public String apply(@NonNull MobileAddress mobileAddress, @NonNull CategoryResult categoryResult) throws Exception {
                    return "合并后的数据为:手机归属地:"+mobileAddress.getResult().getMobilearea()+"人名:"+categoryResult.results.get(0).who;
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            Log.e(TAG, "accept: 成功:" + s+"\n");
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            Log.e(TAG, "accept: 失败:" + throwable+"\n");
                        }
                    });
    

    间隔任务实现心跳

    想必即时通讯等需要轮训的任务在如今的 APP 中已是很常见,而 RxJava 2.x 的 interval 操作符可谓完美地解决了我们的疑惑。

    这里就简单的意思一下轮训。

    private Disposable mDisposable;
        @Override
        protected void doSomething() {
            mDisposable = Flowable.interval(1, TimeUnit.SECONDS)
                    .doOnNext(new Consumer<Long>() {
                        @Override
                        public void accept(@NonNull Long aLong) throws Exception {
                            Log.e(TAG, "accept: doOnNext : "+aLong );
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(@NonNull Long aLong) throws Exception {
                            Log.e(TAG, "accept: 设置文本 :"+aLong );
                            mRxOperatorsText.append("accept: 设置文本 :"+aLong +"\n");
                        }
                    });
        }
    
        /**
         * 销毁时停止心跳
         */
        @Override
        protected void onDestroy() {
            super.onDestroy();
            if (mDisposable != null){
                mDisposable.dispose();
            }
        }
    

    后记

    姑且先讲到这里吧,喜欢的小伙伴别忘了关注和点赞哦~ https://github.com/nanchen2251/RxJava2Examples

    做不完的开源,写不完的矫情。欢迎扫描下方二维码或者公众号搜索「nanchen」关注我的微信公众号,目前多运营 Android ,尽自己所能为你提升。如果你喜欢,为我点赞分享吧~


    nanchen

    相关文章

      网友评论

      • aab70d1332a0:rxjava系列看完了,楼主很棒
      • Mr_panmin:结合场景更清晰了,谢谢楼主分享
      • ca5fdfefaee2:作为一个新手,我怎么觉得好难看懂呢?????
        nanchen2251:@小哲_5a2d 哈哈。推荐边写边看。看下github的demo
      • 小恐龙v6:感谢作者的无私分享 一到五看完了 感想 了解是一回事 看完是一回事 会不会又是另一回事 :stuck_out_tongue_closed_eyes:
        nanchen2251:@号土猪 多实战实战,什么都不是问题,推荐还是看完结版
      • 真像大白:想请教个问题 关于flatmap 这个操作符
        如果想把整个 FoodList 都取到 FoodDetail 怎么做呢
        而不是只取第一个foodList.getTngou().get(0)
        不想用 for 循环
      • liuhedev:doOnNext 里边,你不是要做保存到数据库操作吗?
      • 21d4822bdee6:谢谢大神的指点。请问doOnNext和onNext是什么区别:joy:
      • 043072463d5c:问一下rxlifecycle还有必要用,谢谢
      • a9da78ea2416:上面提供的接口数据。好多都没有数据吧、
        nanchen2251:@a9da78ea2416 接口都是第三方的,第三方可能不太稳定,有时候就会出现这样的情况。
      • a9da78ea2416:感谢分享你的前四篇关于操作符的写的很好,第五篇关于网络感觉应该封装一些,感觉代码好多。
        nanchen2251:@a9da78ea2416 哈哈,其实只是想做个demo展示,封装没有在讲解范围内哦。
      • 黑白咖:签到
      • KingJA:读取网络缓存那个例子,博主可以尝试抽取出基类,这样意义更大
      • 74025b8b0dd9:“concat 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止( onComplete() ) 后才会定义下一个 Observable。”(出自本篇文章,先读取缓存,如果缓存没数据再通过网络请求获取数据后更新UI)
        “对于单一的把两个发射器连接成一个发射器,虽然 zip 不能完成,但我们还是可以自力更生,官方提供的 concat 让我们的问题得到了完美解决。”(出自《这可能是最好的RxJava2第二篇》concat的介绍)
        上面两句话不相互矛盾吗,第一段说concat当前一个发射器终止才会定义下一个,第二段说concat把两个发射器连接成一个发射器再发送事件。
        xiaobailong24: @空窗记忆 不矛盾吧,都是等着上一个发送完成,再发送另一个,然后组合起来一起发送
      • 14b54e286699:感谢博主,非常实用,期待后续!
      • 泽毛:实际应用这块写的真棒👍
      • 爱笑的眼婧:天狗网现在还能用?
        nanchen2251:@爱笑的眼婧 写的时候肯定能用的,现在还能不能就不清楚了。
      • 24K纯帅豆:楼主有没有试过retryWhen呢?根据错误类型进行重试
      • 6db6e8841e43:感谢楼主分享:+1:
      • C_Sev:厉害👍,谢谢你的教程,加快了我学习的速度,我是做系统的,真心没时间看应用框架
        C_Sev:@南尘2251 其实我是个大彩笔,怎么办??
        nanchen2251:@晴天之助 没事儿,都是应该的。权当开发笔记吧。

      本文标题:这可能是最好的RxJava 2.x 入门教程(五)

      本文链接:https://www.haomeiwen.com/subject/vevacxtx.html