作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处:https://www.jianshu.com/p/23f74055e999
一、RxJava1与RxJava2 对比
-
RxJava 2x 不再支持 null 值,如果传入一个null会抛出 NullPointerException
-
Observable.just(null)(不支持)
-
RxJava2 所有的函数接口(Function/Action/Consumer)均设计为可抛出Exception,自己去解决编译异常需要转换问题。
-
RxJava1 中Observable不能很好支持背压,在RxJava2 中将Oberservable实现成不支持背压,而新增Flowable 来支持背压
二、背压
事件上游产生的事件高于事件下游消费的事件导致内存不断扩大
rxjava1并没有对这个的解决方案
rxjava2
添加了一个新的被观察者角色操作符Flowable所有的Observable操作都可以用Flowable替换
什么时候用 Observable:
一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
如果式 鼠标事件,频率不超过1000 Hz,基本上不会背压;
什么时候用 Flowable:
处理以某种方式产生超过10K的元素;
文件读取与分析,例如 读取指定行数的请求;网络IO流;
有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的。
三、背压策略
1.BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常
//阻塞时队列
2.BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理
3.BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃
4.BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件
四、Flowable的使用
@Test
public void testFlowable() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 1000000; i++) {
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR).subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
//使用Flowable需要在此处给其一个最大的事件处理能力
//设置为最大的处理能力
s.request(500);
// s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
//模拟处理
try {
Thread.currentThread().sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("处理事件:"+integer);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
网友评论