美文网首页
Rxjava2与1的差异,新东西介绍<2>

Rxjava2与1的差异,新东西介绍<2>

作者: 天空在微笑 | 来源:发表于2018-01-28 15:30 被阅读5次
    1. RxJava 2.x与RxJava 1.x的差异
    • Nulls
      这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。

    • Flowable
      在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
      或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。

    • Single/Completable/Maybe
      其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。

    • 线程调度相关
      这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()。

    • Function相关
      熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1,Func2.....FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。

    • 其他操作符相关
      如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN。

    1. RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是
    • Publisher
    • Subscriber
    • Subscription
    • Processor

    其中最核心的莫过于 PublisherSubscriberPublisher 可以发出一系列的事件,而 Subscriber 负责和处理这些事件。

    其中用的比较多的自然是 PublisherFlowable,它支持背压。关于背压给个简洁的定义就是:

    背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。

    简而言之,背压是流速控制的一种策略。有兴趣的可以看一下官方对于背压的讲解

    可以明显地发现,RxJava 2.x 最大的改动就是对于 backpressure 的处理,为此将原来的 Observable 拆分成了新的 ObservableFlowable,同时其他相关部分也同时进行了拆分,但令人庆幸的是,是它,是它,还是它,还是我们最熟悉和最喜欢的 RxJava。
    RxJava API文档
    Flowable示例:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    while (true){
                        e.onNext(1);
                    }
                }
            })
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Thread.sleep(2000);
                    System.out.println(integer);
                }
            });
    

    处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
    Backpressure的策略:

    ERROR

    这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。

    我们先以代码示例介绍一下Flowable相比与Observable新的东西。

    Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    Log.d(TAG, "emit 1");
                    emitter.onNext(1);
                    Log.d(TAG, "emit 2");
                    emitter.onNext(2);
                    Log.d(TAG, "emit 3");
                    emitter.onNext(3);
                    Log.d(TAG, "emit complete");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR); //增加了一个参数
    
            Subscriber<Integer> subscriber = new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onSubscribe");
                    s.request(Long.MAX_VALUE);  
                }
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
    
                }
                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            };
            flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
    
    

    01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 1

    01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 2

    01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 3

    01-20 16:18:58.898 17829-17851/? D/MainActivity: emit complete

    01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 1

    01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 2

    01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 3

    01-20 16:18:58.908 17829-17829/? D/MainActivity: onComplete

    上述代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),可以看到程序如我们预期的一样输出结果了。不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:

      s.request(Long.MAX_VALUE);  
    
    

    这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。

    当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。

    这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要求多少,我便传给你多少。

    注意:如果不显示调用request就表示消费能力为0。

    虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。形象点就是不能打肿脸充胖子。

    而ERROR策略就避免了这种情况的出现(讲了这么多终于出现了)。

    在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。

    在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。

    Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 129; i++) {
                        Log.d(TAG, "emit " + i);
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            mSubscription = s;
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d(TAG, "onNext: " + integer);
                        }
                        @Override
                        public void onError(Throwable t) {
                            Log.w(TAG, "onError: ", t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    
    

    01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 125

    01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 126

    01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 127

    01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 128

    01-20 16:58:49.003 32434-32434/? W/MainActivity: onError:

    io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

    我们让Flowable发送129个事件,而Subscriber一个也不处理,就产生了异常。

    因此,ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。

    BUFFER

    所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。

    这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。

    但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。

    总之BUFFER要慎用。

    DROP

    看名字就可以了解其作用:当消费者处理不了事件,就丢弃。

    消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

    下面例子具体介绍:

    image.png

    点击“开始”按钮,建立连接。生产者开始生产事件,刚开始消费者通过request()只要了50个事件消费。然后每次点击“消费”按钮,再次消费50个事件。

    mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
    
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.DROP);
            mSubscriber = new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    mSubscription = s;
                    s.request(50);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.d(TAG, "onNext: " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.w(TAG, "onError: ", t);
                }
    
                @Override
                public void onComplete() {
    
                }
            };
        }
        public void start(View view){
            mFlowable.subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(mSubscriber);
    
        }
        public void consume(View view){
            mSubscription.request(50);
    
        }
    
    

    01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 0

    ..........................................................

    01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 49

    01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 50

    ..........................................................

    01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 99

    01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 100

    ..........................................................

    01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 127

    01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749078

    ..........................................................

    01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749099

    可以看出,生产者一次性传入128个事件进入缓存池。点击“开始”按钮,消费了50个。然后第一次点击“消费”按钮,又消费了50个,第二次点击“消费”按钮,再次消费50个。然而此时原来的128个缓存只剩下28个了,所以先消费掉28个,然后剩下22个是后来传入的(其实后来的是在消费了96个后传入,并一次性在缓存池中又传入了96个,具体可以看源码,这里不解释了)。

    LATEST

    LATEST与DROP功能基本一致。

    消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。

    唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。

    还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。

    结果如下:

    01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 0

    ..........................................................

    01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 49

    01-20 17:50:31.569 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 50

    ..........................................................

    01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 100

    01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 101

    ..........................................................

    01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 127

    01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 999999

    唯一的区别就在最后一行。这就是LATEST与DROP的区别。

    上述例子Flowable对象的获取都是通过create()获取的,自然可以通过BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。

     Flowable.just(1).onBackpressureBuffer()
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
    
                        }
                    });
    
    

    以上就是关于Flowable及backpressure的内容。

    作者:Ruheng
    链接:https://www.jianshu.com/p/1f4867ce3c01
    來源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。</pre>

    相关文章

      网友评论

          本文标题:Rxjava2与1的差异,新东西介绍<2>

          本文链接:https://www.haomeiwen.com/subject/kuuoaxtx.html