RxJava+Retrofit Samples解析

作者: 青藤绿 | 来源:发表于2016-04-01 17:38 被阅读1126次

扔物线大神写的示例代码:RxJavaSamples
apk下载地址:RxJavaSamples.apk
光看代码虽然看得懂,但是要自己写起来又感觉力不从心,所以决定将代码解析记录一下,加深印象。

1.基本使用

RxJava和Retrofit结合使用最基本的格式:用 subscribeOn()observeOn()来控制线程,并通过 subscribe()来触发网络请求的开始。代码大致形式:

api.getData()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

其中的observer是(后面例子中的observer基本相同不再列举):

Observer<List<Images>> observer = new Observer<List<Images>>() {
        @Override
        public void onCompleted() {
        }

        @Override
        public void onError(Throwable e) {
              //失败处理
            swipeRefreshLayout.setRefreshing(false);
            Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
        }

        @Override
        public void onNext(List<ZhuangbiImage> images) {
             //成功处理
            swipeRefreshLayout.setRefreshing(false);
            adapter.setImages(images);
        }
    };

2.转换(map)

有些服务端的接口设计,会在返回的数据外层包裹一些额外信息,这些信息对于调试很有用,但本地显示是用不到的。使用 map()可以把外层的格式剥掉,只留下本地会用到的核心格式。当然,map() 也可以用于基于其他各种需求的格式转换。代码大致形式:

api.getData()
    .map(response->response.data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

3.压合(zip)

有的时候,app 中会需要同时访问不同接口,然后将结果糅合后转为统一的格式后输出(例如将第三方广告 API 的广告夹杂进自家平台返回的数据 List 中)。这种并行的异步处理比较麻烦,不过用了 zip() 之后就会简单得多。代码大致形式:

Observable.zip(api.getData(),adApi.getAds(),zipFunc())
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

4.一次性 token(flatMap)

出于安全性、性能等方面的考虑,多数服务器会有一些接口需要传入 token 才能正确返回结果,而 token 是需要从另一个接口获取的,这就需要使用两步连续的请求才能获取数据(①token -> ②目标数据)。使用 flatMap() 可以用较为清晰的代码实现这种连续请求,避免 Callback 嵌套的结构。代码大致形式:

api.getToken().flatMap(token -> api.getData(token))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

5.非一次性 token(retryWhen)

有的 token 并非一次性的,而是可以多次使用,直到它超时或被销毁(多数 token 都是这样的)。这样的 token 处理起来比较麻烦:需要把它保存起来,并且在发现它失效的时候要能够自动重新获取新的 token 并继续访问之前由于 token 失效而失败的请求。如果项目中有多处的接口请求都需要这样的自动修复机制,使用传统的 Callback 形式需要写出非常复杂的代码。而使用 RxJava ,可以用 retryWhen() 来轻松地处理这样的问题。代码大致形式:

api.getData(token)
    .retryWhen(observable ->
        observable.flatMap( ->
            api.getToken()
                .doOnNext(->updateToken())))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(observer);

上面几个例子都比较简单,而这个有点小复杂,所以详细记录一下,具体代码如下:

 subscription = Observable.just(null)
                .flatMap(new Func1<Object, Observable<FakeThing>>() {
                    @Override
                    public Observable<FakeThing> call(Object o) {
                        //判断cachedToken是否为空
                        //是的话就发送一个error的observable
                        //否的话就用该token去取得数据
                        return cachedFakeToken.token == null
                                ? Observable.<FakeThing>error(new NullPointerException("Token is null!"))
                                : fakeApi.getFakeData(cachedFakeToken);
                    }
                })
                .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            //这里接收到一个error的Observable
                            public Observable<?> call(Throwable throwable) {
                                if (throwable instanceof IllegalArgumentException || throwable instanceof NullPointerException) {
                                    //执行获取token,并且再次执行上面的通过token获取数据操作
                                    return fakeApi.getFakeToken("fake_auth_code")
                                            .doOnNext(new Action1<FakeToken>() {
                                                @Override
                                                public void call(FakeToken fakeToken) {
                                                    tokenUpdated = true;
                                                    cachedFakeToken.token = fakeToken.token;
                                                    cachedFakeToken.expired = fakeToken.expired;
                                                }
                                            });
                                }
                                //其他错误
                                return Observable.just(throwable);
                            }
                        });
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(observer);

6.缓存(BehaviorSubject)

RxJava 中有一个较少被人用到的类叫做 Subject,它是一种『既是 Observable,又是 Observer』的东西,因此可以被用作中间件来做数据传递。例如,可以用它的子类 BehaviorSubject 来制作缓存。代码大致形式:

api.getData().subscribe(behaviorSubject); // 判断cache为空则获取数据,网络数据会被缓存
behaviorSubject.subscribe(observer);// 之前的缓存将直接送达observer

具体例子:

BehaviorSubject<List<Item>> cache;
public Subscription subscribeData(@NonNull Observer<List<Item>> observer) {
         //判断内存缓存是否为空
        if (cache == null) {
            cache = BehaviorSubject.create();
            Observable.create(new Observable.OnSubscribe<List<Item>>() {
                @Override
                public void call(Subscriber< ? super List<Item>> subscriber) {
                    List<Item> items = Database.getInstance().readItems();
                    //判断硬盘缓存是否为空
                    if (items == null) {
                        //从网络读取数据
                        loadFromNetwork();
                    } else {
                        //发送硬盘数据
                        subscriber.onNext(items);
                    }
                }
            }).subscribeOn(Schedulers.io())
              .subscribe(cache);
        } 
        return cache.observeOn(AndroidSchedulers.mainThread()).subscribe(observer);
    }

  subscription = subscribeData(new Observer<List<Item>>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        swipeRefreshLayout.setRefreshing(false);
                        Toast.makeText(getActivity(), R.string.loading_failed, Toast.LENGTH_SHORT).show();
                    }

                    @Override
                    public void onNext(List<Item> items) {
                        swipeRefreshLayout.setRefreshing(false);
                        adapter.setItems(items);
                    }
                });

相关文章

网友评论

  • 皮球二二:你好,BehaviorSubject将数据存到什么地方去了呢?还有如何删除?
    青藤绿: @r17171709 存在内存里 cache=null就没了

本文标题:RxJava+Retrofit Samples解析

本文链接:https://www.haomeiwen.com/subject/pedylttx.html