RxJava之背压策略

作者: 103style | 来源:发表于2019-07-24 15:10 被阅读8次

转载请以链接形式标明出处:
本文出自:103style的博客

本文基于 RxJava 2.x 版本


目录

  • RxJava背压策略简介
  • Observable背压导致崩溃的原因
  • Flowable 使用介绍
  • 五种背压策略源码分析
  • 小结

RxJava背压策略简介

官方介绍

Backpressure is when in an Flowable processing pipeline, some asynchronous stages can't process the values fast enough and need a way to tell the upstream producer to slow down.
背压是在Flowable处理事件流中,某些异步阶段无法足够快地处理这些值,并且需要一种方法来告诉上游生产商减速。

所以RxJava的背压策略(Backpressure)是指处理上述上游流速过快现象的一种策略。 类似 Java中的线程池 中的饱和策略RejectedExecutionHandler


Observable背压导致崩溃的原因

我们先使用 Observable看看是什么情况:

Observable
        .create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                    emitter.onNext(i);
                }
            }
        })
        .subscribeOn(Schedulers.computation())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println(integer);
            }
        });
image.png

输出:

I/art: Background partial concurrent mark sweep GC freed 7(224B) AllocSpace objects, 0(0B) LOS objects, 27% free, 43MB/59MB, paused 528us total 106.928ms
I/System.out: 0
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 20% free, 62MB/78MB, paused 1.065ms total 327.346ms
I/System.out: 1
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 16% free, 82MB/98MB, paused 1.345ms total 299.700ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 13% free, 103MB/119MB, paused 1.609ms total 377.432ms
I/System.out: 2
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 120MB/120MB, paused 1.280ms total 105.749ms
I/art: Background partial concurrent mark sweep GC freed 22(640B) AllocSpace objects, 0(0B) LOS objects, 11% free, 126MB/142MB, paused 1.818ms total 679.398ms
I/System.out: 3
I/art: Background partial concurrent mark sweep GC freed 9(288B) AllocSpace objects, 0(0B) LOS objects, 9% free, 148MB/164MB, paused 1.946ms total 555.619ms
I/System.out: 4
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 165MB/165MB, paused 1.253ms total 107.036ms
I/art: Background partial concurrent mark sweep GC freed 8(256B) AllocSpace objects, 0(0B) LOS objects, 8% free, 172MB/188MB, paused 2.355ms total 570.029ms
I/art: Background sticky concurrent mark sweep GC freed 28(768B) AllocSpace objects, 0(0B) LOS objects, 0% free, 188MB/188MB, paused 11.474ms total 82.399ms
I/System.out: 5
I/art: Background partial concurrent mark sweep GC freed 23(672B) AllocSpace objects, 0(0B) LOS objects, 7% free, 197MB/213MB, paused 2.355ms total 631.635ms
I/art: Background partial concurrent mark sweep GC freed 22(640B) AllocSpace objects, 0(0B) LOS objects, 6% free, 226MB/242MB, paused 3.091ms total 908.581ms
I/System.out: 6
I/art: Background sticky concurrent mark sweep GC freed 29(800B) AllocSpace objects, 0(0B) LOS objects, 0% free, 242MB/242MB, paused 1.672ms total 102.676ms
I/art: Waiting for a blocking GC Alloc
I/art: Clamp target GC heap from 267MB to 256MB
I/art: Alloc sticky concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.581ms total 10.336ms
I/art: WaitForGcToComplete blocked for 12.447ms for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Starting a blocking GC Alloc
I/System.out: 9
I/art: Waiting for a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
I/art: Clamp target GC heap from 268MB to 256MB
I/art: Alloc concurrent mark sweep GC freed 0(0B) AllocSpace objects, 0(0B) LOS objects, 1% free, 252MB/256MB, paused 1.574ms total 818.037ms
I/art: WaitForGcToComplete blocked for 2.539s for cause Alloc
I/art: Starting a blocking GC Alloc
I/art: Waiting for a blocking GC Alloc
W/art: Throwing OutOfMemoryError "Failed to allocate a 12 byte allocation with 4109520 free bytes and 3MB until OOM; failed due to fragmentation (required continguous free 4096 bytes for a new buffer where largest contiguous free 0 bytes)"

我们可以从上图中看到,内存在逐步上升,在一定的时间后,到达256M之后会触发GC,最后抛出OutOfMemoryError。因为上游的事件发送太快而下游的消费者消耗的比较慢。

那导致内存暴增的源头是什么呢 ?


我们对上面的代码做一点点修改,注释了observeOn(AndroidSchedulers.mainThread()),会发现内存显示很正常,不会存在上述问题。

    Observable
            .create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        emitter.onNext(i);
                    }
                }
            })
            .subscribeOn(Schedulers.computation())
//          .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(integer);
                }
            });
注释了observeOn

所以内存暴增的源头就在 observeOn(AndroidSchedulers.mainThread()).

我们来看看 observeOn的源码,通过 RxJava subscribeOn和observeOn源码介绍,我们知道在 ObservableObserveOn.ObserveOnObserveronSubscribe中构建了一个容量默认为128SpscLinkedArrayQueue

queue = new SpscLinkedArrayQueue<T>(bufferSize);

上游每发送一个事件都会通过queue.offer(t)保存到SpscLinkedArrayQueue中。

public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

我们可以写个测试代码来看看,因为生产比消费快的多,相当于一直添加元素,如下:

private void test(){
    SpscLinkedArrayQueue<Integer> queue = new SpscLinkedArrayQueue<>(128);
    for (int i = 0; ; i++) {
        queue.offer(i);
    }
}

运行会发现内存变化和Observable一样迅速暴增。

测试代码内存变化

SpscLinkedArrayQueue的详细介绍后面再说。现在可以大致理解为 一直狂吃,然后最后撑破肚皮,然后裂开


Flowable的用法

我们来看看 Flowable的用法:

Flowable.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode)

BackpressureStrategy 包含五种模式:MISSINGERRORBUFFERDROPLATEST

下面对这五种BackpressureStrategy 分别介绍其用法以及 发送事件速度 > 接收事件速度 时的处理方式:

  • BackpressureStrategy.MISSING
    处理方式:抛出异常MissingBackpressureException,并提示 缓存区满了
    代码示例:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }, BackpressureStrategy.MISSING)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
    
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    

    输出结果:

    System.out: onNext: 0
    System.err: io.reactivex.exceptions.MissingBackpressureException: Queue is full?!
    
  • BackpressureStrategy.ERROR
    处理方式:直接抛出异常MissingBackpressureException
    修改上述代码的 BackpressureStrategy.MISSINGBackpressureStrategy.ERROR

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                ...
            }, BackpressureStrategy.ERROR)
            ...
    

    输出结果:

    System.out: onNext: 0
    System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    
  • BackpressureStrategy.BUFFER
    处理方式:类似Observable一样扩充缓存区大小
    修改上述代码的 BackpressureStrategy.MISSINGBackpressureStrategy.BUFFER

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                ...
            }, BackpressureStrategy.BUFFER)
            ...
    

    输出结果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    System.out: onNext: 4
    System.out: onNext: 5
    System.out: onNext: 6
    ...
    System.out: onNext: 247
    System.out: onNext: 248
    System.out: onNext: 249
    System.out: onNext: 250
    System.out: onNext: 251
    System.out: onNext: 252
    System.out: onNext: 253
    System.out: onNext: 254
    System.out: onNext: 255
    System.out: onComplete
    
  • BackpressureStrategy.DROP
    处理方式:丢弃缓存区满后处理缓冲区数据期间发送过来的事件
    示例代码:

      Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    

    输出结果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    ...
    System.out: onNext: 124
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 1070801
    System.out: onNext: 1070802
    System.out: onNext: 1070803
    System.out: onNext: 1070804
    System.out: onNext: 1070805
    ...
    
  • BackpressureStrategy.LATEST
    处理方式:丢弃缓存区满后处理缓冲区数据期间发送过来的非最后一个事件。下面示例代码输出了 129 个事件,下面的源码分析会介绍。
    示例代码:

    Flowable
            .create(new FlowableOnSubscribe<Object>() {
                @Override
                public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
                    for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                        emitter.onNext(i);
                    }
                    emitter.onComplete();
                }
            }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
    
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
    
                }
    
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
    
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    

    输出结果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    ...
    System.out: onNext: 124
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 255
    System.out: onComplete
    

五种背压策略源码分析

通知之前 RxJava之create操作符源码解析 的介绍。我们知道Flowable.create(new FlowableOnSubscribe<Object>(){...}, BackpressureStrategy.LATEST) 返回的是一个FlowableCreate对象。

分别对不同的背压策略创建了不同的Emitter.

public final class FlowableCreate<T> extends Flowable<T> {
    //...
    public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
        this.source = source;
        this.backpressure = backpressure;
    }
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
    //...
}
  • MissingEmitter
    static final class MissingEmitter<T> extends BaseEmitter<T> {
        MissingEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            if (t != null) {
                downstream.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }
    
    }
    
    通过上面的代码我们可以看到MissingEmitter基本上没做什么操作,所以BackpressureStrategy.MISSING示例中的代码实际上是调用了ObserveOn中返回对象的FlowableObserveOn.ObserveOnSubscriberonNext
    public final void onNext(T t) {
        if (done) {
            return;
        }
        if (sourceMode == ASYNC) {
            trySchedule();
            return;
        }
        if (!queue.offer(t)) {
            upstream.cancel();
            error = new MissingBackpressureException("Queue is full?!");
            done = true;
        }
        trySchedule();
    }
    
    上面代码中我们看到了背压情况下出现的报错信息,出现的前提是queue.offer(t)返回false。这里的queueonSubscribe中构造的容量为Flowable.bufferSize()SpscArrayQueue.
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.validate(this.upstream, s)) {
            this.upstream = s;
            //...
            queue = new SpscArrayQueue<T>(prefetch);
            downstream.onSubscribe(this);
            s.request(prefetch);
        }
    }
    
    SpscArrayQueueoffer方法,我们可以看到当SpscArrayQueue数据 “满了” 的时候即返回false.
    public boolean offer(E e) {
        //...
        final int mask = this.mask;
        final long index = producerIndex.get();
        final int offset = calcElementOffset(index, mask);
        if (index >= producerLookAhead) {
            int step = lookAheadStep;
            if (null == lvElement(calcElementOffset(index + step, mask))) { // LoadLoad
                producerLookAhead = index + step;
            } else if (null != lvElement(offset)) {
                return false;
            }
        }
        soElement(offset, e); // StoreStore
        soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
        return true;
    }
    
    所以BackpressureStrategy.MISSING在缓冲区满了之后再发射事件即会抛出 message"Queue is full?!"MissingBackpressureException.

  • ErrorAsyncEmitter
    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        //...
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
        //...
    }
    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        public final void onNext(T t) {
            //...
            if (get() != 0) {
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }
        abstract void onOverflow();
    }
    static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        ErrorAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }
    
    通过在onSubscribe中调用request(Flowable.bufferSize())设置当前AtomicLongvalue值。然后 onNext 中每传递一个事件就通过BackpressureHelper.produced(this, 1)value1. 当发送了Flowable.bufferSize()个事件,get() != 0不成立,调用onOverflow()方法抛出 MissingBackpressureException异常。

  • DropAsyncEmitter
    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    
        private static final long serialVersionUID = 8360058422307496563L;
    
        DropAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
    
        @Override
        void onOverflow() {
            // nothing to do
        }
    
    }
    
    ErrorAsyncEmitter 类似,只不过当发送超过超过Flowable.bufferSize()的事件时,啥也没做,即实现丢弃的功能。

  • LatestAsyncEmitter

    static final class LatestAsyncEmitter<T> extends BaseEmitter<T> {
        final AtomicReference<T> queue;
        //...
        LatestAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
            this.queue = new AtomicReference<T>();
            //...
        }
    
        @Override
        public void onNext(T t) {
            //...
            queue.set(t);
            drain();
        }
        //...
    }
    

    我们可以看到每次调用onNext都会更新传过来的值到queue中,所以queue中保存了最新的值。

    • 接着来看drain方法:
      上面我们知道在onSubscribe中调用request()设置当前AtomicLongvalue值。

      void drain() {
          //...
          for (;;) {
              long r = get();
              long e = 0L;
              while (e != r) {
                  //...
                  boolean d = done;
                  T o = q.getAndSet(null);
                  boolean empty = o == null;
                  if (d && empty) {
                      //...
                      return;
                  }
                  if (empty) {
                      break;
                  }
                  a.onNext(o);
                  e++;
              }
              if (e == r) {
                  //...
                  boolean d = done;
                  boolean empty = q.get() == null;
                  if (d && empty) {
                      //...
                      return;
                  }
              }
              if (e != 0) {
                  BackpressureHelper.produced(this, e);
              }
              //...
          }
      }
      
      • for (;;)里面通过get()获取当前AtomicLong的值。然后通过a.onNext(o);传递给下游,然后e++,在通过BackpressureHelper.produced(this, e);减掉AtomicLong的值。
      • 当调用Flowable.bufferSize()onNext之后,get()返回的值为0,所以e != r不成立,在e == r的判断中,在从onNext过来时emptyfalse,所以直接跳出 for循环。
      • 通过上面我们知道当传递超过Flowable.bufferSize()的事件过来,只会更新queue中的值为最新的事件,其他啥也没做。那最后一个事件时怎么发出的呢?,继续往下看。
    • 最后一个事件时怎么发出的?
      我们在上面的drain()中调用a.onNext(o)最终是调用observeOn构建对象中的ObserveOnSubscriberonNext,即调用runAsync();

      public final void onNext(T t) {
          //...
          trySchedule();
      }
      final void trySchedule() {
          //...
          worker.schedule(this);
      }
      @Override
      public final void run() {
          if (outputFused) {
              runBackfused();
          } else if (sourceMode == SYNC) {
              runSync();
          } else {
              runAsync();
          }
      }
      
    • runAsync()

      void runAsync() {
          //...
          for (;;) {
              long r = requested.get();
              while (e != r) {
                  boolean d = done;
                  T v;
                  try {
                      v = q.poll();
                  } catch (Throwable ex) {
                      //...
                      return;
                  }
                  //...
                  a.onNext(v);
                  e++;
                  if (e == limit) {
                      if (r != Long.MAX_VALUE) {
                          r = requested.addAndGet(-e);
                      }
                      upstream.request(e);
                      e = 0L;
                  }
              }
              //...
          }
      }
      
      • 我们可以看到在for循环中通过q.poll()去获取缓存队列SpscArrayQueue中的事件。然后通过a.onNext(v);去执行我们示例代码中的耗时操作。
      • 然后当e == limit是,回去调用LatestAsyncEmitterrequest(e),而limit是在构造函数中初始化的,值为缓存队列容量Flowable.bufferSize()3/4所以当队列中的事件消耗了容量的3/4之后,会再去请求上游发送事件。
        BaseObserveOnSubscriber(
                Worker worker,
                boolean delayError,
                int prefetch) {
            //...
            this.limit = prefetch - (prefetch >> 2);
        }
        
    • request方法:

      @Override
      public final void request(long n) {
          if (SubscriptionHelper.validate(n)) {
              System.out.println("n = " + n);
              BackpressureHelper.add(this, n);
              onRequested();
          }
      }
      @Override
      void onRequested() {
          drain();
      }
      

      即继续执行drain()方法,因为queue中还保存最新的值事件。所以会通过a.onNext(o)发送这个最新的事件。

  • 如果在执行完等待队列3/4的事件之后,上游的事件还没发送结束,下游即会再次缓存上游发送过来的容量的3/4个事件。
    示例代码:

    Flowable.create(new FlowableOnSubscribe<Object>() {
        @Override
        public void subscribe(FlowableEmitter<Object> emitter) throws Exception {
            for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                emitter.onNext(i);
            }
            Thread.sleep(10 * Flowable.bufferSize());
            for (int i = 0; i < Flowable.bufferSize() * 2; i++) {
                emitter.onNext(Flowable.bufferSize() * 2 + i);
            }
            emitter.onComplete();
        }
    }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.computation())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Integer.MAX_VALUE);
                }
                @Override
                public void onNext(Object o) {
                    System.out.println("onNext: " + o);
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
                @Override
                public void onComplete() {
                    System.out.println("onComplete");
                }
            });
    

    输出结果:

    System.out: onNext: 0
    System.out: onNext: 1
    System.out: onNext: 2
    System.out: onNext: 3
    //....
    System.out: onNext: 125
    System.out: onNext: 126
    System.out: onNext: 127
    System.out: onNext: 255
    System.out: onNext: 256
    System.out: onNext: 257
    //...
    System.out: onNext: 349
    System.out: onNext: 350
    System.out: onNext: 511
    System.out: onComplete
    

    可以看到输出结果中255-350即为容量1283/4个元素。


  • BufferAsyncEmitter
    • 我们可以看到内部有一个SpscLinkedArrayQueue的缓存队列,每次调用onNext都会先保存到缓存队列,然后通过drain()方法一直去遍历当前的缓存队列。然后和LatestAsyncEmitter一样,当下游的缓存队列满了之后,即不再放下游发送事件,只是把上游的事件保存在SpscLinkedArrayQueue中,等待下游处理了容量的3/4的事件之后,上游在发送容量的3/4的事件过去。知道上游的事件消耗完,或者异常退出。即和Observable的效果类似,只不过缓存队列一个在上游一个在下游。
    static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
        final SpscLinkedArrayQueue<T> queue;
        //...
    
        BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
            this.wip = new AtomicInteger();
        }
        @Override
        public void onNext(T t) {
            //...
            queue.offer(t);
            drain();
        }
        void drain() {
            //...
            final SpscLinkedArrayQueue<T> q = queue;
            for (;;) {
                long r = get();
                long e = 0L;
                while (e != r) {
                    //...
                    boolean d = done;
                    T o = q.poll();
                    boolean empty = o == null;
                    if (d && empty) {
                        //...
                        return;
                    }
                    if (empty) {
                        break;
                    }
                    a.onNext(o);
                    e++;
                }
                if (e == r) {
                    //...
                    boolean d = done;
                    boolean empty = q.isEmpty();
                    if (d && empty) {
                        //...
                        return;
                    }
                }
                if (e != 0) {
                    BackpressureHelper.produced(this, e);
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }
    

小结

  • 我们知道了Observable出现背压的原因是上游发送的超多事件缓存在observeOn返回对象的缓存队列中,事件的增加导致了内存的增加。
  • 我们介绍了Flowable的使用和五种背压策略的具体实现。
    • MISSING:超过observeOn配置的bufferSize则抛出异常MissingBackpressureException并提示Queue is full?!
    • ERROR:超过observeOn配置的bufferSize则直接抛出异常MissingBackpressureException
    • BUFFER:超过observeOn配置的bufferSize则缓存到上游的缓冲队列,等待下游消耗了容量的3/4的事件之后,在继续发送上游缓存的事件给下游。
    • DROP:超过observeOn配置的bufferSize则丢弃。
    • LATEST:超过observeOn配置的bufferSize则丢弃并保存最新的值到queue,如果在下游消耗了容量的3/4的事件之后,上游还有事件在发送,则继续往下游发送事件,当没有事件的时候,再发送queue中保存的最新的那个事件。

参考文章


以上

相关文章

  • RxJava之背压策略

    转载请以链接形式标明出处:本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJav...

  • RxJava2背压管理策略实例解析

    1、前言: 在使用RxJava 的背压时,遇到了很多困扰。本文主要是针对RxJava背压策略的5种模式下,观察者和...

  • 5章 RxJava背压策略

    本篇文章已授权微信公众号 YYGeeker 独家发布转载请标明出处 CSDN学院课程地址RxJava2从入门到精通...

  • RxJava背压

    订阅分为:同步订阅 异步订阅 同步订阅Rxjava1与Rxjava2中 同步订阅没有用到缓冲区,只要上游事件数量不...

  • RxJava背压

    RxJava 当我们在对RxJava从1.0版本升级到2.0版本的时候,我们发现RxJava2.0增加了一个被观察...

  • RxJava2系列之背压策略(一)

    前言 通过前7篇RxJava的文章,我们对RxJava1.x版本的内容进行了学习与实践。目前RxJava已经更新到...

  • Android RxJava :图文详解 背压策略

    前言 Rxjava,由于其基于事件流的链式调用、逻辑简洁 & 使用简单的特点,深受各大 Android开发者的欢迎...

  • 再忆RxJava---背压策略

    1 背压存在的背景 被观察者 发送事件速度太快,而观察者 来不及接收所有事件,从而导致观察者无法及时响应或者处理所...

  • Rxjava(五)之背压模式

    前言 我们在上面的篇幅讲解了Rxjava的操作符的使用,那么这篇文章我们将讲解Rxjava的背压模式。在Rxjav...

  • Rxjava的背压

    一、什么是背压? 被观察者与观察者通过subscribe()订阅并使用异步后,一般上游发射一条数据下游就会...

网友评论

    本文标题:RxJava之背压策略

    本文链接:https://www.haomeiwen.com/subject/xqmulctx.html