1 背压存在的背景
被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM
2 背压策略的原理
- 2.1 未雨绸缪(事情在还没有发生之前做一定的处理),一共有两种
(1)控制被观察者发送事件的速度---反馈控制
(2)控制观察者接收事件的速度---响应式拉取 - 2.2 亡羊补牢(事情已经发生,如何补救)---对多余的数据进行有选择的抛弃,或者保留,或者报错
3 背压具体情况讨论
3.1 同步策略
Flowable.create(new FlowableOnSubscribe<String>() {
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.e("emitter", "发送1");
emitter.onNext("111");
Log.e("emitter", "发送2");
emitter.onNext("222");
Log.e("emitter", "发送3");
emitter.onNext("333");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
s.request(3);
}
@Override
public void onNext(String s) {
Log.e("emitter", "接受" + s);
}
@Override
public void onError(Throwable t) {
Log.e("onError", t.getLocalizedMessage());
}
@Override
public void onComplete() {
}
});
其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个
总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的
3.2 异步
先来看几段代码
FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法
public final void onNext(T t) {
。。。。。。
if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走
(可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值)
再来看BaseObserveOnSubscriber的onNext方法
@Override
public final void onNext(T t) {
if (done) {
return;
}
if (sourceMode == ASYNC) {
trySchedule();
return;
}
if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
//也就是最上面get为什么是128的原因
//此时还没到Handler,所以还是子线程
upstream.cancel();
error = new MissingBackpressureException("Queue is full?!");
done = true;
}
trySchedule();
}
接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext
runAsync主要注意produced和requested.get()
- requested.get()就是自己定义的s.request,如果不定义就永远没有onNext
- produced就是已经onNext出去的数据个数
总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)
3.2.1 控制被观察者发送事件的速度---反馈控制
由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力
可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管
3.2.2 控制观察者接收事件的速度---响应式拉取
比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等
网友评论