美文网首页Android: RxAndroid-RxJavaRx系列
给初学者的RxJava2.0教程(五)

给初学者的RxJava2.0教程(五)

作者: Season_zlc | 来源:发表于2016-12-13 12:19 被阅读27136次

    Outline

    [TOC]

    前言

    大家喜闻乐见的Backpressure来啦.

    这一节中我们将来学习Backpressure. 我看好多吃瓜群众早已坐不住了, 别急, 我们先来回顾一下上一节讲的Zip.

    正题

    上一节中我们说到Zip可以将多个上游发送的事件组合起来发送给下游, 那大家有没有想过一个问题, 如果其中一个水管A发送事件特别快, 而另一个水管B 发送事件特别慢, 那就可能出现这种情况, 发得快的水管A 已经发送了1000个事件了, 而发的慢的水管B 才发一个出来, 组合了一个之后水管A 还剩999个事件, 这些事件需要继续等待水管B 发送事件出来组合, 那么这么多的事件是放在哪里的呢? 总有一个地方保存吧? 没错, Zip给我们的每一根水管都弄了一个水缸 , 用来保存这些事件, 用通俗易懂的图片来表示就是:

    zip2.png

    如图中所示, 其中蓝色的框框就是zip给我们的水缸! 它将每根水管发出的事件保存起来, 等两个水缸都有事件了之后就分别从水缸中取出一个事件来组合, 当其中一个水缸是空的时候就处于等待的状态.

    题外话: 大家来分析一下这个水缸有什么特点呢? 它是按顺序保存的, 先进来的事件先取出来, 这个特点是不是很熟悉呀? 没错, 这就是我们熟知的队列, 这个水缸在Zip内部的实现就是用的队列, 感兴趣的可以翻看源码查看.

    好了回到正题上来, 这个水缸有大小限制吗? 要是一直往里存会怎样? 我们来看个例子:

    Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {    
        @Override                                                                          
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {       
            for (int i = 0; ; i++) {   //无限循环发事件                                                    
                emitter.onNext(i);                                                         
            }                                                                              
        }                                                                                  
    }).subscribeOn(Schedulers.io());    
                                                                                    
    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {      
        @Override                                                                          
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {        
            emitter.onNext("A");                                                           
        }                                                                                  
    }).subscribeOn(Schedulers.io());    
                                                                   
    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {                 
        @Override                                                                          
        public String apply(Integer integer, String s) throws Exception {                  
            return integer + s;                                                            
        }                                                                                  
    }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {                               
        @Override                                                                          
        public void accept(String s) throws Exception {                                    
            Log.d(TAG, s);                                                                 
        }                                                                                  
    }, new Consumer<Throwable>() {                                                         
        @Override                                                                          
        public void accept(Throwable throwable) throws Exception {                         
            Log.w(TAG, throwable);                                                         
        }                                                                                  
    });                                                                                    
    

    在这个例子中, 我们分别创建了两根水管, 第一根水管用机器指令的执行速度来无限循环发送事件, 第二根水管随便发送点什么, 由于我们没有发送Complete事件, 因此第一根水管会一直发事件到它对应的水缸里去, 我们来看看运行结果是什么样.

    运行结果GIF图:

    zip2.gif

    我勒个草, 内存占用以斜率为1的直线迅速上涨, 几秒钟就300多M , 最终报出了OOM:

    zlc.season.rxjava2demo W/art: Throwing OutOfMemoryError "Failed to allocate a 28 byte allocation with
    4194304 free bytes and 8MB until OOM; 
    zlc.season.rxjava2demo W/art: "main" prio=5 tid=1 Runnable      
    zlc.season.rxjava2demo W/art:   | group="main" sCount=0 dsCount=0 obj=0x75188710 self=0x7fc0efe7ba00   
    zlc.season.rxjava2demo W/art:   | sysTid=32686 nice=0 cgrp=default sched=0/0 handle=0x7fc0f37dc200    
    zlc.season.rxjava2demo W/art:   | state=R schedstat=( 0 0 0 ) utm=948 stm=120 core=1 HZ=100         
    zlc.season.rxjava2demo W/art:   | stack=0x7fff971e8000-0x7fff971ea000 stackSize=8MB         
    zlc.season.rxjava2demo W/art:   | held mutexes= "mutator lock"(shared held)    
    zlc.season.rxjava2demo W/art:     at java.lang.Integer.valueOf(Integer.java:742)                                                            
    

    出现这种情况肯定是我们不想看见的, 这里就可以引出我们的Backpressure了, 所谓的Backpressure其实就是为了控制流量, 水缸存储的能力毕竟有限, 因此我们还得从源头去解决问题, 既然你发那么快, 数据量那么大, 那我就想办法不让你发那么快呗.

    那么这个源头到底在哪里, 究竟什么时候会出现这种情况, 这里只是说的Zip这一个例子, 其他的地方会出现吗? 带着这个问题我们来探究一下.

    我们让事情变得简单一点, 从一个单一的Observable说起.

    来看段代码:

    Observable.create(new ObservableOnSubscribe<Integer>() {                         
        @Override                                                                    
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { 
            for (int i = 0; ; i++) {   //无限循环发事件                                              
                emitter.onNext(i);                                                   
            }                                                                        
        }                                                                            
    }).subscribe(new Consumer<Integer>() {                                           
        @Override                                                                    
        public void accept(Integer integer) throws Exception {                       
            Thread.sleep(2000);                                                      
            Log.d(TAG, "" + integer);                                                
        }                                                                            
    });                                                                              
    

    这段代码很简单, 上游同样无限循环的发送事件, 在下游每次接收事件前延时2秒. 上下游工作在同一个线程里, 来看下运行结果:

    peace.gif

    哎卧槽, 怎么如此平静, 感觉像是走错了片场.

    为什么呢, 因为上下游工作在同一个线程呀骚年们! 这个时候上游每次调用emitter.onNext(i)其实就相当于直接调用了Consumer中的:

       public void accept(Integer integer) throws Exception {                       
            Thread.sleep(2000);                                                      
            Log.d(TAG, "" + integer);                                                
       }     
    

    所以这个时候其实就是上游每延时2秒发送一次. 最终的结果也说明了这一切.

    那我们加个线程呢, 改成这样:

    Observable.create(new ObservableOnSubscribe<Integer>() {                            
        @Override                                                                       
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {    
            for (int i = 0; ; i++) {    //无限循环发事件                                                     
                emitter.onNext(i);                                                      
            }                                                                           
        }                                                                               
    }).subscribeOn(Schedulers.io())                                                    
            .observeOn(AndroidSchedulers.mainThread())                                  
            .subscribe(new Consumer<Integer>() {                                        
                @Override                                                               
                public void accept(Integer integer) throws Exception {                  
                    Thread.sleep(2000);                                                 
                    Log.d(TAG, "" + integer);                                           
                }                                                                       
            });                                                                         
    

    这个时候把上游切换到了IO线程中去, 下游到主线程去接收, 来看看运行结果呢:

    violence.gif

    可以看到, 给上游加了个线程之后, 它就像脱缰的野马一样, 内存又爆掉了.

    为什么不加线程和加上线程区别这么大呢, 这就涉及了同步异步的知识了.

    当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件.

    当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收, 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去, 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是我们刚才说的水缸 ! 上游把事件发送到水缸里去, 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了.

    这两种情况用图片来表示如下:

    同步:

    同步.png

    异步:

    异步.png

    从图中我们可以看出, 同步和异步的区别仅仅在于是否有水缸.

    相信通过这个例子大家对线程之间的通信也有了比较清楚的认知和理解.

    源头找到了, 只要有水缸, 就会出现上下游发送事件速度不平衡的情况, 因此当我们以后遇到这种情况时, 仔细思考一下水缸在哪里, 找到水缸, 你就找到了解决问题的办法.

    既然源头找到了, 那么下一节我们就要来学习如何去解决了. 下节见.

    相关文章

      网友评论

      • c91848a505a1:楼主真他娘是个人才。
      • 慕名66:博主我的也没有遇到oom,怎么回事,内存达不到256M就被回收了
      • 9b799b628c86:你好 关于一些合并操作符有一个问题 就说zip
        场景如下 我有两个接口需要访问,但是呢 如果两个接口都抛异常了 那rxjava就会报at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) 版本是2.0.1 实在想不到解决方法 应该是使用方式不对 请指教
      • 9b799b628c86:你好 关于一些合并操作符有一个问题 就说zip
        场景如下 我有两个接口需要访问,但是呢 如果两个接口都抛异常了 那rxjava就会报at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366) 版本是2.0.1 实在想不到解决方法 应该是使用方式不对 请指教
      • leilifengxingmw:水管,水表,水缸,接下来还有啥,文章写得很好,通俗易懂。
        c459beb59113:水池 水库。。。
      • 9d13dac0b326:好文,好文,好文,话不多说!
      • 慵懒的哈士奇:这个查看内存的插件叫啥
        tinyvampirepudg:androdi studio自己有的,Android Profiler
      • 流TT月:感谢楼主的贡献,解释的很到位,对我等新手来讲容易懂
      • aa03fd42aa76:测试机一直发就没OOM,我也是很无奈...:sleepy:
      • 小丸子_unique:Season_zlc君 ,有交流群么,有微信公众号么,求大神哥哥分享
        Season_zlc:@小丸子_unique 没有公众号:grin: , 只有个RxDownload的群
      • LuciferZ:遇到个问题,本篇的第一个例子,如果在observable2中发送一个onComplete事件,这时候理论上observable1中应该不会再发送事件了吧(参考上一篇文章所讲),可是实际测试发现observable1中依然会一直不断的发送事件,这点好困惑,望博主明示
        LuciferZ:@ISeekU 新组合的observable 也需要上游两个observable 的数据流呀。昨天调查了一下,observable2发送onComplete的时候,observable1那个线程会被interrupted 。这也解释了为什么上一篇文章里面,如果observable1里面有sleep 的代码,observable2发送onComplete之后,会抛出异常。
        8f959b52e515:我觉得这个跟那个没关系,事件的发送其实是使用的新的组合而成的那个Observable,而非第二个的Observable,:relaxed: ,一起交流
      • 老年追梦人:第一次看技术类文章看的停不下来 :sob: 妈妈问我为什么跪着看博客:cold_sweat:
        老年追梦人:@Season_zlc 话说楼主不更了么。。。。。:flushed:
        Season_zlc:哈哈,怪我咯:joy:
      • testthat:楼主的rxjava系列讲的不错
      • 2a27448f5073:那个monitors是什么插件?非android开发有么?
      • MoonlightAniki:讲得真好!
      • emoji_lucky:rxjava2.x的Observable是不存在背压的概念的,首先博主都没有完全理解什么是背压,背压是下游控制上游流速的一种手段。在rxjava1.x的时代,上游会给下游set一个producer,下游通过producer向上游请求n个数据,这样上游就有记录下游请求了多少个数据,然后下游请求多少个上游就给多少个,这个就是背压。一般来讲,每个节点都有缓存,比如说缓存的大小是64,这个时候下游可以一次性向上游request 64个数据。rxjava1.x的有些操作符不支持背压,也就是说这些操作符不会给下游set一个producer,也就是上游根本不理会下游的请求,一直向下游丢数据,如果下游的缓存爆了,那么下游就会抛出MissingBackpressureException,也就是背压失效了。在rxjava2.x时代,上述的背压逻辑全部挪到Flowable里了,所以说Flowable支持背压。而2.x时代的Observable是没有背压的概念的,Observable如果来不及消费会死命的缓存直到OOM,所以rxjava2.x的官方文档里面有讲,大数据流用Flowable,小数据流用Observable
        daocaowe:我到底听谁的
        我的QQ宠物:这是学术的课堂,不要什么喷不喷的就上来了
        我是高手:@唐诗淳 人家没喷啊,首先博主写的文章很好,但rxjava 2.0 observable确实没有背压的概念,你可以看看其他文章
      • Gfish:水管水缸水龙头水阀水表 还有啥
      • 6734bd8f0a1b:麻烦问下楼主用的什么gif录制软件?:+1:
      • 1cf651476639:每篇都写的蛮少,但是很精练,实在是不错
      • KingJA:博主的无限发送事件的过程中怎么没进行垃圾自动回收?我测试过程中会经常性被JVM回收,内存不会直线上升
        tinyvampirepudg:我也没有oom:disappointed_relieved:
      • Siact:没有见过这么通俗易懂的作者,赞个
      • luo2016:确实写的好
      • icoo:楼主的一句,卧槽这么安静是不是走错片场了,给我笑喷了
      • boboyuwu:亲写的不错哦 给个赞 感觉结尾有点草草收场了。既然说到背压问题,问题说道了,水缸说了。那么当水缸满了怎么控制上游发射的速率呀 这个没有说呀
      • 77324d9b47bb:太牛逼了,这比喻简直不能再形象
      • b911b3a470b9:大神,上游采样和延时有没有丢失数据呢,采样如果不是是队列形式的采样那么如何保证数据的完整性呢
      • Cloverss:简直 per费!
      • eoeoops:如果博主在文首放一个上一篇的链接,文尾放一个下一篇的链接就更好了:blush:
        安卓猿:@Season_zlc 楼主 用户良苦
        eoeoops:@Season_zlc :sweat_smile:
        Season_zlc: @刘豆 你们都没明白我的良苦用心,我是怕你们看得太投入了停不下来,根本不敢加直达链接哈哈😂
      • d8184ca3c970:求楼主建群
        Season_zlc:@d8184ca3c970 你可以加我RxDownload的讨论群603610731
      • ling9400:楼主,最后的异步发送的,我测试了内存也不会直接爆表啊!感觉还是逐步递增的,不知道是哪里的问题?
      • kuwork:写得好,很详细。TOC无效,加一个目录形式的链接就更好了
      • Iyangc:实在太好了 给楼主打赏了下,感谢楼主的好文章,我接着往下看...还有,楼主我在前面问了一个问题 你空了看到 指点一下我啦。:clap:
      • cde86b15d3d6:额 打赏都出现bug了
        d8184ca3c970:@沒亊偸著樂 为什么我没有bug啊
        cde86b15d3d6:@Season_zlc 木有骗你啊
        Season_zlc: @沒亊偸著樂 骗人!😂
      • c31d01cd6aff: :no_mouth: 看到结尾....
      • d2ab3859e18f:猝不及防,结尾卖的一手好萌
        Season_zlc: @墨痕坊 😂
      • 067d829266d9:楼主,2.0 Flowable 取消订阅这块怎么弄?1.x subscrible 订阅是会返回一个Subscription,然后mCompositeSubscription.add(Subscription), 在 Activity 退出时取消订阅 mCompositeSubscription.unsubscribe(),2.x,subscrible订阅 返回值是 void 了,2.0把Subscription 放到 onSubscribe(Subscription s)中去了,那现在是直接在onSubscribe(Subscription s)方法中在 s.request(1)后s.cancel()?
        067d829266d9: @小小小池 楼主,我的问题另外一个大神帮解决了。以下是他的回答:Flowable 提供了 subscribeWith 方法可以返回当前订阅的观察者,并且通过 ResourceSubscriber 这种类来提供 Disposable 接口 按照你之前的做法,现在应该是这样 CompositeDisposable composite2 = new CompositeDisposable(); composite2.add(Flowable.range(1, 5).subscribeWith(subscriber)); subscriber 应该是 ResourceSubscriber 或者 DisposableSubscriber 的实例
      • junerver:楼主我恨你,天天跟追小说似地等更新 :disappointed_relieved:
        妙法莲花1234:@Season_zlc 撩的一手好妹:grin:
        Season_zlc:@junerver 哈哈, 就是喜欢看你们着急的样子~\(≧▽≦)/~
      • starCoder:一直跟着看,讲的不错
      • 067d829266d9:楼主,快更新啊,吃不饱啊,多更新些Flowable 啊
        妙法莲花1234:@Season_zlc 哈哈哈哈, 撒娇,,
        Season_zlc: @小小小池 哼,你们给的赞太少了,本宝宝生气😡
      • 小爨:不错
      • 小新GG:楼主讲的很详细,,,,希望速度快点啊,,,,,还有希望深入一点 针对Flowable 和Rxbus 多写点啊
        J大空:@Season_zlc 请问一下,这个是什么画图工具?好nice的样子
        妙法莲花1234:@Season_zlc 图画的相当棒啊,小学生都看明白了,嘿嘿
        Season_zlc: @zhouxin1233 我每天还要上班呀,都是抽空写的,还要画图,各位多多理解。😂

      本文标题:给初学者的RxJava2.0教程(五)

      本文链接:https://www.haomeiwen.com/subject/bcavmttx.html