美文网首页Android-RxJavaRx系列Android进阶
给初学者的RxJava2.0教程(七)

给初学者的RxJava2.0教程(七)

作者: Season_zlc | 来源:发表于2016-12-19 17:18 被阅读31506次

Outline

[TOC]

前言

上一节里我们学习了只使用Observable如何去解决上下游流速不均衡的问题, 之所以学习这个是因为Observable还是有很多它使用的场景, 有些朋友自从听说了Flowable之后就觉得Flowable能解决任何问题, 甚至有抛弃Observable这种想法, 这是万万不可的, 它们都有各自的优势和不足.

在这一节里我们先来学习如何使用Flowable, 它东西比较多, 也比较繁琐, 解释起来也比较麻烦, 但我还是尽量用通俗易懂的话来说清楚, 毕竟, 这是一个通俗易懂的教程.

正题

我们还是以两根水管举例子:

prepare.png

之前我们所的上游和下游分别是ObservableObserver, 这次不一样的是上游变成了Flowable, 下游变成了Subscriber, 但是水管之间的连接还是通过subscribe(), 我们来看看最基本的用法吧:

Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR); //增加了一个参数

        Subscriber<Integer> downstream = new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
                s.request(Long.MAX_VALUE);  //注意这句代码
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                 Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        upstream.subscribe(downstream);

这段代码中,分别创建了一个上游Flowable和下游Subscriber, 上下游工作在同一个线程中, 和之前的Observable的使用方式只有一点点的区别, 先来看看运行结果吧:

D/TAG: onSubscribe   
D/TAG: emit 1        
D/TAG: onNext: 1     
D/TAG: emit 2        
D/TAG: onNext: 2     
D/TAG: emit 3        
D/TAG: onNext: 3     
D/TAG: emit complete 
D/TAG: onComplete    

结果也和我们预期的是一样的.

我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数, 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法, 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解.

另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription, 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管, 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

 s.request(Long.MAX_VALUE);  

这句代码有什么用呢, 不要它可以吗? 我们来试试:

 Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription s) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);

            }

            @Override
            public void onError(Throwable t) {
                Log.w(TAG, "onError: ", t);
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });

这次我们取消掉了request这句代码, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven$3.subscribe(ChapterSeven.java:77)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                                  at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven.demo2(ChapterSeven.java:111)
                                  at zlc.season.rxjava2demo.MainActivity$2.onClick(MainActivity.java:36)
                                  at android.view.View.performClick(View.java:5637)
                                  at android.view.View$PerformClick.run(View.java:22429)
                                  at android.os.Handler.handleCallback(Handler.java:751)
                                  at android.os.Handler.dispatchMessage(Handler.java:95)
                                  at android.os.Looper.loop(Looper.java:154)
                                  at android.app.ActivityThread.main(ActivityThread.java:6119)
                                  at java.lang.reflect.Method.invoke(Native Method)
                                  at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:886)
                                  at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:776)
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete

哎哎哎, 大兄弟, 怎么一言不合就抛异常?

从运行结果中可以看到, 在上游发送第一个事件之后, 下游就抛出了一个著名的MissingBackpressureException异常, 并且下游没有收到任何其余的事件. 可是这是一个同步的订阅呀, 上下游工作在同一个线程, 上游每发送一个事件应该会等待下游处理完了才会继续发事件啊, 不可能出现上下游流速不均衡的问题呀.

带着这个疑问, 我们再来看看异步的情况:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

这次我们同样去掉了request这句代码, 但是让上下游工作在不同的线程, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 1
zlc.season.rxjava2demo D/TAG: emit 2
zlc.season.rxjava2demo D/TAG: emit 3
zlc.season.rxjava2demo D/TAG: emit complete

哎, 这次上游正确的发送了所有的事件, 但是下游一个事件也没有收到. 这是因为什么呢?

这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题, 与我们之前所讲的控制数量控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本, 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问, 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问, 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...

所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个, 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM. 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失. 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !

但是太完美的东西也就意味着陷阱也会很多, 你可能只是被它的外表所迷惑, 失去了理智, 如果你滥用或者不遵守规则, 一样会吃到苦头.

比如这里需要注意的是, 只有当上游正确的实现了如何根据下游的处理能力来发送事件的时候, 才能达到这种效果, 如果上游根本不管下游的处理能力, 一股脑的瞎他妈发事件, 仍然会产生上下游流速不均衡的问题, 这就好比小日本管他叶问要打几个, 老子直接拿出1万个鬼子, 这尼玛有种打死给我看看? 那么如何正确的去实现上游呢, 这里先卖个关子, 之后我们再来讲解.

学习了request, 我们就可以解释上面的两段代码了.

首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常, 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了, 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们. 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了, 比如这里request(3)就可以了.

然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢? 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中, 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.

是不是这样呢, 我们来验证一下:

    public static void request(long n) {
        mSubscription.request(n); //在外部调用request请求上游
    }

    public static void demo3() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.d(TAG, "emit 1");
                emitter.onNext(1);
                Log.d(TAG, "emit 2");
                emitter.onNext(2);
                Log.d(TAG, "emit 3");
                emitter.onNext(3);
                Log.d(TAG, "emit complete");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;  //把Subscription保存起来
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });
    }

这里我们把Subscription保存起来, 在界面上增加了一个按钮, 点击一次就调用Subscription.request(1), 来看看运行结果:

request.gif

结果似乎像那么回事, 上游发送了四个事件保存到了水缸里, 下游每request一个, 就接收一个进行处理.

刚刚我们有说到水缸的大小为128, 有朋友就问了, 你说128就128吗, 又不是唯品会周年庆, 我不信. 那就来验证一下:

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 128; i++) {
                    Log.d(TAG, "emit " + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.d(TAG, "onSubscribe");
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.w(TAG, "onError: ", t);
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete");
                    }
                });

这里我们让上游一次性发送了128个事件, 下游一个也不接收, 来看看运行结果:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
  ...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127

这段代码的运行结果很正常, 没有任何错误和异常, 上游仅仅是发送了128个事件.

那来试试129个呢, 把上面代码中的128改成129试试:

zlc.season.rxjava2demo D/TAG: onSubscribe
zlc.season.rxjava2demo D/TAG: emit 0
  ...
zlc.season.rxjava2demo D/TAG: emit 126
zlc.season.rxjava2demo D/TAG: emit 127
zlc.season.rxjava2demo D/TAG: emit 128  //这是第129个事件
zlc.season.rxjava2demo W/TAG: onError: 
                              io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
                                  at zlc.season.rxjava2demo.demo.ChapterSeven$7.subscribe(ChapterSeven.java:169)
                                  at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
                                  at io.reactivex.Flowable.subscribe(Flowable.java:12218)
                                  at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
                                  at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
                                  at java.util.concurrent.FutureTask.run(FutureTask.java:237)
                                  at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272)
                                  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                                  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                                  at java.lang.Thread.run(Thread.java:761)  

这次可以看到, 在上游发送了第129个事件的时候, 就抛出了MissingBackpressureException异常, 提醒我们发洪水啦. 当然了, 这个128也不是我凭空捏造出来的, Flowable的源码中就有这个buffersize的大小定义, 可以自行查看.

注意这里我们是把上游发送的事件全部都存进了水缸里, 下游一个也没有消费, 所以就溢出了, 如果下游去消费了事件, 可能就不会导致水缸溢出来了. 这里我们说的是可能不会, 这也很好理解, 比如刚才这个例子上游发了129个事件, 下游只要快速的消费了一个事件, 就不会溢出了, 但如果下游过了十秒钟再来消费一个, 那肯定早就溢出了.

好了, 今天的教程就到这里了, 下一节我们将会更加深入的去学习FLowable, 敬请期待.

(哈哈, 给我的RxDownload打个广告: RxDownload是一个基于RxJava的多线程+断点续传的下载工具, 感兴趣的来GitHub点个star吧☺. 电梯直达->戳这里 )

相关文章

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • Rxjava2

    Season_zl给初学者的RxJava2.0教程 ObservableEmitter emitter 1....

  • rx - 收藏集 - 掘金

    给初学者的 RxJava2.0 教程 (二) - Android - 掘金作者博客 http://www.jian...

  • RxJava整理

    给初学者的RxJava2.0教程 ObservableEmitter 上游可以发送无限个onNext, 下游也可以...

  • test RxJava

    参考自: 给初学者的RxJava2.0教程(一) http://www.jianshu.com/p/464fa02...

  • Rxjava介绍<1>

    Rxjava github地址给初学者的RxJava2.0教程------水管系列手把手教你使用 RxJava 2...

  • Android

    大佬们的传送门: Season_zlc RxJava2 : 1.给初学者的RxJava2.0教程(一)

  • RxJava2使用

    学习入门RxJava2,推荐一位简书作者Season_zlc,他写的教程很详细。 给初学者的RxJava2.0教程...

网友评论

  • wu2007369:艾玛,叶问打鬼子,把我笑得。
    突然想起来还有上一章的脱缰的野马,发情的公牛,温顺的小白兔。
    太生动了
  • ZzzBj:从第一篇开始看 ,楼主思维发散,思路清晰,但真奇才,小弟贫穷只能送上喜欢和star:stuck_out_tongue_winking_eye:
  • sakurekid:我是叶问,我要一个打128个,哈哈哈
  • 01sr:可是为什么同步的时候上游不把事件暂时放到水缸里呢
    ITIan:同步就必须从头走到尾,不能中断,你同步放入水缸,难道就不会卡住吗,一样是卡住,这样下游也就没有存在意义了,异步的话,存入水缸卡住,下游因为是不同线程也可以等待一段时间再处理,这样下游就有存在的意义
    a117173647c2:同步和异步区别了解一下
  • c459beb59113:唯品会给广告费了吗:clap:
    Season_zlc:@c459beb59113 没给!!😠 我是不是要找他们给广告费!
  • BugFree张瑞:满脑子都是叶问打鬼子
  • 吉哈达:写的非常好。
  • leilifengxingmw:为什么我的手没有发抖,因为我的兜里没钱:smile:
  • 4adbc21fddb2:request:处理下游事件的能力
    哈哈哈哈作者写的太通俗易懂了,一口气看了七节,学到了很多!
  • 叶落无痕52:个人觉得,博主这语言表达能力应该是可以出书了!
  • 不正就是歪573:我感觉我好像是来看故事会的。。。。
  • 杨晓是大V:啊哈哈哈,很给力
  • 青衿尘:我能打十个😂
  • 青衿尘:我能打十个:joy:
  • d3804c22b6d1:写的真心好
  • lipeiyan:叶问打鬼子很好,哈哈
  • 4321177a4375:确实大佬你几年安卓啊 说出来让我惭愧一下
  • 郭效江:兄弟,你讲的通俗易懂,幽默形象。我来晚了
  • cb7bede841b1:打鬼子的比喻真的是太搞笑了
  • JayQiu:很喜欢叶问大鬼子的比喻
  • 菜的扣jio啊:叶问打鬼子这个例子解释,真的把我逗笑了,哈哈
  • 千里之行死于足下:有几个问题不明白,为什么小日本会按要求分配小鬼子给叶问打,然后站一边看热闹?这剧情设计得有点跳跃耶!还有,叶问谁啊?打篮球的么?
    a521wangzhaof:@千里之行死于足下 语文功底不行,只是比喻而已
    361579c6b5ba:叶问系列的电影都是这样的,小日本分配几个鬼子让你打,显示分配垃圾的,最后分配大Boss,打完收工回岛国继续php,能理解我这里的php吗?不是啥啥啥语言框架。
  • 我是龙俊:叶问打鬼子 :+1: :+1: :+1:
  • 米奇小林:10个码农9个逗
    LuLiangDev:@米奇小林 还有一个陈独秀。
  • 安久哲丶_1ab2:目前看到过最通俗易懂的rxjava系列文章!很赞。
  • 小钙:植入广告,差评!
  • bingoCode:我没有 request 同时 上游 有 130 个数据 也没报错啊 这是嘛情况
    木溪bo:表示129已经MissingBackpressureException了
  • CCY0122:看的好爽
  • 夜封雪:这是整个demo:
    Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
    for (int i = 0; i < 1000; i++) {
    e.onNext(i);
    Log.e(TAG, "emit " + i);
    }
    }
    }, BackpressureStrategy.ERROR)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<Integer>() {
    @Override
    public void onSubscribe(Subscription s) {

    }

    @Override
    public void onNext(Integer integer) {
    Log.e(TAG, "onNext: " + integer);
    }

    @Override
    public void onError(Throwable t) {

    }

    @Override
    public void onComplete() {
    Log.e(TAG, "onComplete");
    }
    });
    Shirley0145:因为你没有在onError()里打印出错误信息吧
  • 夜封雪:for (int i = 0; i < 129; i++) {
    e.onNext(i);
    Log.e(TAG, "emit " + i);
    }
    这里打印了第129个条数据,但是没有报错
    09-01 17:57:52.080 580-601/? E/ChapterSeven: emit 0
    09-01 17:57:52.081 580-601/? E/ChapterSeven: emit 1
    09-01 17:57:52.081 580-601/? E/ChapterSeven: emit 2
    ………………………………………………………………
    09-01 17:57:52.086 580-601/? E/ChapterSeven: emit 126
    09-01 17:57:52.086 580-601/? E/ChapterSeven: emit 127
    09-01 17:57:52.086 580-601/? E/ChapterSeven: emit 128
    09-01 17:57:52.131 580-602/? I/Adreno: QUALCOMM build : 74df444, I409c65498b
    Build Date : 06/22/16
    OpenGL ES Shader Compiler Version: XE031.08.00.02
    Local Branch : N16
    Remote Branch :
    Remote Branch :
    Reconstruct Branch :
    09-01 17:57:52.137 580-602/? I/OpenGLRenderer: Initialized EGL, version 1.4
    这个是怎么回事呀 ?
  • 844f3e2d4e14:我这有个问题!就是如果上下游都在主线程,下路即使没有request()上游同样可以把所有的事件发送出来,不会抛出异常!这什么情况??
    流浪小猫:我发现也是这样,大神粗来解释下
    tinyvampirepudg:@X_sky_B :joy:
    X_sky_B:确定观察者onError中是 这么写的吗 Log.w(TAG, "onError: ", t);
    ----------引用9楼的回复
  • d57748eef2b8:上下游在不同线程的时候并没有去打印throwable,在没有写request的情况下,上游写了个无限发送事件,上游应该向水缸中放了远超128的事件,下游由于没哟调用request所以一个也没取,因此水缸应该会溢出导致oom,但是查看后发现内存并没有溢出!!不明白是为什么
  • 太阳晒得我丶好干瘪:作者一句“叶问我要打十个”把我给炸出来了
  • 0861bad599ad:最后一个demo中我把128换成了129,报了MissingBackpressureException这个我能理解;但是@Override
    public void onSubscribe(Subscription s) {
    Log.d(TAG, "onSubscribe");
    s.request(Long.MAX_VALUE);
    mSubscription = s;
    }
    这样,添加了 s.request(Long.MAX_VALUE);这句话请求了几次事件,但是还是发生了MissingBackpressureException,这是为什么?
  • g小志:大赞 你的文章也必须点赞 不管用不用 就冲这篇文章 我就知道 博主写什么都不会差 棒棒哒
  • boredream:128阈值这个问题我撸demo测了下比较迷。
    request数量小于99时,无论request多少,只要emit发送超过128就进策略报错;
    而request数量大于99时,emit发送数量 - request数量 > 128时 才报错。
  • xyyou123:是不是一定要写request???
  • 1cf651476639:为啥我把 s.request(Long.MAX_VALUE); 删掉后还是没有抛异常呢? 这让我百思不得其解,昨天还以为是RxJava的版本问题,今天换了教程一种的版本还是不会报错
    1cf651476639:原来是在onError里面,不好意思看错了!嘿嘿
  • 胡转路旁旧时光:RX小白一个,楼主讲的深入浅出,值得好好学习.特别是水管和叶问打鬼子的形容.
  • 27efec53a72d:很赞,非常喜欢作者
  • 176bed5b3190:叶问鬼子打的还行:smile:
  • 10TB的真爱:楼主大大,我有一个问题,为什么我这样写下游接受不到数据啊?flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
    10TB的真爱:而且我已经设置了request()的值。
  • 61ae09a9984b:我想请教下,128这个缓冲最大值能修改得更大的吗?还有我可以清空里面缓冲区的事件吗?
  • KingJA:一股脑的瞎他妈发事件
  • e2f11fd62dd4::blush: star了
  • FynnJason:楼主,文章里的图片丢失了。。。
  • 蓝色理想pro:图片好多张都挂了。。。:smile:
  • 644e4d4c83ab:图片又挂了:joy:
  • Holyn:“你说128就128吗, 又不是唯品会周年庆..”,为这个广告点赞,难道我们是唯品会同事??:smile:
    Season_zlc: @浩子_Android 哈哈,并不是。。😂
  • 53861bc74f13:图片都看不到,但还是很棒,顺着你的文章读过来,没有图片也差不多脑补出来了
  • boboyuwu:我真想给你个么么哒
  • void_Zhao:图片看不了啊,就我有这问题吗:joy:
  • 苏村的南哥:图片都看不了,只有我一个人有这个问题吗 我换了Firefox,Chrome,Safarif 图片都看不了。 前面六章节的目有问题。。。
    Season_zlc:处理了
  • Xanthuim:第一张图挂了。
  • 9b2d9cd59272:楼主,图片不能看了,能重新发送一下么
  • a33855e6741f:Disposable有CompositeDisposable进行管理,那Subscriber呢?
  • 63f61095b3c4:很喜欢作者RxJava这一系列文章,特别是叶问打鬼子的比喻,哈哈,赞!
  • b0c0a8a8937a:刚才 我在异步的情况下 我在上游发送事件的方法中发送了500个事件 没调用request()方法,出现了那个异常. 我又重新改回上面你说的哪几种情况 这次全部都会出现那个异常了 和你的文章描述的一样. 不理解为什么我第一次时候就不会出现这个异常 - -! 可能我文字表达不清楚我的问题 反正就是问题解决了 感谢作者的文章 超级感谢
    蝴蝶没了_4f1d:要在onError那里输出 Throwable 日志
  • b0c0a8a8937a:在我的Studio上不管是同步还是异步上游的事件都会被存储在队列中,没调用一次request()就会拉取一个事件进行处理
  • b0c0a8a8937a:请教下作者 为什么在我的Studio上跑 同步请求,不调用request()方法,并不会报异常啊
    1cf651476639:@皇马船长 原来是这样,了解了
    皇马船长:确定观察者onError中是 这么写的吗 Log.w(TAG, "onError: ", t);

  • Bugme:作者真的好文采,看了还想继续看下去,好像写故事一样,啊哈哈哈~!点赞
    Season_zlc:@Bugme :v:
  • d8184ca3c970:好文章啊,太感谢楼主了,不去清华大学当老师可惜了。
    sing_song:可以当老师
    Season_zlc: @d8184ca3c970 哈哈,你说得太夸张了😂
  • TedYt:每篇必须点赞:+1:
  • cde86b15d3d6:不错哦
  • d2ab3859e18f:RxJava2.0,入门看这个就够了
  • 4e9764e206c3:太感谢博主深入浅出的讲解了,有个问题希望您解答下:
    如果有两组分别为: observerable1和observerable2,其中observerable1已经经过一系列map...处理后.subscribe(subscriber1),那么observerable1还能和observerable2合并并且继续的.subscribe(subscriber2)吗?也就是说一个observerable被subscribe两次是否可行?是否是常规的做法?(具体例子:有一个EditText当有输入的时候我要将数据存到一个字符串(subscriber1),当输入的长度是5的时要提交(subscriber2),提交动作又和(button)的点击是一样的所以应该合并,那么EditText的输入就有个两个订阅。)
    4e9764e206c3:不好意思没有表述清楚。长度为5和点击按钮都会提交,所以他们都要去响应提交的订阅事件,但是输入的时候又要去响应缓存的事件,所以输入操作涉及到了多次订阅。
    另外,我想问一下,对于事件除了zip和merge这些合并之外,有没有分支的操作方式?
    Season_zlc:@为森 另外你的这个需求我没看懂啥意思, 到底是长度为5就自动提交, 还是点击了按钮才提交?
    Season_zlc:@为森 Observable可以被多次subscribe, 每次subscribe事件就消费掉了, 下次subscribe又是一次全新的事件.
  • 641a85228368:老司机,能否提供RXjava2.0的下载地址啊,网上找的全是1.0的。
    妙法莲花1234:@Season_zlc https://github.com/ReactiveX/RxJava/releases
    Season_zlc: @Dragon_NA 第一篇开头就有啊

本文标题:给初学者的RxJava2.0教程(七)

本文链接:https://www.haomeiwen.com/subject/nymtvttx.html