玩转RxJava2.x 学习教程(一)

作者: 依然范特稀西 | 来源:发表于2017-06-25 18:13 被阅读3673次
    封面.png

    从16年11月份推出RxJava 2.0 ,到现在差不多大半年的时间里,RxJava已经来到了2.x时代,RxJava 1.x 可能也慢慢地被2.x 代替。RxJava 2.x在Reactive-Streams规范的基础上从头开始完全重写,虽然操作符基本没有发生变化,但是因为Reactive-Streams具有不同的架构,因此对一些众所周知的RxJava类型进行了更改。

    RxJava 2.x相对与1.x还是有很多的不同,RxJava的类型更改了,很多类的命名和方法的命名发生了变化(可能功能与1.x相同),要想从RxJava 1.x 顺利地过渡到2.x, 就得了解这些变化。因此本教程就带你了解这些变化,从而能更快地上手RxJava2.x。

    本篇文章就来先了解一下RxJava 2.x的5种响应类型。


    一、RxJava 2.x中5种类型介绍

    1 . Observable and Flowable

    关于在RxJava 0.x版本引入背压的一个小的遗憾是,没有设计一个单独的基础反应类,而是对Observable本身进行了改装。背压的主要问题在于热源(如:UI事件),不能合理地反压并导致不可预料的异常MissingBackpressureException,这是初学者不期望看到的。

    在RxJava 2.x版本中修复了这种情况,将o.reactivex.Observable作为非背压,引入新的io.reactivex.Flowable作为启用背压基础反应类。

    好消息是,在2.x版本中,主要的操作符保持不变(同1.x版本),坏消息是,在导入的时候应当小心,它可能会无意的选择非背压的o.reactivex.Observable.

    我们应该选哪种?

    当构建数据流(作为RxJava的最终消费者)或考虑您的2.x兼容库应该采取和返回什么类型时,您可以考虑几个因素,以帮助您避免诸如MissingBackpressureException或OutOfMemoryError之类的问题。

    Observable使用场景:

    • 数据流最长不超过1000个元素,即随着时间的流逝,应用没有机会报OOME(OutOfMemoryError)错误。
    • 处理诸如鼠标移动或触摸事件之类的GUI事件

    Flowable使用场景:

    • 处理超过10K+ 的元素流
    • 从磁盘读取(解析文件)
    • 从数据库读取数据
    • 从网络获取数据流
    2 . Single 使用介绍

    Single是2.x版本中的一种基础响应类型,Single是从头开始重新设计的,能单独发射一个onSuccess或者onError事件,它现在的架构来自于the Reactive-Streams设计。它的消费者类型已经从接受rx.Subscriptionrx.Single.SingleSubscriber<T>变为了io.reactivex.SingleObserver<T>,有3个方法:

    interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T value);
        void onError(Throwable error);
    }
    
    3 . Completable使用介绍

    Completeble类型基本保持不变,1.x的版本已经沿着Reactive-Streams风格设计,所以没有用户级别的更改。

    相似地命名改变,rx.Completable.CompletableSubscriber变为带有onSubscribe(Disposable)方法的io.reactivex.CompletableObserver:

    interface CompletableObserver<T> {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable error);
    }
    
    4 . Maybe 使用介绍

    RxJava 2.0.0-RC2 介绍了一个新的基础响应类型,它叫做Maybe。从概念上来将,它像是 Single和Completable的结合,它可能发射0个或者1个项目,或者一个error信号。

    Maybe类通过依赖MaybeSource作为它的基础接口类型MaybeObserver<T>作为信号响应接口并且遵循协议onSubscribe (onSuccess | onError | onComplete)?因为最多可能发射1个元素,所以Maybe类型没有背压的概念(因为它没有像Flowable和Observable一样有未知长度的可膨胀缓冲区)

    这意味着onSubscribe(Disposable)的调用潜在地跟随着其他onXXX方法之一的调用,不同于Flowable,如果这儿只有一个信号值发射信号,那么只有onSuccess被调用,而不会调用complete

    二、RxJava 2.x中5种类型使用示例

    1 . Observable示例

    在写示例之前,我们先来回顾一下 1.x 版本是如何创建Observable和如何订阅的:
    RxJava 1.x :

      //创建 observable
           Observable observable =  Observable.create(new Observable.OnSubscribe<String>() {
                @Override
                public void call(Subscriber<? super String> subscriber) {
                    subscriber.onNext("hello world");
                    subscriber.onCompleted();
                }
            }).subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread());
                    
           //订阅方式一
            observable.subscribe(new Subscriber() {
                @Override
                public void onCompleted() {
                    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(Object o) {
    
                }
            });
            
            // 订阅方式二
             observable.subscribe(new Action1() {
                 @Override
                 public void call(Object o) {
                     // onNext
                 }
             });
    

    通过create方法创建Observable,接收一个OnSubscribe接口,其中有一个回调方法call,参数为Subscriber,我们用Subscriber来发射数据。通过subscribe方法来订阅,可以接收一个Subscriber 实现全部方法,也可以接收一个Action1只实现onNext方法。

    RxJava 2.x :

           //创建Observable
            Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                    Log.e(TAG,"start emitter data");
                    e.onNext("Hello");
                    e.onNext("world");
                    e.onComplete();
                }
            })
              .subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread());
            
              // 订阅方式一:下游消费者 Observer
              observable.subscribe(new Observer<String>() {
                  @Override
                 public void onSubscribe(@NonNull Disposable d) {
                // onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
                     Log.e(TAG,"onSubscribe");
                  }
    
                  @Override
                  public void onNext(@NonNull String s) {
                      Log.e(TAG,"onNext");
                      Log.e(TAG,s);
                  }
    
                  @Override
                  public void onError(@NonNull Throwable e) {
                      Log.e(TAG,"onError");
                  }
    
                  @Override
                  public void onComplete() {
                      Log.e(TAG,"onComplete");
                  }
              });
    
             // 订阅方式二:Consumer
             observable.subscribe(new Consumer<String>() {
                 @Override
                 public void accept(@NonNull String o) throws Exception {
                      Log.e(TAG,"consumer:"+o);
                 }
             });
    

    打印结果如下:

    06-25 14:31:35.435 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onSubscribe
    06-25 14:31:35.437 21505-21853/com.zhouwei.demoforrxjava2 E/MainActivity: start emitter data
    06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:Hello
    06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onNext:world
    06-25 14:31:35.438 21505-21505/com.zhouwei.demoforrxjava2 E/MainActivity: onComplete
    
    

    其实我们可以对比一下,1.x 和 2.x 方法都试一样的,只是它们所接收的响应接口改变了,对应变化如下:

     RxJava 1.x       ->   RxJava 2.x
     ---------------------------------------
     OnSubscribe<T>   ->   ObservableOnSubscribe<T>
     Subscriber<T>    ->   Observer<T>
     Subscriber<T>    ->   ObservableEmitter<T>
     Action1<T>       ->   Consumer<T>
     
    

    RxJava 2.x 中对这些接口进行了重新设计,让一个接口的职责更加单一,类的命名和方法命名与它的功能更佳符合(见名知意)。如在1.x 中,Subscriber 既能发射数据,又能消费数据,充当观察者和被观察者。在2.x 中 把它拆解成了2个接口。ObservableEmitter<T>专门用来发射数据,Consumer 专门用来消费数据。 除此之外,在RxJava 2.x 中,多了一个void onSubscribe(@NonNull Disposable d)回调方法,参数为Disposable,Disposable是用来解除订阅关系的,这让我们的解除订阅变得更佳容易(比起1.x 通过subscribe返回 Subscription)。

    上面对比了在RxJava 1.x 和2.x 版本创建Observable的方式,其实在RxJava 2.x中,这5种类型的用法是非常相似的,它们的接口命名规则相同,只要你知道其中一种,就知道其他几种类型该如何在上游发射数据和在下游消费数据。create接收的类型都为xxxOnSubscrible(xxx为5种类型对应的名字),发射器为xxxEmitter,具体如下表:

    RxJava 2.x 类型 create参数(响应接口) 发射器 Observer
    Observable ObservableOnSubscribe ObservableEmitter Observer
    Flowable FlowableOnSubscribe FlowableEmitter FlowableSubscriber
    Single SingleOnSubscribe SingleEmitter SingleObserver
    Completable CompletableOnSubscribe CompletableEmitter CompletableObserver
    Maybe MaybeOnSubscribe MaybeEmitter MaybeObserver
    2 . Flowable示例

    上面对比了RxJava 1.x 和 2.x 创建使用Observable的方式,并且总结了2.x 相关类的改变,如上面表。那么使用Flowable的方式和Observable是很相似的,看一下代码:

    Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
                    Log.e(TAG,"start send data ");
                    for(int i=0;i<100;i++){
                        e.onNext(i);
                    }
                    e.onComplete();
                }
            }, BackpressureStrategy.DROP)//指定背压策略
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(@NonNull Subscription s) {
                   //1, onSubscribe 是2.x新添加的方法,在发射数据前被调用,相当于1.x的onStart方法
                   //2, 参数为  Subscription ,Subscription 可用于向上游请求发射多少个元素,也可用于取笑请求
                   //3,  必须要调用Subscription 的request来请求发射数据,不然上游是不会发射数据的。
                   Log.e(TAG,"onSubscribe...");
                   subscription = s;
                    s.request(100);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG,"onNext:"+integer);
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.e(TAG,"onError..."+t);
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG,"onComplete...");
                }
            });
    

    Flowable和Observable的使用基本相同,只不过Observable不支持背压,而Flowable支持背压。使用的时候,还是要注意几个小细节:
    1,创建Flowable的时候需要指定一个背压策略,本文使用的是PBackpressureStrategy.DROP(丢弃策略),RxJava 2.x中,内置了5种背压策略,由于篇幅有限,背压和背压策略下一篇拿出来单独讲。
    2,onSubscribe 回调方法中,参数是Subscription而不是Disposable,前文说过,RxJava 2.x 中,订阅的管理换成了Disposable,但是Flowable使用的是Subscription,这个Subscription不是1.x 版本中的Subscription,虽然它有取消订阅的能力。主要用于请求上游元素和取消订阅。
    3,在使用Flowable的时候,必须调用Subscription 的requsest方法请求,不然上游是不会发射数据的。看request的方法解释:

    3 . Single、Completable 和 Maybe 示例

    Single、Completable和Maybe就比较简单,Single用于只发射一个数据,Completable不发送数据,它给下游发射一个信号。而Maybe则是Single和Completable的结合,根据名字就可以看出,它的结果是不确定的,可能发发射0(Completable)或1(Single) 个元素,或者收到一个Error信号。

    Single示例:

    Single.create(new SingleOnSubscribe<Boolean>() {
                @Override
                public void subscribe(@NonNull SingleEmitter<Boolean> e) throws Exception {
                    Log.e(TAG,"subscribe...");
                    e.onSuccess(true);
                }
            })
            .observeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new SingleObserver<Boolean>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.e(TAG,"onSubscribe...");
                }
    
                @Override
                public void onSuccess(@NonNull Boolean aBoolean) {
                    Log.e(TAG,"onSuccess...");
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.e(TAG,"onError...");
                }
            });
    

    Single只发射一个元素,所以没有complete 方法,不像Observable或者Flowable,数据发射完成之后,需要调用complete告诉下游已经完成。

    Completable示例:

     Completable.create(new CompletableOnSubscribe() {
                @Override
                public void subscribe(@NonNull CompletableEmitter e) throws Exception {
                   Log.e(TAG,"start send data");
                   e.onComplete();
                }
            }).subscribeOn(Schedulers.io())
              .observeOn(AndroidSchedulers.mainThread())
              .subscribe(new CompletableObserver() {
                  @Override
                  public void onSubscribe(@NonNull Disposable d) {
                      Log.e(TAG,"onSubscribe");
                  }
    
                  @Override
                  public void onComplete() {
                    Log.e(TAG,"onComplete");
                  }
    
                  @Override
                  public void onError(@NonNull Throwable e) {
                      Log.e(TAG,"onError");
                  }
              });
    

    Completable 不会发射数据,只会给下游发送一个信号。回调 onComplete方法。

    Maybe示例:

     Maybe.create(new MaybeOnSubscribe<Boolean>() {
                @Override
                public void subscribe(@NonNull MaybeEmitter<Boolean> e) throws Exception {
                    Log.e(TAG,"start send data");
                     e.onSuccess(true);
                     e.onComplete();
    
                }
            }).subscribeOn(Schedulers.io())
    
               .observeOn(AndroidSchedulers.mainThread())
    
               .subscribe(new MaybeObserver<Boolean>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    Log.e(TAG,"onSubscribe");
                }
    
                @Override
                public void onSuccess(@NonNull Boolean aBoolean) {
                    Log.e(TAG,"1->onSuccess:"+aBoolean);
                }
    
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.e(TAG,"onError");
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG,"onComplete");
                }
            });
    

    Maybe是Single和Completable的结合,需要注意的是onSuccessonComplete方法只会执行其中一个,这不同于Observable和Flowable最后是以onComplete()结尾.

    如上面的代码,先调用onSuccess发射一个元素,再调用onComplete

    e.onComplete();
    e.onSuccess(true);
    

    最后打印结果如下:

    E/MainActivity: onSubscribe
    E/MainActivity: start send data
    E/MainActivity: onComplete
    

    可以看到只回调了 onComplete,我们把调用的顺序调换一下:

    e.onSuccess(true);
    e.onComplete();
    

    打印结果如下:

    E/MainActivity: onSubscribe
    E/MainActivity: start send data
    E/MainActivity: 1->onSuccess:true
    

    可以看到调换了之后打印OnSucces()而没有打印onComplete(),这也印证了只会回调其中之一。

    三、总结

    RxJava 2.x 相比于 1.x 还是有很大的变化,虽然操作符基本不变,但是很多类和接口都是基于Reactive-Streams 规范重新设计的,命名也发生了变换,要想玩转RxJava 2.x ,你得了解这些变化和使用场景,本文介绍了RxJava 2.x 的5种基础响应类型,希望对才开始学习RxJava 2.x 的同学有所帮助。

    参考:
    What's different in 2.0

    相关文章

      网友评论

      • defb1f03cad3:写的特备好,我收藏了40个RxJava文章都没有一个讲解这5中类型的,作者怎么只有系列一,后面都没有了.....,看作者的写作功底都是很深厚的,果断收藏,关键是评论还这么少
      • 丶叫我官人:其实使用起来还是很简单
      • Xdjm:看了大神的分析,有种想写几个操作符使用demo的冲动
        依然范特稀西:@C春和景明C别犹豫,说写就写啊😄

      本文标题:玩转RxJava2.x 学习教程(一)

      本文链接:https://www.haomeiwen.com/subject/xnrrcxtx.html