美文网首页
再忆RxJava---背压策略

再忆RxJava---背压策略

作者: 勇敢地追 | 来源:发表于2019-08-25 15:43 被阅读0次

1 背压存在的背景

被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所有发送过来事件的问题,最终导致缓存区溢出、事件丢失 & OOM

2 背压策略的原理

  • 2.1 未雨绸缪(事情在还没有发生之前做一定的处理),一共有两种
    (1)控制被观察者发送事件的速度---反馈控制
    (2)控制观察者接收事件的速度---响应式拉取
  • 2.2 亡羊补牢(事情已经发生,如何补救)---对多余的数据进行有选择的抛弃,或者保留,或者报错

3 背压具体情况讨论

3.1 同步策略

        Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) {
                Log.e("emitter", "发送1");
                emitter.onNext("111");
                Log.e("emitter", "发送2");
                emitter.onNext("222");
                Log.e("emitter", "发送3");
                emitter.onNext("333");
                emitter.onComplete();
            }
        }, BackpressureStrategy.ERROR).subscribe(new Subscriber<String>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(String s) {
                Log.e("emitter", "接受" + s);
            }

            @Override
            public void onError(Throwable t) {
                Log.e("onError", t.getLocalizedMessage());
            }

            @Override
            public void onComplete() {

            }
        });

其实对于同步而言,讨论背压毫无意义。emitter.onNext然后直接就是Subscriber.onNext,然后再下一个emitter.onNext。因为这是同步的,不存在缓存队列。就如例子而言,s.request(n),如果n小于3,会根据Error策略,直接走OnError方法(具体请看代码)。如果n大于3,是5,直接onComplete,不管有没有发送满5个
总的来说,同步并没有采用什么背压,如果非要说的话,那也是亡羊补牢式的

3.2 异步

先来看几段代码
FlowableCreate---NoOverflowBaseAsyncEmitter的onNext方法

        public final void onNext(T t) {
            。。。。。。
            if (get() != 0) {//get最初是128,也就是buffer-size,这是子线程
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

也就是发送的时候,超过128个数据,就走onError,没有就往下一个onNext走
(可以先看一下ObserveOnSubscriber的onSubscribe函数,里面有queue的构造,以及sourceMode其实并没有赋值)
再来看BaseObserveOnSubscriber的onNext方法

        @Override
        public final void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode == ASYNC) {
                trySchedule();
                return;
            }
            if (!queue.offer(t)) {//这个queue就是FlowableObserveOn的构造函数中的prefetch大小的一个队列。这里默认是128
                //也就是最上面get为什么是128的原因
                //此时还没到Handler,所以还是子线程
                upstream.cancel();
                error = new MissingBackpressureException("Queue is full?!");
                done = true;
            }
            trySchedule();
        }

接下来就是trySchedule,接下来就是调用自身run方法,走runAsync(ObserveOnSubscriber),然后无限循环poll直到没有数据,然后onNext
runAsync主要注意produced和requested.get()

  • requested.get()就是自己定义的s.request,如果不定义就永远没有onNext
  • produced就是已经onNext出去的数据个数

总结:子线程生成一个128长度的缓存队列。被观察者发送数据,如果队列没满,就走onNext,满了就报错。主线程s.request来控制要取多少数据,不设置就永远没有onNext打印出来(有点类似于线程池)

3.2.1 控制被观察者发送事件的速度---反馈控制

由于观察者和被观察者处于不同线程,所以被观察者无法通过requested()知道观察者自身接收事件能力
可以定义一些边界条件emitter.requested()!=0,或者drop,直接不管

3.2.2 控制观察者接收事件的速度---响应式拉取

比如发送100,s.request(50),那么也就是说还会有50个在缓存队列里面。存在问题就是可能会超出缓存队列,可以用BackpressureStrategy.ERROR来处理等等

参考文献
https://www.jianshu.com/p/ceb48ed8719d

相关文章

  • 再忆RxJava---背压策略

    1 背压存在的背景 被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所...

  • 再忆RxJava---线程切换

    RxJava已经陆陆续续看了将近两年多了,觉得很有必要重新认识一下。而且最新github上都出了3.0了。重新看一...

  • RxJava之背压策略

    转载请以链接形式标明出处:本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJav...

  • RxJava2背压管理策略实例解析

    1、前言: 在使用RxJava 的背压时,遇到了很多困扰。本文主要是针对RxJava背压策略的5种模式下,观察者和...

  • 5章 RxJava背压策略

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址RxJava2从入门到精通...

  • RxJava2.x-Flowable背压策略

    一、Flowable背压策略 BackpressureStrategy.MISSING 只接受一部分,其他的丢弃...

  • Android RxJava :图文详解 背压策略

    前言 Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎...

  • 背压

    背压的含义就是指在一个典型的生产者消费者模型下,生产者生产数据的速度超过了消费者消费的速度导致的问题。 RxJS ...

  • 背压

    Back Pressure是流处理系统中,非常经典常见的问题,它是让流系统能对压力变化能够呈现良好抗压性的关键所在...

  • RxJava背压

    订阅分为:同步订阅 异步订阅 同步订阅Rxjava1与Rxjava2中 同步订阅没有用到缓冲区,只要上游事件数量不...

网友评论

      本文标题:再忆RxJava---背压策略

      本文链接:https://www.haomeiwen.com/subject/ucahectx.html