美文网首页
Retrofit & RxJava 实战篇

Retrofit & RxJava 实战篇

作者: kjy_112233 | 来源:发表于2018-09-12 15:26 被阅读0次

    一、网络请求实现:轮询、嵌套、合并数据、缓存、出错重试

    public class RetrofitRxJavaStudy {
    
        private static NetworkInterface networkInterface;
    
        //获取接口类NetworkInterface并初始化Retrofit
        private static NetworkInterface getRequestInterface() {
            if (networkInterface == null) {
                synchronized (NetworkInterface.class) {
                    networkInterface = initRetrofit().create(NetworkInterface.class);
                }
            }
            return networkInterface;
        }
    
        private static Retrofit initRetrofit() {
            OkHttpClient client = new OkHttpClient.Builder()
                    .build();
            Retrofit retrofit = new Retrofit.Builder()
                    .baseUrl(NetworkInterface.HOST)//设置网络请求的Url地址
                    .addConverterFactory(GsonConverterFactory.create())//设置数据解析器
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())//支持RxJava平台
                    .client(client)
                    .build();
            return retrofit;
        }
    
        //初始化请求数据参数
        private RequestBody initData() {
            Gson gson = new Gson();
            Map<String, String> map = new HashMap<String, String>();
            map.put("phone", "phone");
            map.put("code", "code");
            String strJson = gson.toJson(map);
            RequestBody body = RequestBody.create(okhttp3.MediaType.parse("application/json; charset=utf-8"), strJson);
            return body;
        }
    
    
        //无条件网络请求轮询
        private void interval() {
            //initialDelay:第一次延迟时间、period:间隔时间数字、unit:时间单位
            Observable.interval(2, 10, TimeUnit.SECONDS).doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    //网络请求具体实现
                    getRequestInterface()
                            .registerLogin("login/login", initData())
                            .observeOn(AndroidSchedulers.mainThread())//回调在主线程
                            .subscribeOn(Schedulers.io())
                            .subscribe();//执行在io线程
                }
            }).subscribe();
    
            //start:事件序列起始点、count:事件数量、initialDelay:第1次事件延迟发送时间、period:间隔时间数字、unit:时间单位
            Observable.intervalRange(2, 10, 5, 10, TimeUnit.SECONDS).doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long aLong) throws Exception {
                    //网络请求具体实现code...
                }
            }).subscribe();
        }
    
        //有条件网络请求轮询
        private void repeatWhen() {
            final int[] i = {0};
            getRequestInterface()
                    .registerLogin("login/login", initData())
                    .repeatWhen(new Function<Observable<Object>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(Observable<Object> objectObservable) throws Throwable {
                            if (i[0] > 3) {
                                // 此处选择发送onError事件以结束轮询
                                return Observable.error(new Throwable("轮询结束"));
                            }
                            //delay操作符延迟一段时间发送以实现轮询间间隔设置
                            return Observable.just(1).delay(5000, TimeUnit.MILLISECONDS);
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())//回调在主线程
                    .subscribeOn(Schedulers.io())//执行在io线程
                    .subscribe(new Consumer<RegisterLogin>() {
                        @Override
                        public void accept(RegisterLogin registerLogin) throws Throwable {
                            i[0]++;
                        }
                    });
        }
    
        //网络请求嵌套回调
        private void doOnNext() {
            getRequestInterface()
                    .registerLogin("login/login", initData())
                    .observeOn(AndroidSchedulers.mainThread())//回调在主线程
                    .subscribeOn(Schedulers.io())//执行在io线程
                    .doOnNext(new Consumer<RegisterLogin>() {
                        @Override
                        public void accept(RegisterLogin registerLogin) throws Throwable {
                            //第1次网络请求成功
                        }
                    })
                    .observeOn(Schedulers.io())//新观察者切换到IO线程去发起请求
                    .flatMap(new Function<RegisterLogin, ObservableSource<RegisterLogin>>() {
                        @Override
                        public ObservableSource<RegisterLogin> apply(RegisterLogin registerLogin) throws Throwable {
                            //registerLogin是网络请求1返回数据,发送网络请求2
                            return getRequestInterface()
                                    .registerLogin("login/login", initData());
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())//切换到主线程处理网络请求2的结果
                    .subscribe(new Consumer<RegisterLogin>() {
                        @Override
                        public void accept(RegisterLogin o) throws Throwable {
                            //o是网络请求2返回数据
                        }
                    });
        }
    
        //采用 Merge()操作符合并数据
        private void merge() {
            Observable<String> network = Observable.just("网络");
            Observable<String> file = Observable.just("本地文件");
            Observable.merge(network, file).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                }
    
                @Override
                public void onNext(String value) {
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onComplete() {
                }
            });
        }
    
        //采用Zip()操作符合并数据(网络数据 + 网络数据)
        private void zip() {
            final Observable<RegisterLogin> observable1 = getRequestInterface()
                    .registerLogin("login/login", initData())
                    .subscribeOn(Schedulers.io());//执行在io线程
            final Observable<RegisterLogin> observable2 = getRequestInterface()
                    .registerLogin("login/login", initData())
                    .subscribeOn(Schedulers.io());//执行在io线程
            Observable.zip(observable1, observable2, new BiFunction<RegisterLogin, RegisterLogin, Object>() {
                @Override
                public Object apply(RegisterLogin registerLogin, RegisterLogin registerLogin2) throws Throwable {
                    return registerLogin.getMessage() + " & " + registerLogin2.getMessage();
                }
            }).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Object>() {
                        @Override
                        public void accept(Object o) throws Throwable {
    
                        }
                    });
        }
    
        //从磁盘、内存缓存中获取缓存数据firstElement和concat
        private void firstElement() {
            final String memoryCache = null;
            final String diskCache = "从磁盘缓存中获取数据";
            Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    // 先判断内存缓存有无数据
                    if (memoryCache != null) { // 若有该数据,则发送
                        emitter.onNext(memoryCache);
                    } else { // 若无该数据,则直接发送结束事件
                        emitter.onComplete();
                    }
                }
            });
    
            Observable<String> disk = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    // 先判断内存缓存有无数据
                    if (diskCache != null) { // 若有该数据,则发送
                        emitter.onNext(diskCache);
                    } else { // 若无该数据,则直接发送结束事件
                        emitter.onComplete();
                    }
                }
            });
    
            Observable<String> observable = getRequestInterface()
                    .getBody("login/login");//执行在io线程
    
            Observable.concat(memory, disk, observable)
                    //通过firstElement(),从串联队列中取出并发送第1个有效事件(Next事件)
                    //即依次判断检查memory、disk、network
                    .firstElement()
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(String s) throws Exception {
                            //最终获取的数据来源
                        }
                    });
        }
    
        // 可重试次数
        private int maxConnectCount = 10;
        // 当前已重试次数
        private int currentRetryCount = 0;
        // 重试等待时间
        private int waitRetryTime = 0;
    
        //网络请求出错重试
        private void flatMap() {
            getRequestInterface()
                    .registerLogin("login/login", initData())
                    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                        @Override
                        public ObservableSource<?> apply(@NonNull Observable<Throwable> throwableObservable) throws Exception {
                            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                                @Override
                                public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                                    //根据异常类型选择是否重试,属于IO异常,需重试
                                    if (throwable instanceof IOException) {
                                        //当已重试次数 < 设置的重试次数才重试
                                        if (currentRetryCount < maxConnectCount) {
                                            // 记录重试次数
                                            currentRetryCount++;
                                            // 设置等待时间
                                            waitRetryTime = 1000 + currentRetryCount * 1000;
                                            return Observable.just(1).delay(waitRetryTime, TimeUnit.MILLISECONDS);
                                        } else {
                                            // 若重试次数已 > 设置重试次数,则不重试
                                            return Observable.error(new Throwable("重试次数已超过设置次数 = " + currentRetryCount + ",即 不再重试"));
                                        }
                                    } else {
                                        // 若发生的异常不属于I/O异常,则不重试
                                        return Observable.error(new Throwable("发生了非网络异常(非I/O异常)"));
                                    }
                                }
                            });
                        }
                    })
                    .subscribeOn(Schedulers.io())//执行在io线程
                    .subscribe();
        }
    }
    

    相关文章

      网友评论

          本文标题:Retrofit & RxJava 实战篇

          本文链接:https://www.haomeiwen.com/subject/gmlgwftx.html