美文网首页
Rxjava复杂应用案例

Rxjava复杂应用案例

作者: 灬鱼飞 | 来源:发表于2020-08-21 14:36 被阅读0次
    /**
    * 先获取缓存数据,再获取网络数据
    **/
    Observable<String> observableDB = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    String data = getFormDB();
                    if(data!=null) {
                        emitter.onNext(data);
                    }else{
                        emitter.onComplete();
                    }
                }
            });
    
            Observable<String> observableNet= Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    String data = getFormNet();
                    emitter.onNext(data);
    
                }
            }).doOnNext(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    save2DB(s);
                }
            });
    
            Observable.concat(observableDB,observableNet)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            Log.i(TAG,s);
                        }
                    });
    
    
    /**
     * 变种
     * 首先我们需要从数据库中获取当前城市的天气数据,
     * 如果数据库中存在天气数据则在UI页面上展示天气数据;
     * 如果数据库中未存储当前城市的天气数据,
     * 或者已存储的天气数据的发布时间相比现在已经超过了一小时,并
     * 且网络属于连接状态则调用API从服务端获取天气数据。
     * 如果获取到到的天气数据发布时间和当前数据库中的天气数据发布时间一致则丢弃掉从服务端获取到的天气数据,
     * 如果不一致则更新数据库并且在页面上展示最新的天气信息。
     */
    
            Observable<Weather> obserDB = Observable.create(new ObservableOnSubscribe<Weather>() {
                @Override
                public void subscribe(ObservableEmitter<Weather> emitter) throws Exception {
                    //数据库获取缓存数据
                    Weather weather = getdbWather();
    
                    if (!"".equals(weather.msg))
                        emitter.onNext(weather);
    
                    //有网络且缓存的数据已经超过一个小时
                    //或者缓存为空时
                    if (isNetWork && System.currentTimeMillis() - weather.time > 60 * 60 * 1000) {
                        emitter.onComplete();
                    }
                }
            });
    
            Observable<Weather> obserNet = Observable.create(new ObservableOnSubscribe<Weather>() {
                @Override
                public void subscribe(final ObservableEmitter<Weather> emitter) throws Exception {
                    //请求网络数据源
                    Observable.timer(1, TimeUnit.SECONDS)
                            .subscribe(new Consumer<Long>() {
                                @Override
                                public void accept(Long aLong) throws Exception {
                                    Log.i(TAG, "获取网络数据");
                                    emitter.onNext(getnetWeather());
                                }
                            });
                }
            }).filter(new Predicate<Weather>() {
                @Override
                public boolean test(Weather weather) throws Exception {
                    boolean flag1 = weather != null && !TextUtils.isEmpty(weather.msg);
                    boolean flag2 = getdbWather().time - weather.time != 0;
                    Log.i(TAG, "过滤网络数据- 数据是否为空:" + flag1 + " 发布时间是否和缓存一致:" + flag2);
                    return flag1 && flag2;
                }
            }).doOnNext(new Consumer<Weather>() {
                @Override
                public void accept(final Weather weather) throws Exception {
                    Schedulers.io().createWorker().schedule(new Runnable() {
                        @Override
                        public void run() {
                            //插入缓存
                            Log.i(TAG, "插入缓存:" + weather.msg);
                        }
                    });
                }
            });
    
            Observable.concat(obserDB, obserNet).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Weather>() {
                        @Override
                        public void accept(Weather weather) throws Exception {
                            //展示天气
                            Log.i(TAG, "展示:" + weather.msg);
                        }
                    });
    
    
    /**
    
    * 4、支持切换城市,默认显示当前城市;
    
    *      a.若此时没有打开GPS权限,则弹窗提示先开启权限
    
    *      b.若获取不到当前城市,则按照用户上次所在城市显示;
    
    *      c.若没有上次所在城市,则按照用户所填所在城市显示;
    
    *      d.以上均没有数据,则默认所在城市为广州;
    
    */
    
    public void getCity() {
            Observable<City> obserGPS = Observable.create(new ObservableOnSubscribe<City>() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                    boolean isPermission = getPermission();
                    boolean isOpenGPS = isOpenGps();
                    if (!isPermission) {
                        //提示权限
                    }
    
                    if (!isOpenGPS) {
                        //提示开关
                    }
                    
                    City city = getGPSCity();
    
                    if(city==null){
                        emitter.onComplete();
                    }
                    emitter.onNext(city);
    
                }
            });
    
    
            final Observable<City> obserDB = Observable.create(new ObservableOnSubscribe<City>() {
                @Override
                public void subscribe(ObservableEmitter emitter) throws Exception {
                        City city = getLastCity();
    
                        if(city==null){
                            city = getUserCity();
                        }
                        if(city==null){
                            city = new City("广州");
                        }
                        emitter.onNext(city);
    
                }
            });
    
            Observable.concat(obserGPS,obserDB)
                    .doOnNext(new Consumer<City>() {
                        @Override
                        public void accept(City city) throws Exception {
                            //缓存数据
                            saveCity(city);
                        }
                    }).subscribe(new Observer<City>() {
                @Override
                public void onSubscribe(Disposable d) {
                    
                }
    
                @Override
                public void onNext(City city) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
        }
    

    相关文章

      网友评论

          本文标题:Rxjava复杂应用案例

          本文链接:https://www.haomeiwen.com/subject/ekvujktx.html