- RxJava 2.x与RxJava 1.x的差异
-
Nulls
这是一个很大的变化,熟悉 RxJava 1.x 的童鞋一定都知道,1.x 是允许我们在发射事件的时候传入 null 值的,但现在我们的 2.x 不支持了,不信你试试? 大大的 NullPointerException 教你做人。这意味着 Observable<Void> 不再发射任何值,而是正常结束或者抛出空指针。 -
Flowable
在 RxJava 1.x 中关于介绍 backpressure 部分有一个小小的遗憾,那就是没有用一个单独的类,而是使用 Observable 。而在 2.x 中 Observable 不支持背压了,将用一个全新的 Flowable 来支持背压。
或许对于背压,有些小伙伴们还不是特别理解,这里简单说一下。大概就是指在异步场景中,被观察者发送事件的速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。感兴趣的小伙伴可以模拟这种情况,在差距太大的时候,我们的内存会猛增,直到OOM。而我们的 Flowable 一定意义上可以解决这样的问题,但其实并不能完全解决,这个后面可能会提到。 -
Single/Completable/Maybe
其实这三者都差不多,Single 顾名思义,只能发送一个事件,和 Observable接受可变参数完全不同。而 Completable 侧重于观察结果,而 Maybe 是上面两种的结合体。也就是说,当你只想要某个事件的结果(true or false)的时候,你可以使用这种观察者模式。 -
线程调度相关
这一块基本没什么改动,但细心的小伙伴一定会发现,RxJava 2.x 中已经没有了 Schedulers.immediate() 这个线程环境,还有 Schedulers.test()。 -
Function相关
熟悉 1.x 的小伙伴一定都知道,我们在1.x 中是有 Func1,Func2.....FuncN的,但 2.x 中将它们移除,而采用 Function 替换了 Func1,采用 BiFunction 替换了 Func 2..N。并且,它们都增加了 throws Exception,也就是说,妈妈再也不用担心我们做某些操作还需要 try-catch 了。 -
其他操作符相关
如 Func1...N 的变化,现在同样用 Consumer 和 BiConsumer 对 Action1 和 Action2 进行了替换。后面的 Action 都被替换了,只保留了 ActionN。
- RxJava 2.x 拥有了新的特性,其依赖于4个基础接口,它们分别是
- Publisher
- Subscriber
- Subscription
- Processor
其中最核心的莫过于 Publisher
和 Subscriber
。Publisher
可以发出一系列的事件,而 Subscriber
负责和处理这些事件。
其中用的比较多的自然是 Publisher
的 Flowable
,它支持背压。关于背压给个简洁的定义就是:
背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
简而言之,背压是流速控制的一种策略。有兴趣的可以看一下官方对于背压的讲解。
可以明显地发现,RxJava 2.x 最大的改动就是对于 backpressure
的处理,为此将原来的 Observable
拆分成了新的 Observable
和 Flowable
,同时其他相关部分也同时进行了拆分,但令人庆幸的是,是它,是它,还是它,还是我们最熟悉和最喜欢的 RxJava。
RxJava API文档
Flowable示例:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Thread.sleep(2000);
System.out.println(integer);
}
});
处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
Backpressure的策略:
ERROR
这种方式会在产生Backpressure问题的时候直接抛出一个异常,这个异常就是著名的MissingBackpressureException。
我们先以代码示例介绍一下Flowable相比与Observable新的东西。
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subscriber);
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 1
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 2
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit 3
01-20 16:18:58.898 17829-17851/? D/MainActivity: emit complete
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 1
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 2
01-20 16:18:58.908 17829-17829/? D/MainActivity: onNext: 3
01-20 16:18:58.908 17829-17829/? D/MainActivity: onComplete
上述代码创建了一个Flowable(被观察者)和一个Subscriber(观察者),可以看到程序如我们预期的一样输出结果了。不同的是 onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法便可。 不同的地方在于Subscription增加了一个void request(long n)方法, 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
s.request(Long.MAX_VALUE);
这个方法就是用来向生产者申请可以消费的事件数量。这样我们便可以根据本身的消费能力进行消费事件。
当调用了request()方法后,生产者便发送对应数量的事件供消费者消费。
这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式,你要求多少,我便传给你多少。
注意:如果不显示调用request就表示消费能力为0。
虽然并不限制向request()方法中传入任意数字,但是如果消费者并没有这么多的消费能力,依旧会造成资源浪费,最后产生OOM。形象点就是不能打肿脸充胖子。
而ERROR策略就避免了这种情况的出现(讲了这么多终于出现了)。
在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
Log.d(TAG, "emit " + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
});
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 125
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 126
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 127
01-20 16:58:48.993 32434-32474/? D/MainActivity: emit 128
01-20 16:58:49.003 32434-32434/? W/MainActivity: onError:
io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
我们让Flowable发送129个事件,而Subscriber一个也不处理,就产生了异常。
因此,ERROR即保证在异步操作中,事件累积不能超过128,超过即出现异常。消费者不能再接收事件了,但生产者并不会停止。
BUFFER
所谓BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。
总之BUFFER要慎用。
DROP
看名字就可以了解其作用:当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
下面例子具体介绍:
image.png点击“开始”按钮,建立连接。生产者开始生产事件,刚开始消费者通过request()只要了50个事件消费。然后每次点击“消费”按钮,再次消费50个事件。
mFlowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.DROP);
mSubscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
mSubscription = s;
s.request(50);
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
};
}
public void start(View view){
mFlowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(mSubscriber);
}
public void consume(View view){
mSubscription.request(50);
}
01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:25:44.331 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:25:47.891 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 99
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 100
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749078
..........................................................
01-20 17:25:50.241 3327-3327/com.lvr.rxjavalearning D/MainActivity: onNext: 17749099
可以看出,生产者一次性传入128个事件进入缓存池。点击“开始”按钮,消费了50个。然后第一次点击“消费”按钮,又消费了50个,第二次点击“消费”按钮,再次消费50个。然而此时原来的128个缓存只剩下28个了,所以先消费掉28个,然后剩下22个是后来传入的(其实后来的是在消费了96个后传入,并一次性在缓存池中又传入了96个,具体可以看源码,这里不解释了)。
LATEST
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。
结果如下:
01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 0
..........................................................
01-20 17:50:30.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 49
01-20 17:50:31.569 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 50
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 100
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 101
..........................................................
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 127
01-20 17:50:32.459 25334-25334/com.lvr.rxjavalearning D/MainActivity: onNext: 999999
唯一的区别就在最后一行。这就是LATEST与DROP的区别。
上述例子Flowable对象的获取都是通过create()获取的,自然可以通过BackpressureStrategy.LATEST之类的方式指定处理背压的策略。如果Flowable对象不是自己创建的,可以采用onBackpressureBuffer()、onBackpressureDrop()、onBackpressureLatest()的方式指定。
Flowable.just(1).onBackpressureBuffer()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
以上就是关于Flowable及backpressure的内容。
作者:Ruheng
链接:https://www.jianshu.com/p/1f4867ce3c01
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。</pre>
网友评论