美文网首页
再忆RxJava---背压策略

再忆RxJava---背压策略

作者: 勇敢地追 | 来源:发表于2019-08-25 15:43 被阅读0次

    1 背压存在的背景

    被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM

    2 背压策略的原理

    • 2.1 未雨绸缪(事情在还没有发生之前做一定的处理),一共有两种
      (1)控制被观察者发送事件的速度---反馈控制
      (2)控制观察者接收事件的速度---响应式拉取
    • 2.2 亡羊补牢(事情已经发生,如何补救)---对多余的数据进行有选择的抛弃,或者保留,或者报错

    3 背压具体情况讨论

    3.1 同步策略

            Flowable.create(new FlowableOnSubscribe<String>() {
                @Override
                public void subscribe(FlowableEmitter<String> emitter) {
                    Log.e("emitter", "发送1");
                    emitter.onNext("111");
                    Log.e("emitter", "发送2");
                    emitter.onNext("222");
                    Log.e("emitter", "发送3");
                    emitter.onNext("333");
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(3);
                }
    
                @Override
                public void onNext(String s) {
                    Log.e("emitter", "接受" + s);
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.e("onError", t.getLocalizedMessage());
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个
    总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的

    3.2 异步

    先来看几段代码
    FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法

            public final void onNext(T t) {
                。。。。。。
                if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
                    downstream.onNext(t);
                    BackpressureHelper.produced(this, 1);
                } else {
                    onOverflow();
                }
            }
    

    也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走
    (可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值)
    再来看BaseObserveOnSubscriber的onNext方法

            @Override
            public final void onNext(T t) {
                if (done) {
                    return;
                }
                if (sourceMode == ASYNC) {
                    trySchedule();
                    return;
                }
                if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
                    //也就是最上面get为什么是128的原因
                    //此时还没到Handler,所以还是子线程
                    upstream.cancel();
                    error = new MissingBackpressureException("Queue is full?!");
                    done = true;
                }
                trySchedule();
            }
    

    接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext
    runAsync主要注意produced和requested.get()

    • requested.get()就是自己定义的s.request,如果不定义就永远没有onNext
    • produced就是已经onNext出去的数据个数

    总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)

    3.2.1 控制被观察者发送事件的速度---反馈控制

    由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力
    可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管

    3.2.2 控制观察者接收事件的速度---响应式拉取

    比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等

    参考文献
    https://www.jianshu.com/p/ceb48ed8719d

    相关文章

      网友评论

          本文标题:再忆RxJava---背压策略

          本文链接:https://www.haomeiwen.com/subject/ucahectx.html