给初学者的RxJava2.0教程(八)

作者: Season_zlc | 来源:发表于2016-12-22 16:30 被阅读24875次

Outline

[TOC]

前言

在上一节中, 我们学习了FLowable的一些基本知识, 同时也挖了许多坑, 这一节就让我们来填坑吧.

正题

在上一节中最后我们有个例子, 当上游一次性发送128个事件的时候是没有任何问题的, 一旦超过128就会抛出MissingBackpressureException异常, 提示你上游发太多事件了, 下游处理不过来, 那么怎么去解决呢?

我们先来思考一下, 发送128个事件没有问题是因为FLowable内部有一个大小为128的水缸, 超过128就会装满溢出来, 那既然你水缸这么小, 那我给你换一个大水缸如何, 听上去很有道理的样子, 来试试:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

这次我们直接让上游发送了1000个事件,下游仍然不调用request去请求, 与之前不同的是, 这次我们用的策略是BackpressureStrategy.BUFFER, 这就是我们的新水缸啦, 这个水缸就比原来的水缸牛逼多了,如果说原来的水缸是95式步枪, 那这个新的水缸就好比黄金AK , 它没有大小限制, 因此可以存放许许多多的事件.

所以这次的运行结果就是:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
...
zlc.season.rxjava2demo D/TAG: emit 997
zlc.season.rxjava2demo D/TAG: emit 998
zlc.season.rxjava2demo D/TAG: emit 999

不知道大家有没有发现, 换了水缸的FLowable和Observable好像是一样的嘛...

不错, 这时的FLowable表现出来的特性的确和Observable一模一样, 因此, 如果你像这样单纯的使用FLowable, 同样需要注意OOM的问题, 例如下面这个例子:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

按照我们以前学习Observable一样, 让上游无限循环发送事件, 下游一个也不去处理, 来看看运行结果吧:

flowable.gif

同样可以看到, 内存迅速增长, 直到最后抛出OOM. 所以说不要迷恋FLowable, 它只是个传说.

可能有朋友也注意到了, 之前使用Observable测试的时候内存增长非常迅速, 几秒钟就OOM, 但这里增长速度却比较缓慢, 可以翻回去看之前的文章中的GIF图进行对比, 这也看出FLowable相比Observable, 在性能方面有些不足, 毕竟FLowable内部为了实现响应式拉取做了更多的操作, 性能有所丢失也是在所难免, 因此单单只是说因为FLowable是新兴产物就盲目的使用也是不对的, 也要具体分场景,

那除了给FLowable换一个大水缸还有没有其他的办法呢, 因为更大的水缸也只是缓兵之计啊, 动不动就OOM给你看.

想想看我们之前学习Observable的时候说到的如何解决上游发送事件太快的, 有一招叫从数量上取胜, 同样的FLowable中也有这种方法, 对应的就是BackpressureStrategy.DROPBackpressureStrategy.LATEST这两种策略.

从名字上就能猜到它俩是干啥的, Drop就是直接把存不下的事件丢弃,Latest就是只保留最新的事件, 来看看它们的实际效果吧.

先来看看Drop:

public static void request() {
        mSubscription.request(128);
    }

public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

我们仍然让上游无限循环发送事件, 这次的策略选择了Drop, 同时把Subscription保存起来, 待会我们在外部调用request(128)时, 便可以看到运行的结果.

我们先来猜一下运行结果, 这里为什么request(128)呢, 因为之前不是已经说了吗, FLowable内部的默认的水缸大小为128, 因此, 它刚开始肯定会把0-127这128个事件保存起来, 然后丢弃掉其余的事件, 当我们request(128)的时候,下游便会处理掉这128个事件, 那么上游水缸中又会重新装进新的128个事件, 以此类推, 来看看运行结果吧:

drop.gif

从运行结果中我们看到的确是如此, 第一次request的时候, 下游的确收到的是0-127这128个事件, 但第二次request的时候就不确定了, 因为上游一直在发送事件. 内存占用也很正常, drop的作用相信大家也很清楚了.

再来看看Latest吧:

public static void request() {
        mSubscription.request(128);
    }

public static void demo4() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

同样的, 上游无限循环发送事件, 策略选择Latest, 同时把Subscription保存起来, 方便在外部调用request(128).来看看这次的运行结果:

latest.gif

诶, 看上去好像和Drop差不多啊, Latest也首先保存了0-127这128个事件, 等下游把这128个事件处理了之后才进行之后的处理, 光从这里没有看出有任何区别啊...

古人云,师者,所以传道受业解惑也。人非生而知之者,孰能无惑?惑而不从师,其为惑也,终不解矣.

作为初学者的入门导师, 是不能给大家留下一点点疑惑的, 来让我们继续揭开这个疑问.

我们把上面两段代码改良一下, 先来看看DROP的改良版:

 Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {  //只发1w个事件
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(128);  //一开始就处理掉128个事件
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

这段代码和之前有两点不同, 一是上游只发送了10000个事件, 二是下游在一开始就立马处理掉了128个事件, 然后我们在外部再调用request(128)试试, 来看看运行结果:

drop_1.gif

这次可以看到, 一开始下游就处理掉了128个事件, 当我们再次request的时候, 只得到了第3317的事件, 后面的事件直接被抛弃了.

再来看看Latest的运行结果吧:

latest_1.gif

从运行结果中可以看到, 除去前面128个事件, 与Drop不同, Latest总是能获取到最后最新的事件, 例如这里我们总是能获得最后一个事件9999.

好了, 关于FLowable的策略我们也讲完了, 有些朋友要问了, 这些FLowable是我自己创建的, 所以我可以选择策略, 那面对有些FLowable并不是我自己创建的, 该怎么办呢? 比如RxJava中的interval操作符, 这个操作符并不是我们自己创建的, 来看下面这个例子吧:

Flowable.interval(1, TimeUnit.MICROSECONDS)
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);  //延时1秒
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

interval操作符发送Long型的事件, 从0开始, 每隔指定的时间就把数字加1并发送出来, 在这个例子里, 我们让它每隔1毫秒就发送一次事件, 在下游延时1秒去接收处理, 不用猜也知道结果是什么:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
                                  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
                                  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
                                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
                                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                  at java.lang.Thread.run(Thread.java:761)

一运行就抛出了MissingBackpressureException异常, 提醒我们发太多了, 那么怎么办呢, 这个又不是我们自己创建的FLowable啊...

别慌, 虽然不是我们自己创建的, 但是RxJava给我们提供了其他的方法:

  • onBackpressureBuffer()
  • onBackpressureDrop()
  • onBackpressureLatest()

熟悉吗? 这跟我们上面学的策略是一样的, 用法也简单, 拿刚才的例子现学现用:

Flowable.interval(1, TimeUnit.MICROSECONDS)
                .onBackpressureDrop()  //加上背压策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.d(TAG, "onNext: " + aLong);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

其余的我就不一一列举了.

好了, 今天的教程就到这里吧, 这一节我们学习了如何使用内置的BackpressureStrategy来解决上下游事件速率不均衡的问题. 这些策略其实之前我们将Observable的时候也提到过, 其实大差不差, 只要理解了为什么会上游发事件太快, 下游处理太慢这一点, 你就好处理了, FLowable无非就是给你封装好了, 确实对初学者友好一点, 但是很多初学者往往只知道How, 却不知道Why, 最重要的其实是知道why, 而不是How.

(其余的教程大多数到这里就结束了, 但是, 你以为FLowable就这么点东西吗, 骚年, Too young too simple, sometimes naive! 这仅仅是开始, 真正牛逼的还没来呢. 敬请关注下一节, 下节见 ! )

相关文章

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • Rxjava2

    Season_zl给初学者的RxJava2.0教程 ObservableEmitter emitter 1....

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • RxJava整理

    给初学者的RxJava2.0教程 ObservableEmitter 上游可以发送无限个onNext, 下游也可以...

  • test RxJava

    参考自: 给初学者的RxJava2.0教程(一) http://www.jianshu.com/p/464fa02...

  • Rxjava介绍<1>

    Rxjava github地址给初学者的RxJava2.0教程------水管系列手把手教你使用 RxJava 2...

  • Android

    大佬们的传送门: Season_zlc RxJava2 : 1.给初学者的RxJava2.0教程(一)

  • RxJava2使用

    学习入门RxJava2,推荐一位简书作者Season_zlc,他写的教程很详细。 给初学者的RxJava2.0教程...

网友评论

  • ITIan:Flowable.interval(1, TimeUnit.MICROSECONDS) 这是微秒
    c5e780c6d6ed:是的,我看了半天,是微秒 ,毫秒是MILLISECONDS:grin:
  • xiaosa_fu:很奇怪 每次测试无限发送事件包括在之前的demo中都不会出现OOM,在Android Profiler中看了,内存在不断升高但是到一定程度三百多兆的时候就升不上去了,不知道是不是VM自动GC了? 求大神解答~~
    ITIan:你测试系统应该是安卓6.0以上吧
  • leilifengxingmw:I am too young ,向大佬多学习。
  • 4b24b4c0fd7e:从原理到实现,很详尽,点个赞
  • bc1ff59c5b30:古人云,师者,所以传道受业解惑也。人非生而知之者,孰能无惑?惑而不从师,其为惑也,终不解矣.
  • 弓行佬:期待提高系列?
  • 5e2881e187d6:DROP的改良版,打印出log只显示出127条,是不是我哪里写错了呢,大神
    88a9fd9dbe0b:是不是一开始就处理掉128个事件的时候上游已经发完了,DROP后就没了,所以你拉取的时候就获取不到,要不试试request(50)或者上游发送10W条试试?我没写代码,只是猜想。
  • 7800ee121603:是微秒,不是毫秒啦
    2746e6b709f0:@金刚狼的粪叉 MILLISECONDS才是毫秒,而文中用的是MICROSECONDS,确实是微秒
    361579c6b5ba:ms单位。一般写1000是一秒,写1不就是1毫秒吗?没毛病
  • LowB界的扛把子:真的写得很详细,谢谢大神
  • 郑捡书:interval这个栗子想不明白,发射者和接收者都在同一线程,发送者每隔一秒发送一个事件,接收者处理完一次事件后暂停一秒,它们不就间隔一秒吗,为什么还会抛出异常?
    CCY0122:楼上注意了,是一微秒~
    891d6e4d1ded: @郑捡书 发射者是一毫秒,同学注意了
    37e1a998ccc8: @郑捡书 看第九节就明白了
  • 神秘的_空指针:不睡觉也要看完,喜欢95机枪变黄金AK:joy:
  • 7800ee121603:microsecond millisecond 傻傻分不清楚
    LeoWing:类比米、毫米、微米、纳米
  • 27efec53a72d:多谢作者
  • 代码君_Coder:Too young too simple, sometimes naive!
  • 斑马搬码:是不是漏了一个BackpressureStrategy.MISSING?这个是没有背压效果吗
  • BUG君:楼主用什么软件录屏的??
  • 212e8296c273:DROP的改良版,我运行出来的效果只打印到127后面再拉取,拉不出来了 啊!什么鬼,我以为我写的代码有问题,我又把你写的代码贴进去,效果也是一样?
    88a9fd9dbe0b:是不是一开始就处理掉128个事件的时候上游已经发完了,DROP后就没了,所以你拉取的时候就获取不到,要不试试request(50)或者上游发送10W条试试?我没写代码,只是猜想。
    c50e8290b6e7:我也是这样,如果用LATEST,就再可以拉出128。觉得很奇怪
  • 醉淸风:非常好,希望能出一些具体的使用案例
  • 苏村的南哥:期待下一章。
  • 6937b18671c3:最近这几篇要是再加上实际应用的场景案例就更完美啦:pray:
  • 李安然_5596:大神啊,倒数第二个例子并不会抛出MissingBackpressureException异常吧?有 s.request(Long.MAX_VALUE);这个调用啊?求教
    李安然_5596:@TripleZ 多谢指点哈
    TripleZhao: @李安然_5596 在下游处理的时候每一次都要让线程睡1s,但是上游的事件确实 1ms发送一次,所以还来不及让下游处理事件,程序直接就OOM了。
  • 西园无忌:大神,BackpressureStrategy.LATEST能不能这么理解:
    BackpressureStrategy.LATEST模式下,水缸满了以后,无法保留的事件也是被直接丢弃掉,但是,最后的那1个事件,被额外保存着。下游去拉取水缸中的事件时,如果水缸不为空,拉取到的是水缸中的事件;如果水缸为空,拉取到的是最后1个事件。
    烧烤摊前卖烧烤:我觉得应该也是,会缓存最新的一条,最新的一条会被重复的overwriting。
    2c4117a9984b:不是啊,看api 解释,最后的一个事件是会被重复overwriting 的 。
  • Bugme:求retrofit2好文章~
  • d8184ca3c970:那么到底用BackpressureStrategy哪个呢,error?
  • d8184ca3c970:有当年追越狱的感觉,顶大神。
  • cde86b15d3d6:你这个确实写的不错,但是你不更了 又没有什么推广 不然...............
    巴黎没有摩天轮Li:@沒亊偸著樂 那些 也许就要靠自己了:smiley:
    cde86b15d3d6:@Season_zlc 话说 下次能不能来点实际的 比如rxjava2.0实现轮播图 或者代替handlermessage方法实现页面更新 等等......(有打赏哦!)
    Season_zlc: @沒亊偸著樂 要更新要更新,等忙完这一阵就更,么么哒😚
  • 866f1a4a3a82:写得真是通俗易懂,非常赞,期待下一节!:+1:
  • cde86b15d3d6:说好的更新呢?!
    cde86b15d3d6:@Season_zlc 你这样会失去你的粉丝宝宝的
    cde86b15d3d6:@Season_zlc 我现在在做 rxjava mvp reretrofit 的项目 想挑战一把 有没有这一类的demo啊
    Season_zlc: @沒亊偸著樂 最近项目忙,没时间写,理解一下😂
  • f0f94add655d:这个系列很讲的很通俗易懂啊,支持作者~
  • 56f13502fd7d:每一章必看 希望有一天可以像你一样发一些高含量的文章
  • cde86b15d3d6:期待下一节
  • cde86b15d3d6:不错哦,我一直在看
  • 77356c477023:沙发没有,板凳也是可以得 :blush:
  • 5ee7a756a4d7:留名
    Season_zlc: @专宠玲 火钳

本文标题:给初学者的RxJava2.0教程(八)

本文链接:https://www.haomeiwen.com/subject/rnhhvttx.html