给初学者的RxJava2.0教程(六)

作者: Season_zlc | 来源:发表于2016-12-14 17:23 被阅读23811次

Outline

[TOC]

前言

在上一节中, 我们找到了上下游流速不均衡的源头 , 在这一节里我们将学习如何去治理它 . 可能很多看过其他人写的文章的朋友都会觉得只有Flowable才能解决 , 所以大家对这个Flowable都抱有很大的期许 , 其实呐 , 你们毕竟图样图森破 , 今天我们先抛开Flowable, 仅仅依靠我们自己的双手和智慧 , 来看看我们如何去治理 , 通过本节的学习之后我们再来看Flowable, 你会发现它其实并没有想象中那么牛叉, 它只是被其他人过度神化了.

正题

我们接着来看上一节的这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {  //无限循环发送事件
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

上一节中我们看到了它的运行结果是直接爆掉了内存, 也明白它为什么就爆掉了内存, 那么我们能做些什么, 才能不让这种情况发生呢.

之前我们说了, 上游发送的所有事件都放到水缸里了, 所以瞬间水缸就满了, 那我们可以只放我们需要的事件到水缸里呀, 只放一部分数据到水缸里, 这样不就不会溢出来了吗, 因此, 我们把上面的代码修改一下:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .filter(new Predicate<Integer>() {
                    @Override
                    public boolean test(Integer integer) throws Exception {
                        return integer % 10 == 0;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

在这段代码中我们增加了一个filter, 只允许能被10整除的事件通过, 再来看看运行结果:

filter.gif

可以看到, 虽然内存依然在增长, 但是增长速度相比之前, 已经减少了太多了, 至少在我录完GIF之前还没有爆掉内存, 大家可以试着改成能被100整除试试.

可以看到, 通过减少进入水缸的事件数量的确可以缓解上下游流速不均衡的问题, 但是力度还不够, 我们再来看一段代码:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io())
                .sample(2, TimeUnit.SECONDS)  //sample取样
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这里用了一个sample操作符, 简单做个介绍, 这个操作符每隔指定的时间就从上游中取出一个事件发送给下游. 这里我们让它每隔2秒取一个事件给下游, 来看看这次的运行结果吧:

sample.gif

这次我们可以看到, 虽然上游仍然一直在不停的发事件, 但是我们只是每隔一定时间取一个放进水缸里, 并没有全部放进水缸里, 因此这次内存仅仅只占用了5M.

大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从300多M变成不到5M. (≧▽≦)/

前面这两种方法归根到底其实就是减少放进水缸的事件的数量, 是以数量取胜, 但是这个方法有个缺点, 就是丢失了大部分的事件.

那么我们换一个角度来思考, 既然上游发送事件的速度太快, 那我们就适当减慢发送事件的速度, 从速度上取胜, 听上去不错, 我们来试试:

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //每次发送完事件延时2秒
                }
            }
        }).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "" + integer);
                    }
                });

这次我们让上游每次发送完事件后都延时了2秒, 来看看运行结果:

sleep.gif

完美 ! 一切都是那么完美 !

可以看到, 我们给上游加上延时了之后, 瞬间一头发情的公牛就变得跟只小绵羊一样, 如此温顺, 如此平静, 如此平稳的内存线, 美妙极了. 而且事件也没有丢失, 上游通过适当的延时, 不但减缓了事件进入水缸的速度, 也可以让下游充足的时间从水缸里取出事件来处理 , 这样一来, 就不至于导致大量的事件涌进水缸, 也就不会OOM啦.

到目前为止, 我们没有依靠任何其他的工具, 就轻易解决了上下游流速不均衡的问题.

因此我们总结一下, 本节中的治理的办法就两种:

  • 一是从数量上进行治理, 减少发送进水缸里的事件
  • 二是从速度上进行治理, 减缓事件发送进水缸的速度

大家一定没忘记, 在上一节还有个Zip的例子, 这个例子也爆了我们的内存, 现学现用, 我们用刚学到的办法来试试能不能惩奸除恶, 先来看看第一种办法.

先来减少进入水缸的事件的数量:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).sample(2, TimeUnit.SECONDS); //进行sample采样

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来试试运行结果吧:

zip_sample.gif

哈哈, 成功了吧, 再来用第二种办法试试.

这次我们来减缓速度:

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                    Thread.sleep(2000);  //发送事件之后延时2秒
                }
            }
        }).subscribeOn(Schedulers.io());

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("A");
            }
        }).subscribeOn(Schedulers.io());

        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
            @Override
            public String apply(Integer integer, String s) throws Exception {
                return integer + s;
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<String>() {
            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                Log.w(TAG, throwable);
            }
        });

来看看运行结果吧:

zip_sleep.gif

果然也成功了, 这里只打印出了下游收到的事件, 所以只有一个. 如果你对这个结果看不懂, 请自觉掉头看前面几篇文章.

通过本节的学习, 大家应该对如何处理上下游流速不均衡已经有了基本的认识了, 大家也可以看到, 我们并没有使用Flowable, 所以很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法. 后面我们讲到Flowable的时候, 大家就会发现它其实没什么神秘的, 它用到的办法和我们本节所讲的基本上是一样的, 只是它稍微做了点封装.

好了, 今天的教程就到这里吧, 下一节中我们就会来学习你们喜闻乐见的Flowable.

相关文章

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • Rxjava2

    Season_zl给初学者的RxJava2.0教程 ObservableEmitter emitter 1....

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • RxJava整理

    给初学者的RxJava2.0教程 ObservableEmitter 上游可以发送无限个onNext, 下游也可以...

  • test RxJava

    参考自: 给初学者的RxJava2.0教程(一) http://www.jianshu.com/p/464fa02...

  • Rxjava介绍<1>

    Rxjava github地址给初学者的RxJava2.0教程------水管系列手把手教你使用 RxJava 2...

  • Android

    大佬们的传送门: Season_zlc RxJava2 : 1.给初学者的RxJava2.0教程(一)

  • RxJava2使用

    学习入门RxJava2,推荐一位简书作者Season_zlc,他写的教程很详细。 给初学者的RxJava2.0教程...

网友评论

  • 永远改不完的bug的Coder:2018.10.17 打卡 点赞~
  • 逍遥wqy:感觉这篇文章只是举例
  • sakurekid:好厉害
  • 断臂残猿:这一章讲的有实际作用吗
  • 9277d1463f65:感觉这种方法作用不大,因为限制了上游的速度,而实际情况通常是你不清楚上下游的速度比例,所以这种做法会影响效率。还没看作者下面的文章,感觉获益良多。
  • hhhjjj_4590:大神啊, 你硬是把枯燥的代码写成了情节曲折的小说, 除了膜拜, 我还能说什么呢:smile: . 太赞了
  • e1419562d836:其实还可以通过信号量来解决
    mEmiter = emitter;
    for(int i=0;i<1000;i++){
    s1.acquire();
    emitter.onNext(i);
    Log.d(TAG,"emitter: " + i);
    printCrxThread("emiter" + i);
    }

    Log.d(TAG,"onNext: " + integer);
    printCrxThread("onNext");
    Thread.sleep(1000);
    s1.release();
  • xiaodouyaer:我曾经通过技术手段去优化一个程序, 最终使得内存占用从300多M变成不到5M.:joy: 老板是不是要给我加薪啊:joy:
    博雅波:@Jay_Lwp 你肯定没仔细看这篇博客.....
    kirito0424:不存在的,你要是给老板挣了1000w,肯定给你加薪
    Jay_Lwp:求写一篇博客指点下?
  • 慕名66:very very nice!!!!!!!
  • 杨晓是大V:啊哈哈哈,第四次拜读路过
  • 小龍五:已经连续修炼到第六集了,楼主是我见过最帅的!
  • 风之丨旅人:这种解决问题的思想对我很受用。。。多谢楼主
  • Ikulm:使用sample,每隔两秒取一个事件扔到水缸,这时的i不可能是0吧,i在循环里面会一直递增,我的最终输出的结果:2067031A
  • 轻舞飞絮:深入浅出,只能说666
  • 9d13dac0b326:This is the best article for RxJava I've ever read!!!
  • 千里之行死于足下:纠正小编一个错误,小绵羊也有可能是一头"领头羊",所以也会发飙的,哈哈......开个小玩笑.
  • a27941a6db65:Zip操作从数量和速度上来治理没有明白,第一个水管无限循环虽说数量减少了,速度减慢了,但是一样会缓存到水缸,但是为啥GIF图中的内存如此平稳
    圣诞小熊:@钟小北 速度减慢之后,控制了流速,第一个observable和第二个就匹配上了,然后根据zip的功能,其他的就没用了,再怎么循环下游也接收不到了,也就不会造成oom了,我是这么理解的,不对的地方请指正。
    骄傲的奔波儿灞:我也是这样觉着,这两种处理方法,在zip操作中,并不能彻底解决问题,只是减缓了内存的增长速度;我特意做了个试验,让Observable1间隔1S无限发送,让Observable2间隔10S发送A和B,会发现输出结果是0A和1B,虽然此时Observable1这里的循环已经执行到发送第10个事件了,但是zip操作依旧是顺序的取Observable1的第2个事件1来和Observable2的第2个事件B进行结合,这也就说明Observable1发送的事件已经是存在了队列里,只是速度慢了而已;不知道分析的对不对,希望高手解答;
  • 87f826a8000a:同楼上,也是我看过的写的最清晰易懂的文章。技术文就要这样,针对还没入门的,讲那些原理术语应该也只有懂的人才看得懂了。
  • Mocaris:厉害啊,这是我最认真看的文章:smirk:
  • 溅射:如果是2个Observable都循环的话,就会内存溢出,楼主第二个它只会打印一次,内存当然不会溢出的,恩。
  • MiracleSoul:请问一下用这种方法解决Zip的问题是否还是不能解决,因为第一个水管还是一直在循环发事件,而第二个水管只有一个事件。久了之后,水缸不是一样会溢出吗?为什么从Gif图上看内存并没有增长的趋势?
  • sun_wenming:麻烦问一下老哥。 你是用的什么工具录制gif的,我用的gifcam 貌似无法录制鼠标。
    sun_wenming:@Season_zlc 好的,我看一下具体设置。刚接触这个 gifcam
    Season_zlc:@A我是文明人 就是gifcam啊
  • Devnan:666,通俗易懂
  • 顶级工程师闯天涯:666,追本溯源....
  • doc__wei:玩操作符玩的很溜啊,,采样sample,,
  • g小志:这么好文章就是发的太慢了。不过博主辛苦了,但是不要脸的我还是希望博主快点更新,通俗易懂,直击要害,所以学的很快 速度更新。
  • longforus::+1: 通俗易懂 👍
  • zhangsht:不知不觉从一翻到六了 打个tag
  • 0445981d6022:6的飞起
  • 我不是番茄请叫我西红柿:我用了sample结果只能收到上游最后一个事件其他的无法接收
  • 干LeetCode:老铁,我现在的项目就需要大量、快速的发送数据,这两种解决方法都不行。。我用的1.x的版本,怎么办呢:confounded:
  • 27efec53a72d:太厉害了
  • Decillion:是不是大家把你也神化了!不管怎样我现在就感觉你的文章写的太NB了!
  • 准备流浪的雪:因为第二个Observable只发送了一个事件,zip组合多个Observable的时候,取最少的Observable
  • StoneHui:1. 使用 sample 操作符的确定文中已经提到,会丢失部分事件;
    2. 使用 sleep 延时的操作也并不完美,下游处理过慢(超过 sleep 时间)时依然后丢失事件。
    3. RxJava1.x 中解决 BackPressure 的最优雅方式应该是使用 request(long n) 方法。具体如下:
    cde86b15d3d6:@StoneHui 也是6 ,不过现在都是rx2.0了
    StoneHui:
    Observable.from(stringList).observeOn(Schedulers.io()).subscribe(new Subscriber<String>() {
    @Override
    public void onStart() {
    request(1); // 取出一条新数据
    }

    @Override
    public void onCompleted() {
    }

    @Override
    public void onError(Throwable e) {
    e.printStackTrace();
    }

    @Override
    public void onNext(ResourceUpdate resourceUpdate) {
    // TODO 执行下游处理操作
    ...

    request(1); // 取出一条新数据
    }
    });
  • 1cf651476639:没想到安卓还有个这么牛叉的工具哈
  • f7e986a9044f:楼主,文章一开始内存增长缓慢的原因不是加filter啊,你把consumer里的sleep(2)去掉了,消费速度就快了。
  • 木木仨儿:干货文章,赞
  • T9的第三个三角:延迟发送不失为一个好方法,但是不会延迟处理时间吗?如果本身就是耗时操作,再又延迟了 一段时间,导致整个过程耗时更久
  • 向阳薰衣草:逻辑清晰,通俗易懂 ,非常好,多谢分享!
  • 73ece15c815f:大家以后可以出去吹牛逼了: 我曾经通过技术手段去优化一个程序, 最终使得内存占用从300多M变成不到5M. ~(≧▽≦)/~。。哈哈 牛B,一直在用1,还没时间看2.。今天一下看了几篇,,收获很多啊。
  • 海角秋风:治标不治本啊......而且,背压是啥?......
    水池对应的方法是啥?只是自己抽象出来的而已啊.
  • 风骚无俩:高手啊
  • 6c1e2188d9f1:学习~ 通俗易懂。持续关注~~~
  • luo2016:水平很高
  • boboyuwu:我的错 。上篇没看完 。 哈哈连续剧不错
  • 2b0f4108a645:很赞,,很清晰,边看边写列子运行看效果,:smile:
  • 贾亦真亦贾:所以 背压是什么来着?
    慕名66:@等__的空瓶子 牛逼!!!
    a59e9506b318:按字面理解简单来说,背部的压力,就是你走得慢了,后面有人推你
    妙法莲花1234:发送比接收快,快的不是一点点..
  • 33a24f836282:看到了一股清流:smile:
  • 0baebfeea614:通俗易懂很实用
  • d8184ca3c970:通俗易懂,分析透彻,多谢大神。也是我看过的最好的rxjava的文章
    阳光的nick_lxz:@Season_zlc 这真是的是那个阿里小浣熊吗,,还有老哥你这文章写的真心好
    Season_zlc:@阳光的nick_lxz 这儿都能碰到阿里熊的粉丝:joy:
  • 0f69314fd625:写的不错,是我看到了Rxjava最好的文章
  • c31d01cd6aff: :no_mouth:
    c31d01cd6aff:@小耳朵图图是我 :no_mouth: 结尾告诉我们要好好学习~
    EitanLiu: @久山崎 结尾写什么了,关于女猪脚的?
    Season_zlc:@久山崎 :joy: 结尾已删
  • 3da9532e41be:通俗易懂上乘之作
  • 3aead53c9347:等下一章女主出场!!
    zhangwenhaojf40:写的很棒0
    Season_zlc: @菜馆香 哈哈,这一节没有女主角,只有小日本
  • junerver:没有喜欢有打赏啊,期待这个系列的更新~
    Season_zlc: @junerver 哈哈,谢谢
  • 小新GG:很多时候仔细去分析问题, 找到问题的原因, 从源头去解决才是最根本的办法,,,,,,这句话很受教,,,,厉害了 word哥
    阿V很简单:@炎之铠 哎, 配置问题真的 很烦
    炎之铠:最怕的就是出了问题,怎么找都找不到原因——配置环境经常出现的情况
    妙法莲花1234:的确啊,遇事不要着急,找到问题原因, 解决方法往往应运而生..:grin:
  • 简约黑:通俗易懂,厉害🏆

本文标题:给初学者的RxJava2.0教程(六)

本文链接:https://www.haomeiwen.com/subject/isaimttx.html