美文网首页
Rxjava2应用篇

Rxjava2应用篇

作者: 钉某人 | 来源:发表于2018-12-21 11:03 被阅读0次
    Rxjava2实例.png

    Github地址:https://github.com/DingMouRen/RxJava2ExamplesDemo

    1.后台耗时操作,前台实时更新UI(下载文件,实时更新进度)

    • 自定义ResponseBody--ProgressResponseBody,获取到从网络读取的数据,并通过EventBus发送通知
    • 自定义拦截器ProgressInterceptor,对请求返回的响应Response进行处理,
    • 定义一个抽象类FileCallBack,1.用于封装类似Observer的接口,2.注册EventBus来接收文件的接收进度,
    • 自定义Observer--FileDownloadObserver,用于包裹FileCallBack
    final FileCallBack<ResponseBody> fileCallBack = new FileCallBack<ResponseBody>("","") {
                @Override
                public void onSuccess(ResponseBody responseBody) {
                    Log.e(TAG,"onSuccess:"+responseBody.toString());
                }
    
                @Override
                public void progress(long progress, long total) {
                    Log.e(TAG,total+"/"+progress);
                    mProgressBar.setMax((int) total);
                    mProgressBar.setProgress((int) progress);
    
                    DecimalFormat decimalFormat = new DecimalFormat("0.00");
                    String scaleStr = decimalFormat.format(progress * 1f/ total );
                    mTvProgress.setText( (int)(Float.parseFloat(scaleStr) * 100) +"%");
                }
    
                @Override
                public void onStart(Disposable disposable) {
    
                }
    
                @Override
                public void onCompleted() {
                    Log.e(TAG,"onComplete");
                    show("下载完成");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG,"onError:"+e.getMessage());
                }
            };
    
            HttpManager.createService(Api.class,new ProgressInterceptor())
                    .downloadApk()
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .doOnNext(new Consumer<ResponseBody>() {
                        @Override
                        public void accept(ResponseBody responseBody) throws Exception {
    //                                    fileCallBack.saveFile(responseBody);
                        }
                    }).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new FileDownloadObserver<ResponseBody>(fileCallBack));
    

    2.计算一段时间内数据的平均值

    • 通过操作符buffer(3000, TimeUnit.MILLISECONDS),将这段时间发射的数据缓存在集合中
    • 在Observer的onNext中对接收到的集合数据进行求平均值
     /**
         * rxjava处理
         */
        private void rxjavaCompose() {
            mPublishSubject = PublishSubject.create();
            DisposableObserver<List<Double>> disposableObserver = new DisposableObserver<List<Double>>() {
                @Override
                public void onNext(List<Double> temperatureList) {
                    double resultSum = 0;//温度的和
                    double resultAvera = 0;
                    Log.e("onNext","接收到集合的大小:"+temperatureList.size());
                    if (temperatureList.size() > 0){
                        for(Double temperature : temperatureList){
                            resultSum += temperature;
                        }
                        resultAvera = resultSum / temperatureList.size();
                    }
                    Log.e(mActivity.getClass().getSimpleName(),"更新平均温度:"+resultAvera);
    
                    final double finalResultAvera = resultAvera;
                    mTvAveraTemperature.post(new Runnable() {
                        @Override
                        public void run() {
                            mTvAveraTemperature.append("平均3秒温度:"+ finalResultAvera +"℃   时间:"+ new Date().toLocaleString()+"\n");
                            int scrollAmount = mTvAveraTemperature.getLayout().getLineTop(mTvAveraTemperature.getLineCount()) - mTvAveraTemperature.getHeight();
                            if (scrollAmount > 0){
                                mTvAveraTemperature.scrollTo(0,scrollAmount);
                            }else {
                                mTvAveraTemperature.scrollTo(0,0);
                            }
                        }
                    });
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            mPublishSubject.buffer(3000, TimeUnit.MILLISECONDS).observeOn(AndroidSchedulers.mainThread()).subscribe(disposableObserver);
            mCompositeDisposable = new CompositeDisposable();//用于管理订阅与解除订阅
            mCompositeDisposable.add(disposableObserver);
    

    3.搜索联想功能优化

    需要优化的问题

    • 避免用户连续输入时造成发起不必要的请求。(debounce操作符来解决)
    • 避免用户输入未空时发起不必要的请求。(filter操作符来解决)
    • 避免前后发起两个请求,后面请求响应先于前面请求响应返回。(switch操作符来解决)
     /**
         * 初始化Observable
         */
        private void initObservable() {
    
            mPublishSubject = PublishSubject.create();
    
            mDisposableObserver = new DisposableObserver<MyResponse<String>>() {//Disposable是一个抽象的观察者,可以通过disposable进行异步取消
                @Override
                public void onNext(MyResponse<String> myResponse) {
                    Gson gson = new Gson();
                    mTvLog.setText(JsonUtils.formatJson(gson.toJson(myResponse)));
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            mPublishSubject.debounce(200, TimeUnit.MILLISECONDS)//不会发射时间间隔小于200毫秒的,
                    .filter(new Predicate<String>() {//过滤操作符,只有字符串长度大于0才能发射
                        @Override
                        public boolean test(String s) throws Exception {
                            return s.length() > 0;
                        }
                    }).switchMap(new Function<String, ObservableSource<MyResponse<String>>>() {//switchMap操作符会保存最新的Observable产生的结果而舍弃旧的结果
                @Override
                public ObservableSource<MyResponse<String >> apply(String s) throws Exception {
                    return HttpManager.createService(Api.class).search(s).subscribeOn(Schedulers.io());
                }
            }).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(mDisposableObserver);
    
            mCompositeDisposable = new CompositeDisposable();//用于取消订阅关系
    
            mCompositeDisposable.add(mDisposableObserver);//添加到订阅关系
        }
    

    4.轮询操作

    应用场景:有的时候需要我们尝试间隔一段时间就向服务器发起一次请求,但是又不适合引入长连接的场景。

    可以使用intervalRange操作符,参数含义:

    • start:发送数据的起始值,为Long型。
    • count:总共发送多少项数据。
    • initialDelay:发送第一个数据项时的起始时延。
    • period:两项数据之间的间隔时间。
    • TimeUnit:时间单位。
     /**
         * 固定时间间隔的轮询
         */
        private void startFixPolling() {
    
            Observable<MyResponse<String>> observableFix = Observable.intervalRange(0,5,0,1000, TimeUnit.MILLISECONDS)
                    .take(5)
                    .flatMap(new Function<Long, ObservableSource<MyResponse<String>>>() {
                        @Override
                        public ObservableSource<MyResponse<String>> apply(Long aLong) throws Exception {
                            return HttpManager.createService(Api.class).polling().subscribeOn(Schedulers.io());
                        }
                    });
    
            DisposableObserver<MyResponse<String>> disposableObserverFix = new DisposableObserver<MyResponse<String>>() {
                @Override
                public void onNext(MyResponse<String> response) {
                    mTvFix.append(response.data+"\n");
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            observableFix.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(disposableObserverFix);
    
            mCompositeDisposable.add(disposableObserverFix);
        }
    
    

    5.根据错误类型进行重试请求

    网络请求出错,重试的情况下需要处理的问题:

    • 限制重试的次数
    • 根据错误类型,判断是否需要重新请求
    • 根据错误类型,等待特定的时间后再去重新请求

    retryWhen操作符可以实现重新订阅,由onError事件来触发。

     private void retryDemo() {
            Observable<MyResponse<String>> observable = HttpManager.createService(Api.class).retry()
                    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                private int mRetryCount;
                @Override
                public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {
    
                            long waitTime = 0;//等待时间
                            if (throwable instanceof ConnectException){
                                mainThreadTextChange("ConnectException异常\n");
                                waitTime = 2000;
                            }
                            mRetryCount++;
                            if (waitTime > 0){
                                mainThreadTextChange("2秒后重新发起请求\n");
                            }
                            return waitTime > 0 && mRetryCount <= 4 ?
                                    Observable.timer(waitTime,TimeUnit.MILLISECONDS):
                                    Observable.error(throwable);
                        }
                    });
                }
            });
    
            DisposableObserver<MyResponse<String>> disposableObserver = new DisposableObserver<MyResponse<String>>() {
                @Override
                public void onNext(MyResponse<String> response) {
                    Gson gson = new Gson();
                    mTv.append("onNext:\n"+ JsonUtils.formatJson(gson.toJson(response))+"\n");
                }
    
                @Override
                public void onError(Throwable e) {
                    mTv.append("onError:"+e.getMessage()+"\n");
                }
    
                @Override
                public void onComplete() {
                    mTv.append("onComplete\n");
                }
            };
    
            observable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(disposableObserver);
    
            mCompositeDisposable.add(disposableObserver);
        }
    
        private void mainThreadTextChange(final String content){
            runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    mTv.append(content);
                }
            });
        }
    

    6.多个表单的验证

    应用场景:登录场景中,需要账户是一定的长度,密码也有特定的长度,此时使用操作符combineLatest来实现需求。

    combineLatest可以接受多个Observable和一个函数作为参数。当其中的任意一个Observable发射数据后,会去获取其他的Observable最后一次发射的数据,回调到函数中。(此函数回调的前提是都至少发射过一次数据)

      private void initRxjava2() {
            mCompositeDisposable = new CompositeDisposable();
            mAccoountPublishSubject = PublishSubject.create();
            mPwdPublishSubject = PublishSubject.create();
    
            Observable<Boolean> observable = Observable.combineLatest(mAccoountPublishSubject, mPwdPublishSubject,
                    new BiFunction<String, String, Boolean>() {
                        @Override
                        public Boolean apply(String account, String pwd) throws Exception {
                            int nameLength = account.length();
                            int pwdLength = pwd.length();
                            return (nameLength >=3 && nameLength <=5) && (pwdLength >=6 && pwdLength <=10);
                        }
                    });
            DisposableObserver<Boolean> disposableObserver = new DisposableObserver<Boolean>() {
                @Override
                public void onNext(Boolean aBoolean) {
                    if (aBoolean){//两个输入框的内容都符合要求
                        mTvLogin.setEnabled(true);
                        mTvLogin.setBackgroundColor(Color.GREEN);
                    }else {//两个输入框有不符合要求的内容
                        mTvLogin.setEnabled(false);
                        mTvLogin.setBackgroundColor(Color.GRAY);
                    }
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            observable.subscribe(disposableObserver);
            mCompositeDisposable.add(disposableObserver);
    
        }
    
        @Override
        public void initListener() {
            mEditAccount.addTextChangedListener(new EditTextWatcher(mAccoountPublishSubject));
            mEditPwd.addTextChangedListener(new EditTextWatcher(mPwdPublishSubject));
    
            mTvLogin.setOnClickListener(new View.OnClickListener() {
                @Override
                public void onClick(View v) {
                    Toast.makeText(mActivity,"登录成功",Toast.LENGTH_SHORT).show();
                }
            });
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            if (mCompositeDisposable != null) mCompositeDisposable.clear();
        }
    
        /**
         * EditText的TextWatcher
         */
        public static class EditTextWatcher implements TextWatcher{
    
            private PublishSubject mPublishSubject;
    
            public EditTextWatcher(PublishSubject publishSubject){
                this.mPublishSubject = publishSubject;
            }
    
            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
    
            }
    
            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
    
            }
    
            @Override
            public void afterTextChanged(Editable s) {
                mPublishSubject.onNext(s.toString());
            }
        }
    

    7.优先加载本地缓存,同时发起网络请求

    应用场景:进入新页面,为了提升用户体验,再网络请求没有返回时,优先显示缓存数据。
    要求:同时发起请求网络数据和加载本地缓存数据。在网络数据未返回时,显示本地缓存数据;网络数据返回时,显示最新的网络数据

    几种实现方式的缺点:

    • concat实现:concat连接发射器A和发射器B,发射器A加载本地缓存数据,发射器B请求网络数据,此时只有发射器A读取完本地缓存数据后,发射器B才会去请求网络。这样的时间是加载本地缓存和请求网络的时间和。
    • concatEager实现:可以解决concat无法同时加载本地缓存和请求网络的需求,但是当读取本地缓存的时间大于请求网络的时间时,请求到的网络数据必须等本地缓存读取结束之后,才能传递给下游。所耗时间为加载本地缓存的时间,但是网络数据已经返回,本不需要等待加载本地缓存数据的。
    • merge实现:可以让多个observable同时发射数据,也不需要observable之间的相互等待,直接发射给下游。但是如果加载本地缓存的时间大于请求网络数据的时间,数据本来已经是最新的数据,但是此时又会被刷新成过时的本地缓存数据。

    好的实现方式:publish+merge+takeUntil

    • publish操作符:将普通的Observable转变成可连接的Observable,可连接的Observable类似于普通的Observable,除了它在订阅时不会开始发出项目,只有当Connect操作符应用于它时才开始。
    • takeUntil操作符:当事件满足设定的条件时,该事件的下一个事件不会被发送了。
     /**
         * 优先加载本地缓存数据,同时请求网络数据
         */
        private void requestData(final long delayTimeLocal, long delayTimeNet) {
    
            mProgressBar.setVisibility(View.VISIBLE);
    
            Observable<MyResponse<List<CacheToNetData>>> observable =
                    getNetData(delayTimeNet).publish(new Function<Observable<MyResponse<List<CacheToNetData>>>, ObservableSource<MyResponse<List<CacheToNetData>>>>() {
                        @Override
                        public ObservableSource<MyResponse<List<CacheToNetData>>> apply(Observable<MyResponse<List<CacheToNetData>>> netResponseObservable) throws Exception {
    
                            return Observable.merge(getLocalCacheData(delayTimeLocal),netResponseObservable )
                                    .takeUntil(new Predicate<MyResponse<List<CacheToNetData>>>() {
                                        @Override
                                        public boolean test(MyResponse<List<CacheToNetData>> listMyResponse) throws Exception {
                                            mainThreadTextLog("获取到的数据类型:"+listMyResponse.msg);
                                            return listMyResponse.msg.equals("成功");
                                        }
                                    });
                        }
                    });
    
    
            DisposableObserver<MyResponse<List<CacheToNetData>>> disposableObserver =
                    new DisposableObserver<MyResponse<List<CacheToNetData>>>() {
                        @Override
                        public void onNext(MyResponse<List<CacheToNetData>> listMyResponse) {
    
                            mProgressBar.setVisibility(View.GONE);
    
                            if (listMyResponse.code == 1) {
                                if (listMyResponse.msg.equals("本地数据")) {
                                    mainThreadTextLog("onNext --- 加载了本地数据");
                                } else {
                                    mainThreadTextLog("onNext --- 加载了网络数据");
                                }
                                mAdapter.setData(listMyResponse.data);
                            }
    
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            mainThreadTextLog("onError:" + e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            mainThreadTextLog("onComplete");
                        }
                    };
    
            observable.observeOn(AndroidSchedulers.mainThread())
                    .subscribe(disposableObserver);
            mCompositeDisposable.add(disposableObserver);
        }
    
        @Override
        protected void onDestroy() {
            super.onDestroy();
            if (mCompositeDisposable != null) mCompositeDisposable.clear();
        }
    
        /**
         * 获取本地缓存数据
         */
        public Observable<MyResponse<List<CacheToNetData>>> getLocalCacheData(final long delayTime) {
            return Observable.create(new ObservableOnSubscribe<MyResponse<List<CacheToNetData>>>() {
                @Override
                public void subscribe(ObservableEmitter<MyResponse<List<CacheToNetData>>> emitter) throws Exception {
                    try {
                        mainThreadTextLog("开始加载本地缓存数据");
    
                        Thread.sleep(delayTime);
    
                        List<CacheToNetData> list = new ArrayList<>();
    
                        for (int i = 0; i < 10; i++) {
                            CacheToNetData bean = new CacheToNetData("来自本地缓存", "数据项 --- " + i);
                            list.add(bean);
                        }
                        mainThreadTextLog("结束加载本地缓存数据");
                        emitter.onNext(new MyResponse<List<CacheToNetData>>("本地数据", 1, list));
                        emitter.onComplete();
    
    
                    } catch (Exception e) {
                        mainThreadTextLog("加载本地缓存数据异常:" + e.getMessage());
                        if (!emitter.isDisposed()) emitter.onError(e);
                    }
    
                }
            }).subscribeOn(Schedulers.io());
        }
    
        /**
         * 获取网络数据
         *
         * @param delayTime
         * @return
         */
        public Observable<MyResponse<List<CacheToNetData>>> getNetData(long delayTime) {
            mainThreadTextLog("开始请求网络数据");
            return HttpManager.createService(Api.class)
                    .getNetData(delayTime)
                    .subscribeOn(Schedulers.io())
                    .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends MyResponse<List<CacheToNetData>>>>() {
                        @Override
                        public ObservableSource<? extends MyResponse<List<CacheToNetData>>> apply(Throwable throwable) throws Exception {
                            mainThreadTextLog("请求网络数据失败:" + throwable.getMessage());
                            return Observable.never();
                        }
                    });
        }
    
        /**
         * 主线程更新UI日志
         *
         * @param content
         */
        private void mainThreadTextLog(final String content) {
            mActivity.runOnUiThread(new Runnable() {
                @Override
                public void run() {
                    mTvLog.append(content + "\n");
                }
            });
        }
    

    8.倒计时

    使用intervalRange操作符实现倒计时功能

       /**
         * 开始倒计时
         * @param countDownTimeLong
         */
        private void startCountDown(final long countDownTimeLong) {
    
            mCompositeDisposable.clear();
    
            Observable<Long> observable = Observable.intervalRange(0,countDownTimeLong + 1,0,1, TimeUnit.SECONDS);
    
            DisposableObserver<Long> disposableObserver = new DisposableObserver<Long>() {
                @Override
                public void onNext(final Long aLong) {
                    runOnUiThread(new Runnable() {
                        @Override
                        public void run() {
                            mTvTime.setText(formatDuring((countDownTimeLong - aLong) * 1000));
                        }
                    });
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            };
    
            observable.subscribe(disposableObserver);
    
            mCompositeDisposable.add(disposableObserver);
    
        }
    

    9.嵌套请求

    应用场景:第一个网络请求之后,再进行一次网络请求,才能拿到需要得数据。
    例子:有两道门,宝藏在第二道门后面。需要先后打开两道门,必须打开第一道门之后,才能获取到第二道门得开门密码。

     private void requestData() {
    
            String inputStr = mEditText.getText().toString().trim();
    
            if (TextUtils.isEmpty(inputStr)){
                Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
                return;
            }
            int intputInt = Integer.parseInt(inputStr);
    
            HttpManager.createService(Api.class)
                    .openFirstDoor(intputInt)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<MyResponse<Nest1Bean>>() {
                        @Override
                        public void accept(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {
    
                            mainThreadTextLog("doOnNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest1BeanMyResponse)));
                        }
                    })
                    .observeOn(Schedulers.io())
                    .flatMap(new Function<MyResponse<Nest1Bean>, ObservableSource<MyResponse<Nest2Bean>>>() {
                        @Override
                        public ObservableSource<MyResponse<Nest2Bean>> apply(MyResponse<Nest1Bean> nest1BeanMyResponse) throws Exception {
                            mainThreadTextLog("获取第二道门得密码,去打开第二道门");
                            return HttpManager.createService(Api.class).openSecondDoor(nest1BeanMyResponse.data.password);
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<MyResponse<Nest2Bean>>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                            mainThreadTextLog("onSubscribe");
                        }
    
                        @Override
                        public void onNext(MyResponse<Nest2Bean> nest2BeanMyResponse) {
                            mainThreadTextLog("onNext:\n"+JsonUtils.formatJson(new Gson().toJson(nest2BeanMyResponse)));
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            mainThreadTextLog("onError:"+e.getMessage());
                        }
    
                        @Override
                        public void onComplete() {
                            mainThreadTextLog("onComplete\n\n");
                        }
                    });
        }
    

    10.合并两个网络请求的数据

    应用场景:有的时候我们需要的数据,可能需要请求两个不同的接口才能得到,使用zip操作符可以实现需求

     private void requestData() {
            String gradeStr = mEditGrade.getText().toString().trim();
    
            if (TextUtils.isEmpty(gradeStr)){
                Toast.makeText(mActivity,"输入不能为空",Toast.LENGTH_SHORT).show();
                return;
            }
            int gradeInt = Integer.parseInt(gradeStr);
    
            Observable<MyResponse<Teacher>> observableTeacher = HttpManager.createService(Api.class).getTeacher(gradeInt);
    
            Observable<MyResponse<List<Student>>> observableStudents = HttpManager.createService(Api.class).getStudents(gradeInt);
    
            Observable.zip(observableTeacher, observableStudents,
                    new BiFunction<MyResponse<Teacher>, MyResponse<List<Student>>, ClassBean>() {
                        @Override
                        public ClassBean apply(MyResponse<Teacher> teacherMyResponse, MyResponse<List<Student>> studentListMyResponse) throws Exception {
    
                            mainThreadTextLog("请求到得老师数据:\n"+JsonUtils.formatJson(new Gson().toJson(teacherMyResponse))+
                                    "\n请求到得学生数据:\n"+JsonUtils.formatJson(new Gson().toJson(studentListMyResponse)));
    
                            String teacherName = teacherMyResponse.data.name;
                            String grade = teacherMyResponse.data.grade;
                            List<Student> studentList = studentListMyResponse.data;
                            ClassBean classBean = new ClassBean(teacherName,grade,studentList);
                            return classBean;
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<ClassBean>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(ClassBean classBean) {
                            mainThreadTextLog("onNext合并后得数据:\n"+JsonUtils.formatJson(new Gson().toJson(classBean)));
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
                            mainThreadTextLog("onComplete\n\n");
                        }
                    });
    
    
        }
    

    相关文章

      网友评论

          本文标题:Rxjava2应用篇

          本文链接:https://www.haomeiwen.com/subject/kxhmkqtx.html