美文网首页RxAndroid开发经验谈Android开发
RxJava 知识梳理(3) - RxJava2 基础知识小结

RxJava 知识梳理(3) - RxJava2 基础知识小结

作者: 泽毛 | 来源:发表于2017-08-17 13:03 被阅读422次

    前言

    首先要感谢 Season_zlc 的一系列RxJava2的教程,关于上游、下游、水缸的类比,让我对于整个RxJava2的基本思想有了更加清晰的认识。大家有兴趣的话一定要多看看,写的通俗易懂,传送门:给初学者的 RxJava 2.0 教程 (一) ,本文的思想都来源于它的一系列文章。

    文章比较长,为了避免耽误大家的时间,先列出需要介绍的知识点:


    一、RxJava2 的基本模型

    1.1 使用实例

    在开始学习之前,我们先看一下最简单的例子:

    • 第一步:导入依赖包:
    dependencies {
        //在build.gradle中,导入依赖。
        compile 'io.reactivex.rxjava2:rxjava:2.0.1'
        compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    }
    
    • 第二步:使用最基本的Observable + Observer的最简单示例,这里我们在上游发送了四个onNext(String s)事件之后,最后发送了一个onComplete()事件。
        public static void classicalSample() {
            Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                    observableEmitter.onNext("1");
                    observableEmitter.onNext("2");
                    observableEmitter.onNext("3");
                    observableEmitter.onNext("4");
                    observableEmitter.onComplete();
    
                }
            }).subscribe(new Observer<String>() {
    
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable disposable) {
                    Log.d(TAG, "onSubscribe");
                    mDisposable = disposable;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext=" + s);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
    
                }
            });
        }
    
    • 第三步:运行结果,订阅成功之后,会依次回调以下三步操作:onSubscribeonNextonComplete

    1.2 基本元素

    在上面的例子中,涉及到了以下五个类:

    • Observable:上游。
    • ObservableOnSubscribe:上游的create方法所接收的参数。
    • ObservableEmitter:上游事件的发送者。
    • Observer:下游的接收者。
    • Disposable:用于维系上游、下游之间的联系。

    对于整个模型,可以总结为以下几点:

    • RxJava2简单的来说,就是一个发送事件、接收事件的过程,我们可以将发送事件方类比作上游,而接收事件方类比作下游。
    • 上游每产生一个事件,下游就能收到事件,上游对应Observable,而下游对应Observer
    • 只有当上游和下游建立连接之后,上游才会开始发送事件,这一关系的建立是通过subscribe方法。

    各关键元素的UML图如下:

    1.3 ObservableEmitter

    用于 发出事件,它可以分别发出onNext/onComplete/onError事件:

    • 上游可以发送无限个onNext,下游也可以接收无限个onNext
    • 当上游发送了一个onComplete/onError后,上游onComplete/onError后的事件将会继续发送,但是下游在收到onComplete/onError事件后不再继续接收事件。
    • 上游可以不发送onComplete或者onError事件。
    • 调用onError或者onComplete切断了上游和下游的联系,在联系切断后上游再发送onError事件就会报错,onCompleteonError的调用情况有以下几种:
      (1) onComplete可以发送多次,但是只会收到一次回调。
      (2) onError只可以发送一次,发送多次会报错。
      (3) onComplete之后不可以发送onError,否则会报错。
      (4) onError之后可以发送onComplete,但是只会收到onError事件。
    • onError的参数不允许为空。

    其继承关系如下图所示:


    1.4 Disposable

    理解成为 水管的机关,当调用它的dispose方法时,将会将上游和下游之间的管道切断,从而导致 下游接收不到事件

    • ObserveronSubscribe回调中,会传入一个Disposable对象,下游可以通过该对象的dispose()方法主动切断和上游的联系,在这之后上游的observableEmitter.isDisposed()方法将返回true
    • 当上游和下游的联系切断之后,下游收不到包括onComplete/onError在内的任何事件,若此时上游再调用onError方法发送事件,那么将会报错。

    我们来模拟一下,在下游收到2之后,通过Disposable来切断上游和下游之间的联系:

        public static void classicalSample() {
            Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                    observableEmitter.onNext("1");
                    observableEmitter.onNext("2");
                    observableEmitter.onNext("3");
                    observableEmitter.onNext("4");
                    observableEmitter.onComplete();
    
                }
            }).subscribe(new Observer<String>() {
    
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable disposable) {
                    Log.d(TAG, "onSubscribe");
                    mDisposable = disposable;
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, "onNext=" + s);
                    if ("2".equals(s)) {
                        mDisposable.dispose();
                    }
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
    
                }
            });
        }
    

    最终的运行结果为:


    1.5 Subscribe 的重载方法

    通过subscribe确定上游和下游的联系有以下几种方法:


    可以看到,这里可以分为三类:
    • 不带参数
    • Consumer<T>
    • Observer
    • Action

    对于不使用Observer类作为形参的subscribe函数,其实实现的功能和使用Observer类作为参数的方法相同,只不过它们是将Observer的四个回调分解成形参,有参数的回调用Consumer<T>代替,而没有参数的则用Action代替。

    二、线程切换

    2.1 基本概念

    • 当我们在上游创建一个Observable来发送事件,那么这个上游就默认在主线程发送事件;而当我们在下游创建一个Observer来接收事件,那么这个下游就默认在主线程中接收事件。
    • subscribeOn指定的是 上游发送事件 的线程,而observeOn指定的是 下游接收事件 的线程。
    • 多次调用subscribeOn只有第一次有效,而每调用一次observeOn,那么下游接收消息的线程就会切换一次。
    • CompositeDisposable可以用来容纳Disposable对象,每当我们得到一个Disposable对象时,就通过add方法将它添加进入容器,在退出的时候,调用clear方法,即可切断所有的水管。

    2.2 线程类型

    • Schedulers.io():代表IO操作,通常用于网络请求、文件读写等IO密集型的操作。
    • Schedulers.computation():代表CPU密集型的操作,适用于大量计算。
    • Schedulers.newThread():创建新的常规线程。
    • AndroidSchedulers.mainThread():代表Android的主线程。

    2.3 示例

    在链式调用当中,我们可以通过observeOn方法多次切换管道下游处理消息的线程,例如下面的代码,我们对下游进行了两次线程的切换:

        static void mapSample() {
            Observable.create(new ObservableOnSubscribe<String>() {
    
                @Override
                public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                    Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
                    observableEmitter.onNext("true");
                    Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
                    observableEmitter.onNext("false");
                    Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
                    observableEmitter.onComplete();
                }
            //1.指定了subscribe方法执行的线程,并进行第一次下游线程的切换,将其切换到新的子线程。   
            }).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {
    
                @Override
                public Boolean apply(String s) throws Exception {
                    Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
                    return "true".equals(s);
                }
            //2.进行第二次下游线程的切换,将其切换到主线程。    
            }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {
    
                @Override
                public void onSubscribe(Disposable disposable) {
    
                }
    
                @Override
                public void onNext(Boolean aBoolean) {
                    Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
                }
            });
        }
    

    以上代码的运行的结果为:


    三、Map 和 FlatMap 操作符

    3.1 Map

    • Map操作符的作用是对上游发送的每一个事件应用一个函数,使得每个事件按照函数的逻辑进行变换,通过Map就可以把上游发送的每一个事件,转换成Object或者集合,其英文注释为:
    • 以下面使用map的代码为例,可以看到map接收一个Function类,它有两个泛型变量,分别为调用map方法的Observable<T><T>泛型,和返回的Obervable<R><R>泛型。
        public static void mapVerify() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                }
            });
            Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Exception {
                    return integer.toString();
                }
            });
            Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
        }
    

    Function为一个接口:


    并且在map函数调用完毕之后,将返回一个新的Observable,它的类型为ObservableMap

    3.2 FlatMap

    • FlatMap用于将一个发送事件的上游Observable变换成多个发送事件的Observable,然后将它们发送的事件合并,放进一个单独的Observable中,其注释为:
    • 上游每发送一个事件,就会针对该事件创建一个单独的水管,然后发送转换后的新的事件,下游接收到的就是这些新的水管发送的事件。
    • FlatMap不保证不同水管之间事件的顺序,如果需要保证顺序,则需要使用contactMap

    3.2.1 示例

        static void flatMapSample() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    observableEmitter.onNext(1);
                    observableEmitter.onNext(2);
                    observableEmitter.onNext(3);
                }
            });
            Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
    
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    return Observable.fromArray("a value of " + integer + ",b value of " + integer);
                }
            });
            flatObservable.subscribe(new Consumer<String>() {
    
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, s);
                }
            });
        }
    

    map操作符类似,它也接收一个类型为Function的接口,只不过它的? extends R参数类型换成了? extends Observable<? extends R>

    3.2.2 FlatMap 不保证下游接收事件的顺序

    前面我们说到,flatMap操作符不会保证下游接收事件的顺序,下面,我们就以一个例子来说明,在flatMapapply函数中,我们将一个事件转换成两个Observable,并且加上了延时:

        static void flatMapOrderSample() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "flatMapOrderSample emit 1");
                    observableEmitter.onNext(1);
                    Log.d(TAG, "flatMapOrderSample emit 2");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "flatMapOrderSample emit 3");
                    observableEmitter.onNext(3);
                }
            });
            Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {
    
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    Log.d(TAG, "flatMapOrderSample apply=" + integer);
                    long delay = (3 - integer) * 100;
                    return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
                }
            });
            flatObservable.subscribe(new Consumer<String>() {
    
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, s);
                }
            });
        }
    

    可以看到,最终的输出结果和flatMap收到事件的顺序并不相同:


    下面,还是同样的场景,将flatMap换成contactMap
        static void contactMapOrderSample() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "contactMapOrderSample emit 1");
                    observableEmitter.onNext(1);
                    Log.d(TAG, "contactMapOrderSample emit 1");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "contactMapOrderSample emit 1");
                    observableEmitter.onNext(3);
                }
            });
            Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {
    
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    Log.d(TAG, "contactMapOrderSample apply=" + integer);
                    long delay = (3 - integer) * 100;
                    return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
                }
            });
            flatObservable.subscribe(new Consumer<String>() {
    
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, s);
                }
            });
        }
    

    最终的运行结果为:


    四、Zip 操作符

    4.1 基本概念

    • Zip通过一个函数从多个Observable每次各取出一个事件,合并成一个新的事件发送给下游。
    • 组合的顺序是严格按照事件发送的顺序来的。
    • 最终下游收到的事件数量和上游中发送事件最少的那一根水管的事件数量相同。

    4.1.1 两个 Observable 运行在同一线程当中

        static void zipSample() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "sourceObservable emit 1");
                    observableEmitter.onNext(1);
                    Thread.sleep(1000);
                    Log.d(TAG, "sourceObservable emit 2");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "sourceObservable emit 3");
                    observableEmitter.onNext(3);
                    Log.d(TAG, "sourceObservable emit 4");
                    observableEmitter.onNext(4);
                }
            });
            Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "otherObservable emit 1");
                    observableEmitter.onNext(1);
                    Log.d(TAG, "otherObservable emit 2");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "otherObservable emit 3");
                    observableEmitter.onNext(3);
                }
            });
            Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
    
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
    
            }).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable disposable) {
                    Log.d(TAG, "resultObservable onSubscribe");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "resultObservable onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "resultObservable onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "resultObservable onComplete");
    
                }
            });
        }
    

    此时的运行结果为:


    4.1.2 两个 Observable 运行在不同的线程

        static void zipSample() {
            Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "sourceObservable emit 1");
                    observableEmitter.onNext(1);
                    Thread.sleep(1000);
                    Log.d(TAG, "sourceObservable emit 2");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "sourceObservable emit 3");
                    observableEmitter.onNext(3);
                    Log.d(TAG, "sourceObservable emit 4");
                    observableEmitter.onNext(4);
                }
            });
            Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    Log.d(TAG, "otherObservable emit 1");
                    observableEmitter.onNext(1);
                    Log.d(TAG, "otherObservable emit 2");
                    observableEmitter.onNext(2);
                    Log.d(TAG, "otherObservable emit 3");
                    observableEmitter.onNext(3);
                }
            }).subscribeOn(Schedulers.io());
            Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {
    
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    return integer + integer2;
                }
    
            }).subscribe(new Observer<Integer>() {
    
                @Override
                public void onSubscribe(Disposable disposable) {
                    Log.d(TAG, "resultObservable onSubscribe");
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "resultObservable onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "resultObservable onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "resultObservable onComplete");
    
                }
            });
        }
    

    运行结果为:


    五、背压

    “背压”其实就是一种用于解决问题的工具,那么我们的问题又是什么呢?

    • 问题:当上游发送事件的速度很快,下游消费事件的速度又很慢,而系统又必须缓存这些上游发送的消息以便下游处理,那么就会导致系统中堆积了很多的资源。
    • 工具:下游告知上游目前自己的处理能力,上游根据下游的处理能力,进行适当的调整。

    想必大家在很多文章中都听过这个一句话:在RxJava2中,Observable不支持“背压”,而Flowable支持背压。

    5.1 不支持背压的 Observable

    关于Observable不支持背压,我们应当从两种情况去考虑,即上游、下游是否位于相同的线程。

    5.1.1 Observable 之上游、下游位于相同线程

    首先,我们不调用observeOnsubscribeOn方法来改变上游、下游的工作线程,这样,上游和下游就位于同一线程,同时,我们在下游的处理函数中,每收到一个消息就休眠2000ms,以模拟上游处理速度大于下游的场景。

        static void oomSample() {
            Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    for (int i = 0; i < 1000; i++) {
                        Log.d(TAG, "observableEmitter=" + i);
                        observableEmitter.onNext(i);
                    }
                }
            }).subscribe(new Consumer<Integer>() {
    
                @Override
                public void accept(Integer integer) throws Exception {
                    Thread.sleep(2000);
                    Log.d(TAG, "accept=" + integer);
                }
    
            });
        }
    

    从下面的打印结果可以看到,当“使用 Observable,并且上游、下游位于相同线程”时,并不会出现消息堆积的情况,因为上游发射完一条消息后,必须要等到下游处理完该消息,才会发射一条新的消息。

    5.1.2 Observable 之上游、下游位于不同线程

    接着,我们采用subscribeOnobserveOn来使得上游和下游位于不同的工作线程,其它均和2.2中相同。

        static void oomSample() {
            Observable.create(new ObservableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                    for (int i = 0; i < 1000; i++) {
                        Log.d(TAG, "observableEmitter=" + i);
                        observableEmitter.onNext(i);
                    }
                }
            }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {
    
                @Override
                public void accept(Integer integer) throws Exception {
                    Thread.sleep(2000);
                    Log.d(TAG, "accept=" + integer);
                }
    
            });
        }
    

    2.2中不同,当上游和下游位于不同的工作线程,那么上游发送消息时,不会考虑下游是否已经处理了之前的消息,它会直接发送,而这些发送的消息被存放在水缸当中,下游每处理完一条消息,就去水缸中取下一条数据,那么随着水缸中数据越来越多,那么系统中的无用资源就会急剧增加。

    5.1.3 关于 Observable 不支持背压的小结

    我们之所以说Observable不支持“背压”,就是在2.1介绍的整个族谱中,没有一个类,一种方法能让下游通知上游说:不要再发消息到水缸里了,我已经处理不过来了!

    那是不是说Flowable支持“背压”,而Observable不支持,那么Observable就要被取代了呢,其实不然,Flowable对于“背压”的支持是以性能为代价的,我们应当只在有可能出现2.3中上游下游速率不匹配的问题时,才去使用Flowable,否则就应当使用Observable,也就是满足两点条件:

    • 上游和下游位于不同的工作线程
    • 上游发送消息的速度,要远远大于下游处理消息的速度,有可能造成消息的堆积。

    5.2 支持背压的 Flowable

    5.2.1 基本概念

    • FlowableSubscriber分别对应于之前讨论的ObservableObserver,它们直接的连接仍然是通过subscribe方法。
    • Flowable在设计的时候采用了 响应式拉取 的思想,当下游调用了Subscriptionrequest方法时,就表明了下游处理事件的能力,这样上游就可以根据这个值来控制事件发送的频率,避免出现前面谈到的上游发送太快,而下游处理太慢从而导致OOM的发生。
    • 只有上游根据下游的处理能力来发送事件,才能达到理想的效果。

    5.2.2 基本使用

        static void flowSample() {
            Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
    
            }, BackpressureStrategy.ERROR);
    
            sourceFlow.subscribe(new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    subscription.request(Long.MAX_VALUE);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    

    其类结构图和Observable几乎完全一致:

    5.3 Flowable 支持背压的策略

    从上面的类图可以看出,FlowableObservable最大的不同,就是在create方法中,需要传入额外的参数,它表示的是“背压”的策略,这里可选的值包括:

    • ERROR
    • BUFFER
    • DROP
    • LATEST

    5.3.1 使用 ERROR 的策略

    • 当上游和下游位于同一个线程时,如果上游发送的事件超过了下游声明的request(n)的值,那么会抛出MissingBackpressureException异常。
    • 当上游和下游位于不同线程时,如果上游发送的事件超过了下游的声明,事件会被放在水缸当中,这个水缸默认的大小是128,只有当下游调用request时,才从水缸中取出事件发送给下游,如果水缸中事件的个数超过了128,那么也会抛出MissingBackpressureException异常。

    下面这段代码,我们先将三个事件放入到水缸当中,之后每次调用request方法就会从水缸当中取出一个事件发送给下游。

       static void flowSample() {
            Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
    
            }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());
    
            sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    sSubscription = subscription;
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    
        static void clickSubscription() {
            if (sSubscription != null) {
                sSubscription.request(1);
            }
        }
    

    当上游和下游位于不同的线程,每次通过Subscription调用request就会从水缸中取出一个事件,发送给下游:

    5.3.2 BUFFER 策略

    • 使用BUFFER策略时,相当于在上游放置了一个容量无限大的水缸,所有下游暂时无法处理的消息都放在水缸当中,这里不再像ERROR策略一样,区分上游和下游是否位于同一线程。
    • 因此,如果下游一直没有处理消息,那么将会导致内存一直增长,从而引起OOM
        static void clickSubscription() {
            if (sSubscription != null) {
                sSubscription.request(10);
            }
        }
    
        static void flowBufferSample() {
            Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 10000;i ++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
    
            }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());
    
            sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    sSubscription = subscription;
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    

    在上面的例子中,我们先把10000条消息放入到水缸当中,之后通过Subscription每次从水缸中取出10条消息发送给下游,演示结果为:

    5.3.3 DROP 策略

    • 使用DROP策略时,会把水缸无法存放的事件丢弃掉,这里同样不会受到下游和下游是否处于同一个线程的限制。
        static void flowDropSample() {
            Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 130; i++) {
                        emitter.onNext(i);
                    }
                }
    
            }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());
    
            sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    sSubscription = subscription;
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    

    我们先往水缸中放入130条消息,之后每次通过Subscription取出60条消息发送给下游,可以看到,最后最多只取到了第128条消息,第129/130条消息被丢弃了。

    5.3.4 LATEST 策略

    • DROP类似,当水缸无法容纳下消息时,会将它丢弃,但是除此之外,上游还会缓存最新的一条消息,实例如下:
        static void flowLatestSample() {
            Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {
    
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 130; i++) {
                        emitter.onNext(i);
                    }
                }
    
            }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());
    
            sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {
    
                @Override
                public void onSubscribe(Subscription subscription) {
                    Log.d(TAG, "onSubscribe");
                    sSubscription = subscription;
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext=" + integer);
                }
    
                @Override
                public void onError(Throwable throwable) {
                    Log.d(TAG, "onError");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
        }
    

    从下面的运行结果可以看出,当取出最后一批数据的时候,上游除了收到存储在水缸当中的数据,还额外收到了最后一条消息,也就是第130条数据,这就是DROP策略和LATEST策略的区别:


    更多文章,欢迎访问我的 Android 知识梳理系列:

    相关文章

      网友评论

        本文标题:RxJava 知识梳理(3) - RxJava2 基础知识小结

        本文链接:https://www.haomeiwen.com/subject/zodxrxtx.html