Usage Examples of RxJava with Retrofit

Author: tmyzh | Published 2018-03-12 14:43

    Create an Observable (named observable below) that issues a network request:

    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(Urls.baseUrl)
            .addConverterFactory(GsonConverterFactory.create())
            // lets Retrofit interface methods return RxJava 2 Observables
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();
    GetRequest request = retrofit.create(GetRequest.class);
    Observable<Translation> observable = request.getCall1();
    

    The GetRequest interface:

    public interface GetRequest {

        @GET(Urls.test)
        Observable<Translation> getCall();

        @GET(Urls.test1)
        Observable<Translation> getCall1();
    }

    The Translation class:

    public class Translation {

        private int status;
        private Content content;

        // Mirrors the "content" object of the response; Gson fills it by field name.
        private static class Content {
            private String from;
            private String to;
            private String vendor;
            private String out;
            private int errNo;
        }

        // Logs the translated text and returns it.
        public String show() {
            Log.e("yzh", content.out);
            return content.out;
        }
    }
    

    If you are not familiar with Retrofit, read an introduction to Retrofit first.

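    1. Simulating repeated polling with RxJava's interval operator

    // The outer stream is the polling loop: first tick after 2 s, then one tick every second
      Observable.interval(2,1, TimeUnit.SECONDS)
                    .doOnNext(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            // logs "poll #<aLong>"
                            Log.e("yzh","第"+aLong+"次轮询");
                            // the request is executed here; note the thread switching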
                            observable.subscribeOn(Schedulers.io())
                                    .observeOn(AndroidSchedulers.mainThread())
                                    .subscribe(new Observer<Translation>() {
                                        @Override
                                        public void onSubscribe(Disposable d) {
    
                                        }
    
                                        @Override
                                        public void onNext(Translation translation) {
                                            translation.show();
                                        }
    
                                        @Override
                                        public void onError(Throwable e) {
                                            Log.e("yzh","请求失败");
                                        }
    
                                        @Override
                                        public void onComplete() {
    
                                        }
                                    });
                        }
                    }).subscribe(new Observer<Long>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Long aLong) {
    
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e("yzh","对Error事件作出响应");
                }
    
                @Override
                public void onComplete() {
                    Log.e("yzh","对Complete事件作出响应");
                }
            });
    

    Log output:

    //        03-12 14:56:16.528 8733-8760/com.example.issuser.rxtest E/yzh: 第0次轮询
    //        03-12 14:56:16.886 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
    //        03-12 14:56:17.528 8733-8760/com.example.issuser.rxtest E/yzh: 第1次轮询
    //        03-12 14:56:17.702 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
    //        03-12 14:56:18.529 8733-8760/com.example.issuser.rxtest E/yzh: 第2次轮询
    //        03-12 14:56:18.675 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
    //        03-12 14:56:19.528 8733-8760/com.example.issuser.rxtest E/yzh: 第3次轮询
    //        03-12 14:56:19.674 8733-8733/com.example.issuser.rxtest E/yzh: 嗨世界
    
    2. Conditional polling with RxJava's repeatWhen

    The Observable for the network request is the same as above.

    observable.repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Observable<Object> objectObservable) throws Exception {
                    return objectObservable.flatMap(new Function<Object, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Object o) throws Exception {
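                            // Polling condition. i is a counter field of the enclosing
                            // class (its declaration is not shown in the original).
                            if(i>3){
                                // "轮询结束" means "polling finished"; emitting an error here ends the stream with onError
                                return Observable.error(new Throwable("轮询结束"));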
                            }else{
                                return Observable.just(1).delay(2, TimeUnit.SECONDS);
                            }
    
                        }
                    });
                }
            }).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Translation>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            Log.e("yzh","onSubscribe");
                        }
    
                        @Override
                        public void onNext(Translation translation) {
                            translation.show();
                            i++;
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.e("yzh","onError--"+e.toString());
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    Log output:

            03-12 15:25:00.524 12910-12910/com.example.issuser.rxtest E/yzh: onSubscribe
            03-12 15:25:00.715 12910-12910/com.example.issuser.rxtest E/yzh: hi china
            03-12 15:25:02.852 12910-12910/com.example.issuser.rxtest E/yzh: hi china
            03-12 15:25:05.092 12910-12910/com.example.issuser.rxtest E/yzh: hi china
            03-12 15:25:07.244 12910-12910/com.example.issuser.rxtest E/yzh: hi china
            03-12 15:25:09.382 12910-12910/com.example.issuser.rxtest E/yzh: hi china
            03-12 15:25:09.395 12910-12910/com.example.issuser.rxtest E/yzh: onError--java.lang.Throwable: 轮询结束
    
    3. Reconnecting after a network error. This is a common scenario, but it is hard to simulate, so no log output is shown.

    This uses RxJava's retryWhen.

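    // currentRetryCount, maxConnectCount and waitRetryTime are fields of the enclosing
    // class (their declarations are not shown in the original). The Chinese log messages
    // report the exception, the retry count and the wait time before the next retry.
    observable.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {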
               @Override
               public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                   return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                       @Override
                       public ObservableSource<?> apply(Throwable throwable) throws Exception {
                           Log.e("yzh","发生异常=="+throwable.toString());
                           if(throwable instanceof IOException){
                               Log.e("yzh","属于IO异常,重试");
                               if(currentRetryCount<maxConnectCount){
                                   currentRetryCount++;
                                   Log.e("yzh","重试的次数--"+currentRetryCount);
                                   waitRetryTime=1000+currentRetryCount*1000;
                                   Log.e("yzh","等待时间=="+waitRetryTime);
                                    return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                               }else{
                                   return Observable.error(new Throwable("重试次数已超过设置次数 = " +currentRetryCount  + ",即 不再重试"));
    
                               }
                           }else{
                               return Observable.error(new Throwable("发生了非网络异常(非I/O异常)"));
                           }
    
                       }
                   });
               }
           }).subscribeOn(Schedulers.io())
                   .observeOn(AndroidSchedulers.mainThread())
                   .subscribe(new Observer<Translation>() {
                       @Override
                       public void onSubscribe(Disposable d) {
    
                       }
    
                       @Override
                       public void onNext(Translation translation) {
                           Log.e("yzh",  "发送成功");
                           translation.show();
                       }
    
                       @Override
                       public void onError(Throwable e) {
                            Log.e("yzh",e.toString());
                       }
    
                       @Override
                       public void onComplete() {
    
                       }
                   });
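
The retry decision made inside the retryWhen handler can be looked at on its own. The following is a minimal sketch in plain Java with no RxJava types; the class name RetryPolicy, the method nextDelayMillis and the -1 return convention are assumptions made for illustration and do not appear in the original code. Only the field names and the wait-time formula are taken from the example above.

```java
import java.io.IOException;

// Hypothetical helper, not from the original article: it mirrors the decision
// made inside the retryWhen handler, without any RxJava types.
final class RetryPolicy {
    private final int maxConnectCount;   // maximum number of retries
    private int currentRetryCount = 0;   // retries performed so far

    RetryPolicy(int maxConnectCount) {
        this.maxConnectCount = maxConnectCount;
    }

    // Returns the wait time in milliseconds before the next retry,
    // or -1 when the error should be passed on instead of retried.
    long nextDelayMillis(Throwable error) {
        if (!(error instanceof IOException)) {
            return -1;                    // not an I/O error: do not retry
        }
        if (currentRetryCount >= maxConnectCount) {
            return -1;                    // retry budget used up
        }
        currentRetryCount++;
        // same formula as in the handler: 1000 + currentRetryCount * 1000
        return 1000L + currentRetryCount * 1000L;
    }

    public static void main(String[] args) {
        RetryPolicy policy = new RetryPolicy(3);
        for (int attempt = 0; attempt < 5; attempt++) {
            System.out.println(policy.nextDelayMillis(new IOException("timeout")));
        }
    }
}
```

With a budget of three retries the waits grow by one second each time (2 s, 3 s, 4 s), after which the error is passed on. In the RxJava version, a non-negative result corresponds to returning Observable.just(1).delay(...) and -1 corresponds to returning Observable.error(...).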
    
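    4. Processing the result of one network request with a second request, using RxJava's flatMap

    A common use case is registering and then logging in immediately. Pay attention to the thread switching.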

    observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Translation>() {
                        @Override
                        public void accept(Translation translation) throws Exception {
                            Log.e("yzh","doOnNextThread--"+Thread.currentThread().getName());
                            Log.e("yzh","第一次网络请求成功");
                            translation.show();
                        }
                    })
                    // switch the thread for the downstream operators to an I/O thread
                    .observeOn(Schedulers.io())
                    .flatMap(new Function<Translation, ObservableSource<Translation>>() {
                        @Override
                        public ObservableSource<Translation> apply(Translation translation) throws Exception {
                            Log.e("yzh","flatMapThread--"+Thread.currentThread().getName());
                            Log.e("yzh","flatmap--");
                            translation.show();
                            // observable1 is the second network request (details omitted here)
                            return observable1;
                        }
                    })
                    // switch back to the main thread for the final observer
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Translation>() {
                        @Override
                        public void accept(Translation translation) throws Exception {
                            Log.e("yzh","acceptThread--"+Thread.currentThread().getName());
                            Log.e("yzh","accept--");
                            translation.show();
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(Throwable throwable) throws Exception {
                        }
                    });
    

    Log output. Note how the observer's thread switches for the second request:

    03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: doOnNextThread--main
    03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 第一次网络请求成功
    03-12 11:50:40.681 11667-11667/com.example.issuser.rxtest E/yzh: 嗨世界
    03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatMapThread--RxCachedThreadScheduler-2
    03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: flatmap--
    03-12 11:50:40.681 11667-11720/com.example.issuser.rxtest E/yzh: 嗨世界
    03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: acceptThread--main
    03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: accept--
    03-12 11:50:40.949 11667-11667/com.example.issuser.rxtest E/yzh: hi china
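
The shape of the register-then-login chain can also be tried without Android or RxJava. The sketch below uses the JDK's CompletableFuture as a stand-in, where thenCompose plays a role comparable to flatMap: the second request is built from the first request's result. This is an analogy, not the RxJava code above; the class RegisterThenLogin, its methods and the returned strings are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the two network requests; not part of the original article.
final class RegisterThenLogin {

    // Pretends to register a user and returns the new account name.
    static CompletableFuture<String> register(String name) {
        return CompletableFuture.supplyAsync(() -> "account:" + name);
    }

    // Pretends to log in with the account produced by register().
    static CompletableFuture<String> login(String account) {
        return CompletableFuture.supplyAsync(() -> "session-for-" + account);
    }

    // thenCompose is the counterpart of flatMap here: the second request
    // is created from the first request's result, and its result flows on.
    static CompletableFuture<String> registerAndLogin(String name) {
        return register(name).thenCompose(RegisterThenLogin::login);
    }

    public static void main(String[] args) {
        // join() blocks until both steps have finished
        System.out.println(registerAndLogin("yzh").join());
    }
}
```

As in the flatMap version, the caller sees a single result stream and never handles the intermediate registration result directly.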
    
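    5. Checking several conditions together, for forms such as login or registration where multiple fields must be filled in

    a. RxTextView.textChanges() observes text changes of a control; it requires the dependency: compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
    b. skip(1) skips the first emission, which is the control's value before the user has typed anything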

    Observable<CharSequence> nameObservable = RxTextView.textChanges(et_name).skip(1);
    Observable<CharSequence> ageObservable = RxTextView.textChanges(et_age).skip(1);
    Observable<CharSequence> jobObservable = RxTextView.textChanges(et_job).skip(1);

    Observable.combineLatest(nameObservable, ageObservable, jobObservable, new Function3<CharSequence, CharSequence, CharSequence, Boolean>() {
                @Override
                public Boolean apply(CharSequence charSequence, CharSequence charSequence2, CharSequence charSequence3) throws Exception {
                    boolean isUserNameValid = !TextUtils.isEmpty(et_name.getText().toString());
                    boolean isUserAgeValid = !TextUtils.isEmpty(et_age.getText().toString());
                    boolean isUserJobValid = !TextUtils.isEmpty(et_job.getText().toString());

                    // the form is valid only when all three fields are filled in
                    return isUserNameValid && isUserAgeValid && isUserJobValid;
                }
            }).subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean aBoolean) throws Exception {
                    // logs whether the submit button may be enabled
                    Log.e("yzh","提交按钮是否可以点击--"+aBoolean);
                }
            });
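
The combining function passed to combineLatest is an ordinary predicate over the three latest values, so it can be written and checked outside Android. The following is a small sketch under that assumption; FormValidator, isFilled and isFormValid are hypothetical names, and isFilled stands in for !TextUtils.isEmpty(...) so that the code runs on a plain JDK. Unlike the example above, it uses the values handed to the function rather than reading the EditText widgets again.

```java
// Hypothetical helper, not from the original article: the validity rule
// used by the combineLatest example, written as plain static methods.
final class FormValidator {

    // Stand-in for !TextUtils.isEmpty(text): true when text is non-null and non-empty.
    static boolean isFilled(CharSequence text) {
        return text != null && text.length() > 0;
    }

    // The form is valid only when name, age and job are all filled in.
    static boolean isFormValid(CharSequence name, CharSequence age, CharSequence job) {
        return isFilled(name) && isFilled(age) && isFilled(job);
    }

    public static void main(String[] args) {
        System.out.println(isFormValid("tom", "", "dev"));    // one field empty
        System.out.println(isFormValid("tom", "30", "dev"));  // all fields filled
    }
}
```

Inside the Function3 this would be called as isFormValid(charSequence, charSequence2, charSequence3), since combineLatest already supplies the latest value of each field.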
    
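    6. Memory leaks to watch for when using RxJava, for example when a page is finished while an asynchronous operation is still running

    // Guards against problems after the Activity has finished
    private final CompositeDisposable disposables = new CompositeDisposable();

    // Add the Disposable of each subscription, e.g. the value returned by
    // subscribe(Consumer...) or the d received in Observer.onSubscribe(d)
    disposables.add(disposable);

    @Override
    protected void onDestroy() {
        super.onDestroy();
        // dispose all collected subscriptions
        disposables.clear();
    }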
    


    Article link: https://www.haomeiwen.com/subject/bzcnfftx.html