美文网首页
RxJava 背压源码解析

RxJava 背压源码解析

作者: 你可记得叫安可 | 来源:发表于2020-06-24 09:52 被阅读0次

    例子

    Flowable.create<Int>({ emitter ->
            println("观察者可接受事件数量 = ${emitter.requested()}")
            var flag = false
            for (i in 0..499) {
                flag = false
                // 若 requested() == 0 则不发送
                while (emitter.requested() == 0L) {
                    if (!flag) {
                        println("不再发送")
                        flag = true
                    }
                }
                println("发送了事件 $i,观察者可接受事件数量 = ${emitter.requested()}")
                emitter.onNext(i)
            }
            emitter.onComplete()
        }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())
            .subscribe(object : Subscriber<Int> {
                override fun onComplete() {
                    println("onComplete")
                }
    
                override fun onSubscribe(s: Subscription) {
                    println("onSubscribe")
                    subscribption = s
                }
    
                override fun onNext(t: Int) {
                    println("接收到了事件 $t")
                }
    
                override fun onError(t: Throwable) {
                    println("onError $t")
                }
            })
    
        Thread.sleep(200)
        println("第一次按钮 48")
        subscribption?.request(48)
    
        Thread.sleep(200)
        println("第二次按钮 48")
        subscribption?.request(48)
    
        Thread.sleep(200)
        println("第三次按钮 48")
        subscribption?.request(48)
    

    上面例子中做的事情很简单:
    数据源一共有 500 个数据要发射,当 emitter.requested() == 0 时就不再发射,背压策略是 BackpressureStrategy.ERROR,发射线程是 Schedulers.io(),接受线程切换到 Schedulers.computation()。外部通过 subscription.request(48) 一共 3 次请求 48 个数据,每次请求都有间隔 200ms,以保证每次请求后,上游数据都能发射完成。
    运行结果

    观察者可接受事件数量 = 128
    发送了事件 0,观察者可接受事件数量 = 128
    发送了事件 1,观察者可接受事件数量 = 127
    发送了事件 2,观察者可接受事件数量 = 126
    ...
    发送了事件 125,观察者可接受事件数量 = 3
    发送了事件 126,观察者可接受事件数量 = 2
    发送了事件 127,观察者可接受事件数量 = 1
    不再发送
    // 200ms 后
    第一次按钮 48
    接收到了事件 0
    接收到了事件 1
    ...
    接收到了事件 45
    接收到了事件 46
    接收到了事件 47
    // 200ms 后
    第二次按钮 48
    接收到了事件 48
    接收到了事件 49
    接收到了事件 50
    ...
    接收到了事件 93
    接收到了事件 94
    接收到了事件 95
    发送了事件 128,观察者可接受事件数量 = 96
    发送了事件 129,观察者可接受事件数量 = 95
    发送了事件 130,观察者可接受事件数量 = 94
    ...
    发送了事件 221,观察者可接受事件数量 = 3
    发送了事件 222,观察者可接受事件数量 = 2
    发送了事件 223,观察者可接受事件数量 = 1
    // 200ms 后
    不再发送
    第三次按钮 48
    接收到了事件 96
    接收到了事件 97
    接收到了事件 98
    接收到了事件 99
    ...
    接收到了事件 141
    接收到了事件 142
    接收到了事件 143
    END

    从上面结果我们可以看到现象:

    • 初始时,数据源只知道下游观察者接收数据的能力为 128 个数据,于是一次性发送 128 个数据,直到下游观察者不再能够接收数据。
    • 观察者第一次请求 48 个数据,并依次收到了上游发送的 48 个数据。这个时候按道理缓冲区至少空出了 48 个位置,但是上游并没有再发送 48 个数据去补充
    • 观察者第二次请求 48 个数据,并依次收到了上游发送的 48 个数据。理论上这个时候缓冲区中此时已发射 48+48=96 个数据(96 也是缓冲区大小 128 的 2/3),达到缓冲区大小的 2/3。因此上游马上又发送了 96 个数据以填补缓冲区。再经过几次试验,比如第二次请求超过 48 个数据,当缓冲区中空出位置大于缓冲区大小的 2/3 时,上游会马上发射 2/3 数据来填补缓冲区。这个原理我们后面再说。
    • 观察者第三次请求 48 个数据,并依次收到了上游发送的 48 个数据。

    下面我们来看一下如何通过源码解释上面的现象。了解 RxJava 普通操作符源码的同学都知道,Flowable.create() 实际上创建了一个 FlowableCreate 实例。observeOn(Schedulers.computation()) 实际上创建了一个 FlowableObserveOn 实例,并在其中发射消息时切换了线程。

    FlowableCreate

    有了之前源码分析的经验,我们只用关注 subscribeActual() 方法:

    @Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;
        switch (backpressure) {
        case MISSING: {
            emitter = new MissingEmitter<T>(t);
            break;
        }
        case ERROR: {
            emitter = new ErrorAsyncEmitter<T>(t);
            break;
        }
        case DROP: {
            emitter = new DropAsyncEmitter<T>(t);
            break;
        }
        case LATEST: {
            emitter = new LatestAsyncEmitter<T>(t);
            break;
        }
        default: {
            emitter = new BufferAsyncEmitter<T>(t, bufferSize());
            break;
        }
        }
        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }
    

    根据创建 Flowable 时所传入的不同策略,创建了不同的 Emitter

    MissingEmitter
    @Override
    public void onNext(T t) {
        if (isCancelled()) {
            return;
        }
        if (t != null) {
            // 将数据发给下游
            downstream.onNext(t);
        } else {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 检查自己的计数器
        for (;;) {
            long r = get();
            if (r == 0L || compareAndSet(r, r - 1)) {
                return;
            }
        }
    }
    

    如果有数据往下发(开发者主动调用了 onNext 来发射数据),则直接发给下游。并且给自己的计数器 -1,如果计数器本身就为 0 了则不再 -1.Emitter 自己就是一个计数器,它是如何被赋值的呢,我们看看基类 BaseEmitter

    abstract static class BaseEmitter<T>
    extends AtomicLong
    implements FlowableEmitter<T>, Subscription {
        final Subscriber<? super T> downstream;
        BaseEmitter(Subscriber<? super T> downstream) {
            this.downstream = downstream;
        }
    
        //... 省略了不是背压处理相关的函数:error, complete 等,跟普通的 Observable 差不多
        
        // 设置计数器的值
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(this, n);
                onRequested();
            }
        }
        void onRequested() {
            // default is no-op
        }
        // 获取计数器的值
        @Override
        public final long requested() {
            return get();
        }
    }
    

    可以看到 Emitter 自己的计数器的值是靠 request(n) 方法来设置的。

    我们之后可以看到,这个 request(n) 方法是其下游 xxxSubscriber.onSubscribe() 方法里来更新上游的。

    ErrorAsyncEmitter
    static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        ErrorAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }
    }
    
    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
        NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            // 计数器 != 0 则往下发射数据,并且计数器减 1
            if (get() != 0) {
                downstream.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                // 如果计数器 == 0,上游还要往下发,则调用 onOverflow() 来处理溢出
                onOverflow();
            }
        }
        abstract void onOverflow();
    }
    

    ErrorAsyncEmitter 类继承自 NoOverflowBaseAsyncEmitter。不同于上面的 MissingEmitter 是收到数据后直接往下发,然后计数器 -1,ErrorAsyncEmitter 是计数器不为 0 才会往下发射数据,如果计数器为 0 了,则做溢出处理:往下游通知 onError

    DropAsyncEmitter
    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 8360058422307496563L;
        DropAsyncEmitter(Subscriber<? super T> downstream) {
            super(downstream);
        }
        @Override
        void onOverflow() {
            // nothing to do
        }
    }
    

    ErrorAsyncEmitter 差不多,只不过溢出时 onOverflow() 不做任何处理。

    LatestAsyncEmitter

    这个的实现复杂一点,代码就不贴了。这个策略主要是当缓冲区满了的时候,如果上游再往下发射数据,LatestAsyncEmitter 会缓存最近一次发射的数据。当下游再 request 数据时,会将缓存的数据也一起发射下去。

    缓冲区

    上面的所有 Emitter 我们都只看到了计数器(Emitter 自身就是一个计数器),计数器帮助 Emitter 执行背压策略来发射数据。那么发射的数据是在哪里被缓存的呢?思考一下,数据缓存只有在异步的时候才需要,而在什么时候我们能够明确地知道是异步操作呢?那就是在调用切线程的操作符时。因此我们看看 FlowableObserveOn

    public final class FlowableObserveOn<T> extends AbstractFlowableWithUpstream<T, T> {
    final Scheduler scheduler;
        final boolean delayError;
        final int prefetch;
        public FlowableObserveOn(
                Flowable<T> source,
                Scheduler scheduler,
                boolean delayError,
                int prefetch) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            // 缓冲区大小
            this.prefetch = prefetch;
        }
        @Override
        public void subscribeActual(Subscriber<? super T> s) {
            Worker worker = scheduler.createWorker();
            if (s instanceof ConditionalSubscriber) {
                source.subscribe(new ObserveOnConditionalSubscriber<T>(
                        (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
            } else {
                // 走这里,构造 ObserveOnSubscriber 对象
                source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
            }
        }
    }
    
    构造缓冲区

    上面主要需要关注的就是 prefetch 值。该值是由开发者调用 observeOn 时传入的,默认是 128。然后将这个缓冲区大小传入 ObserveOnSubscriber 的构造函数。我们先看下 ObserveOnSubscriber.onSubscribe() 方法:

    @Override
    public void onSubscribe(Subscription s) {
        if (SubscriptionHelper.validate(this.upstream, s)) {
            this.upstream = s;
            // ... 省去一些代码
            
            queue = new SpscArrayQueue<T>(prefetch);
            downstream.onSubscribe(this);
            s.request(prefetch);
        }
    }
    

    可以看到构造了一个 prefetch 大小的 SpscArrayQueue,并且依次向下游调用 onSubscribe(),向上游 request(prefetch)。还记得上一节中,各个 Emitter 的计数器是怎么来的吗?就是下游主动 request 的。因此我们开头的例子中 ErrorAsyncEmitter 的计数器大小是由这个 ObserveOnSubscriber 主动 request(128) 的,,也就是 128 大小,这也符合我们的 log 中的现象。

    设置 limit 为 2/3 缓冲区大小

    我们再看构造 FlowableObserveOn 时调用的父类 BaseObserveOnSubscriber

    BaseObserveOnSubscriber(
            Worker worker,
            boolean delayError,
            int prefetch) {
        this.worker = worker;
        this.delayError = delayError;
        this.prefetch = prefetch;
        this.requested = new AtomicLong();
        // 这里 limit 设置为缓冲区大小的 2/3
        this.limit = prefetch - (prefetch >> 2);
    }
    

    可以看到上面设置了一个 limit 为缓冲区 prefetch 的 2/3 大小。参照我们例子中的 log:

    • 第一次观察者消费 48 个数据,此时没有触发上游发射新的数据来填补缓冲区。
    • 第二次观察者消费 48 个数据,此时观察者消费到序号 95(也就是第 96 个数据)的数据后,上游马上开始发射数据。而 96 就是 128 的 2/3,因此可以知道,这个 limit 是用于判断缓冲区被消费 2/3(或者说只剩余少于 1/3)时,下游主动向上游请求数据的。因此这个 limit 应该是用于 onNext() 方法中的。
    缓冲区的消费行为

    直接看 onNext() 方法:

    @Override
    public final void onNext(T t) {
        if (done) {
            return;
        }
        if (!queue.offer(t)) {
            upstream.cancel();
            // 这就是采用 MISSING 策略时,缓冲区满的时候的报错
            error = new MissingBackpressureException("Queue is full?!");
            done = true;
        }
        trySchedule();
    }
    

    在这里我们发现了 MISSING 策略的报错信息。根据我们上面的分析,MISSING 策略就是无脑往下游发射数据,到 BaseObserveOnSubscriber.onNext() 这里时,queue.offer(t) 返回 false,表示缓冲区已满,于是向下游发送 error。因此我们知道了,MISSING 的意思其实是 missing 背压策略,而不是 missing 发射的数据
    接着调用 trySchedule(),我们来看看:

    final void trySchedule() {
        if (getAndIncrement() != 0) {
            return;
        }
        // 这里切线程执行任务
        worker.schedule(this);
    }
    

    上面通过 worker 来切换线程,最终执行的是 ObserveOnSubscriber.runAsync()

    @Override
    void runAsync() {
        int missed = 1;
        final Subscriber<? super T> a = downstream;
        final SimpleQueue<T> q = queue;
        long e = produced;
        for (;;) {
            // 下游观察者通过调用 request 设置的数目。在我们的例子中,就是 subscription 每次 request 的 48 个数据
            long r = requested.get();
            // 死循环,直到发射下游请求的数目个数据
            while (e != r) {
                boolean d = done;
                T v;
                try {
                    // 从缓存中取出数据
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    cancelled = true;
                    upstream.cancel();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
                if (checkTerminated(d, empty, a)) {
                    return;
                }
                if (empty) {
                    break;
                }
                a.onNext(v);
                e++;
                // 如果已发射数据 == limit(缓冲区的 2/3),则向上游请求 limit 个数据,并且一次性更新 request 数据
                if (e == limit) {
                    if (r != Long.MAX_VALUE) {
                        r = requested.addAndGet(-e);
                    }
                    upstream.request(e);
                    e = 0L;
                }
            }
            if (e == r && checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
            int w = get();
            if (missed == w) {
                produced = e;
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            } else {
                missed = w;
            }
        }
    }
    

    上面的关键逻辑其实很简单,而且也能解释我们例子中 log 的现象:从缓冲区中一直取数据往下发射,直到发射了下游 request 个数据。如果还没满足下游请求,发射数据就已经达到了缓冲区 2/3 个,那么就主动向上游请求 2/3 个数据,以填满缓冲区。

    这就解释了为什么例子中下游观察者只有消费了 96 个以上数据,上游才会继续往下发射 96 个数据(注意是发射缓冲区 2/3 个,如果缓冲区空余大于 2/3,那么上游也只会发射 2/3 个)

    多个背压策略

    如果我们创建多个背压 Flowable 后,再使用 onBackpressureLatest(), onBackpressureDrop 等背压操作符,那么最后的背压策略是以哪个为准呢?通过看源码我们可以知道,每一个背压操作符在 onSubscribe() 方法中,都是固定向上 request(Long.MAX_VALUE),这样就会使上游的背压策略中的计数器变为无限大,而使上游的背压策略失效。因此,背压策略的操作符总是以最后一个为准。

    中间的普通操作符有背压吗?

    我们看 FlowableMap,它构造的是 MapConditionalSubscriber,它没有像背压操作符一样继承自 AtomicLong,从而变为一个计数器(计数器被用于判断是否执行背压策略),只是简单地将数据做变换后发给下游。因此背压策略的操作在这些普通操作符上是不生效的。

    相关文章

      网友评论

          本文标题:RxJava 背压源码解析

          本文链接:https://www.haomeiwen.com/subject/qvolfktx.html