美文网首页Android-RxJava
RxJava 3.x系列(三)RxJava操作符使用场景举例

RxJava 3.x系列(三)RxJava操作符使用场景举例

作者: 文泰ChrisTwain | 来源:发表于2020-09-21 16:29 被阅读0次

    一.Rxjava架构场景

    rxjava_mvp

    RxJava通过Observable传递数据流,以MVP为例,我们可以在Model层创建Observable并封装基础数据,Presenter层订阅后开始数据流动,P层、M层之间可以采用各种操作符做数据转换,P层Observer收到数据后回调View层显示数据。

    一些基于RxJava的基础库通过将数据获取相关逻辑封装在上游Observable中,依赖基础库后即可直接通过订阅相关上游Observable获取数据,这样也利于开发上的分工。

    RxJava仅能单向获取数据,图中显示从Model层获取,若View层需要更新Model数据怎么办呢?上图RxXX.set()方法相当于给Model层一个通知做一些操作,并不能传数据。此时更新Model数据得自己实现通知机制也可利用EventBus等开源库。

    二、具体使用

    1.封装Rx函数库

    将上游数据处理封装为静态方法并提供给调用方

    RxXXXLib.applyChanges().subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(xxx)
    
    2.注册成功后立即登陆-结合Retrofit & Gson
    MyRetrofit.getsInstance().getLoginService().  // 封装Retrofit & 网络请求接口
    .register(new RegisterRequest())            // 发起注册请求
    .subscribeOn(Schedulers.io())               // 在IO线程发起网络请求
    .observeOn(AndroidSchedulers.mainThread())  // 回到主线程处理请求注册结果
    .doOnNext(new Consumer<RegisterResponse>() {
        @Override
        public void accept(RegisterResponse registerResponse) {
            // Register success & do something
        }
    })
    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Throwable {
            // Register failed
        }
    })
    .map(new Function<RegisterResponse, MyResponse>() {
        @Override
        public String apply(RegisterResponse response) throws Throwable {
            // do something
            MyResponse myResponse = XXX
            return myResponse;
        }
    })
    .observeOn(Schedulers.io())    // 回到IO线程发起登录请求
    .flatMap(new Function<MyResponse, ObservableSource<LoginResponse>>() {
        @Override
        public ObservableSource<LoginResponse> apply(MyResponse myResponse) {
            return MyRetrofit.XXX.login(new LoginRequest(myResponse));
        }
    })
    .observeOn(AndroidSchedulers.mainThread())  // 回到主线程处理请求登录的结果
    .subscribe(new Observer<LoginResponse>() {
        @Override
        public void accept(LoginResponse loginResponse) {
            // Login success
        }
    }, new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) {
            // Login failed
        }
    });
    
    3.从两个后端接口获取数据并组合

    组合严格按照两个Observable的发射顺序,且下游收到的事件数量和上游发送事件最少的那个Observable的事件数量相同

    Observable<MainResponse> observableMain =                                            
            retrofitApi.getMain(new MainRequest()).subscribeOn(Schedulers.io());                                                                  
    Observable<ExtraResponse> observableExtra =                                           
            retrofitApi.getExtra(new ExtraRequest()).subscribeOn(Schedulers.io());                                                                              
    Observable.zip(observableMain, observableExtra,                                                  
            new BiFunction<MainResponse, ExtraResponse, NewResponse>() {         
                @Override                                                                     
                public NewResponse apply(MainResponse main,                          
                                      ExtraResponse extra) {     
                    return new NewResponse(main, extra);                                 
                }                                                                             
            }).observeOn(AndroidSchedulers.mainThread())                                      
            .subscribe(new Consumer<UserInfo>() {                                             
                @Override                                                                     
                public void accept(UserInfo userInfo) {                      
                    // do something;                                                           
                }                                                                             
            });     
    
    4.大数据流用Flowable,小数据流用Observable

    上游数据流过大而下游来不及处理可以进行数据过滤,否则将导致内存溢出(极端情况),较好的方式是根据业务场景选择合适的策略:BackpressureStrategy.DROPBackpressureStrategy.LATEST。采用Flowable下游可控制取数据的时机。

    5.避免内存泄漏

    适当时机调用dispose(),或使用CompositeDisposable统一切断,一般会在生命周期onDestroy()回调时进行数据流切断处理。

    三、问题整理:

    1.链式调用整体流程:
    层层包装 - 逐级订阅(构造回调链) - onNext逐级回调
    操作符内部方法被穿插在onNext之间

    2.为什么subscribeOn()在后面执行,而前面代码的线程被切换?
    由于RxJava的层层包装,subscribeOn()执行后生成的Observable对象ObservableSubscribeOn包含前面生成的Observable对象,
    下游逐级订阅上游时,将subscribeOn()的上游全部放入runnable中,故上游线程被切换,若subscribeOn()下游不切换线程,下游将运行在subscribeOn()指定的线程

    3.订阅好的关系为什么可以通过disposable.dispose切断呢

    • Disposable对象获取:subscribe()、connect()执行后会获得Disposable,参数为Observer的subscribe()方法在onSubscribe(Disposable d)中接收Disposable
    public final Disposable subscribe() {}
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}
    
    public final Disposable connect()
    
    • Disposable是什么:一个接口,由Observer实现用以切断数据流
      以observeOn()操作符对应的ObservableObserveOn为例,其内部类ObserveOnObserver实现了Disposable接口
      并在其内部onSubscribe(Disposable d)中接收上游Disposable,自己内部做处理后调用下游Observer的onSubscribe(this)并传入自身对象。它的dispose()方法将回调上游dispose()方法同时调用worker.dispose()queue.clear();又如create()操作符对应的ObservableCreate,内部CreateEmitter实现了Disposable接口,调用下游onNext()方法时会进行是否dispose判断
      若已经dispose则不会调用下游onNext() if (!isDisposed()) { observer.onNext(t); }
    • dispose()将从下游到上游逐级回调,最上游仍能发射数据,下游不回调
    • CompositeDisposable:一个Disposable集合(由RxJava内部提供的OpenHashSet维护, 线程安全), CompositeDisposable.dispose()会遍历内部的所有Disposable执行dispose操作.如Activity销毁的时候需统一对这些订阅进行取消,那么就可以新增CompositeDisposable对象作为成员,订阅后将Disposable对象都添加到CompositeDisposable对象中,等到Activity执行onDestroy生命周期时调用CompositeDisposable.dispose(),进行统一取消订阅操作
    dispose_DisposableHelper.png dispose_ObservableCreate_1.png dispose_ObservableCreate_2.png dispose_ObservableObserveOn_1.png dispose_ObservableObserveOn_2.png

    4.内存泄漏问题:ondestroy中调用dispose
    在rxjava中主要注意的就是内存泄漏问题,现有比较有名的管理rxjava内存的库有RxLifecycle和AutoDispose 这里使用AutoDispose管理,在0.8.0版本之后针对Androidx,如果不是androidx 要用之前版本。在activity和fragment中可以直接使用,在Androidx中activity和fragment本身是实现了lifecycle的
    https://juejin.im/post/6873448411727986701

    5.多次设置subscribeOn()的问题

    .subscribeOn(Schedulers.io()) //第一次
    .subscribeOn(Schedulers.newThread()) //第二次
    .subscribeOn(AndroidSchedulers.mainThread()) //第三次
    

    如果我们多次设置subscribeOn()只有第一次的subscribeOn()起作用,如果此时下游不切线程,下游将在第一次subscribeOn指定的线程执行subscribeOn()将上游整个订阅链封装到Runnable中,丢入指定线程执行,连续多次调用subscribeOn()由于订阅链从下至上,故最上层subscribeOn()的线程切换才会将更上游的操作放入其指定的线程,而后者的线程切换则不起作用

    6.如果我们多次设置ObserveOn()下游将在最后一次ObserveOn()指定的线程执行,因为ObserveOn()将下游onNext()丢入指定的线程
    若为.observeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()),上下游都将在主线程执行,而上游发射的数据将会入队,造成上游数据发射完下游才取数据在主线程执行

    7.上游onNext()与下游onNext()同步与异步情况下调用执行顺序
    上下游同步,上游onNext执行1次后下游立即回调onNext1此,上下游异步,上游发射的数据将入队,下游从队列中取数据

    8.RxPlugins


    RxJavaPlugins.png

    RxJava操作符执行在返回新Observable时可Hook一层自己的封装,在真实操作符执行前完成自己预设的逻辑。

    RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
        @Override
        public Observable apply(Observable observable) throws Throwable {
            return new MyObservable(observable);  // 实现自己的MyObservable类
        }
    });
    
    RxJavaPlugins.setOnObservableSubscribe(new BiFunction<Observable, Observer, Observer>() {
        @Override
        public Observer apply(Observable observable, Observer observer) throws Throwable {
            return new MyObserver(observer);  // 实现自己的MyObserver类
        }
    });
    

    参考:RxJavaPlugins

    9.自定义操作符

    自定义处理上游Observable发射的单独数据项implements ObservableOperator,并通过lift()连接操作符;若为变换Observable发射的整个数据序列则implements ObservableTransformer,重写apply()方法,通过compose()连接操作符。参考RxJava自定义操作符

    P.S. 本文源码基于RxJava 3.0.4
    By the way: 非常感谢能提出批评和修改意见
    contact: tu.wentai@nexuslink.cn

    相关文章

      网友评论

        本文标题:RxJava 3.x系列(三)RxJava操作符使用场景举例

        本文链接:https://www.haomeiwen.com/subject/mbgvyktx.html