RxJava2.0 - 文章八

作者: 世道无情 | 来源:发表于2018-04-29 20:20 被阅读6次

前言

上一节中,我们一次性发送128个事件没有任何问题,但是一旦超过128个立马抛出MissingBackpressureException异常,提示你上游发送事件太多,下游处理不过来,如何解决?这一节就来解决下。

1. 解决方法


思路一:

发送128个事件没问题是因为Flowable内部有一个大小为128的水缸,超过则会溢出,既然水缸这么小,就换一个大点的水缸试下,代码如下:

    /**
     * BUFFER:大的水缸,
     *          让上游无限的发送事件,下游一个也不处理,结果容易造成OOM
     */
    public static void demo1(){
        // 创建一个上游:Flowable
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {          // 无限for循环
                    Log.e("TAG" , "emit " + i) ;
                    emitter.onNext(i);
                }
            }
        } , BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())  // 让上游的for循环在子线程中执行
            .observeOn(AndroidSchedulers.mainThread())   // for循环执行完后切回到主线程
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.e("TAG" , "subscribe") ;
                    mSubscription = s ;
                }

                @Override
                public void onNext(Integer integer) {
                    Log.e("TAG" , "next" + integer) ;
                }

                @Override
                public void onError(Throwable t) {
                    Log.e("TAG" , "error") ;
                }

                @Override
                public void onComplete() {
                    Log.e("TAG" , "complete") ;
                }
            }) ;
    }

换大的水缸,在上游使用无限for循环发射数据,下游一个也不处理,导致OOM;


图片.png
思路二:

之前解决Observable如何解决上游发送事件太快,有两种方法:从数量和速度两个方面解决,同样的,Flowable也有对应的两种方法:BackpressureStrategy.DRAP和BackpressureStrategy.LATEST。

  • Drap:直接把存不下的事件丢掉;
  • Latest:只保留最新的数据;
Drop代码如下:
    /**
     * Drop:丢弃存储不下的事件
     */
    public static void demo2(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {
                    emitter.onNext(i);
                }
            }
        } , BackpressureStrategy.DROP).subscribeOn(Schedulers.io())    // 让上游的for循环在子线程中执行
                .observeOn(AndroidSchedulers.mainThread())    // 切换到主线程执行下边的操作
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.e("TAG" , "subscribe -> ") ;
                        mSubscription = s ;
                        s.request(128);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("TAG" , "next -> " + integer) ;
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e("TAG" , "error") ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "complete") ;
                    }
                }) ;
    }

效果如如下:


图片.png
  • 结果就是:第一次调用 s.request(128); 方法时候,下游收到 0-127共128个事件,第二次调用 s.request(128)就不确定,上游在一直发送事件,内存正常,drop丢弃了存不下事件;
latest代码如下:

让上游无限发送事件,把Subscription保存起来,方便在外边调用 s.request(128)方法

    /**
     * latest:只保留最新数据
     */
    public static void demo3(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {
                    emitter.onNext(i);
                }
            }
        } , BackpressureStrategy.LATEST).subscribeOn(Schedulers.io())   // 让上游的for循环在子线程中执行
                .observeOn(AndroidSchedulers.mainThread())  // 切换到主线程中执行下边操作
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.e("TAG" , "subscribe") ;
                        mSubscription = s ;
                        s.request(128);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("TAG" , "next -> " + integer) ;
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e("TAG" , "error") ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "complete") ;
                    }
                }) ;
    }
interval操作符:
/**
     *  interval操作符
     */
    public static void demo4(){
        Flowable.interval(1 , TimeUnit.MICROSECONDS)
                .onBackpressureDrop()    // 添加背压策略
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Long>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.e("TAG" , "subscribe") ;
                        mSubscription = s ;
                        s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Long aLong) {
                        Log.e("TAG" , "next -> " + aLong) ;
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e("TAG" , "error") ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "complete") ;
                    }
                }) ;
    }

八篇文章所有代码已上传至github:
https://github.com/shuai999/RxJava2Demo.git

相关文章

  • RxJava2.0 - 文章八

    前言 上一节中,我们一次性发送128个事件没有任何问题,但是一旦超过128个立马抛出MissingBackpres...

  • Rxjava系列(六) RxJava2.0操作符详解

    Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava2.0 - 文章一

    前言 自己在学习RxJava2.0时,参考了大神的博客,然后在这里做一个笔记为了方便自己以后复习和查看,同时也给需...

  • RxJava2.0 - 文章二

    前言 上篇文章讲解了 RxJava2.0的最基本使用,在本节中主要看下RxJava的线程控制。 1. 概述 ...

  • RxJava2.0 - 文章七

    前言 上一节我们学习了使用Observable解决上、下游发射事件速度不平衡的问题,之所以学习 Observabl...

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RxJava2.0源码初探

    RxJava2.0源码初探 RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下...

  • Rxjava2.0 发生订阅关系 的源码解析

    由于要做一场关于rxjava2.0 的内部分享,本人便怀着期待的心情去了解了下rxjava2.0,关于rxjava...

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

网友评论

    本文标题:RxJava2.0 - 文章八

    本文链接:https://www.haomeiwen.com/subject/lvwglftx.html