Rxjava2之Fusion图解深入理解篇

作者: LSteven | 来源:发表于2017-12-24 17:03 被阅读127次

    最近看源码的时候,经常会看到FusionMode。 这玩意网上介绍比较少,粗看也比较复杂,但因为较多运算符中都用到了它,所以此篇决定选择几个代表运算符对它做一下分析。(图看不清楚点开看大图吧。。orz)

    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    
    Flowable.range(1, 5)
    
                  .flatMapIterable(mapper, 2)
    
                  .subscribe(ts);
    

    我们以上面的test code作为主要例子进行一步步肢解分析。最初看可能会比较晕,但这玩意多看几次就可以串起来了> < 自己的笔记记录,写的可能不是很流畅 见谅。

    FlowableRange

    image.png

    这里的分析与 Rxjava图示入门篇的方法一致。这里先介绍三个重要的类:

    BaseRangeSubscription

    我们知道,subscription其实可以看成上游和下游中间的连接点。下游拿到subscription可以随时取消订阅,而在这节,我们会发现他还有另一个极其重要的作用---与FusionMode的结合。BaseRangeSubscription继承于BasicQueueSubscription,它向外界提供三个接口:

    • requestFusion(mode)
    • request()
    • poll()
    requestFusion

    外界调用此接口时,会带着自己希望请求的Mode,这个Mode到底是什么后面再说。subsciption收到这个请求的mode后,根据自己的心情返回新的Mode给请求方。这里BaseRangeSubscription就返回了mode&SYNC

    request

    外界调用此接口时,代表向subscription请求n个item。BaseRangeSubscription收到该请求时,根据n是否=max,调用fastPathslowPath。这两个方法又是干什么的呢?先暂时忽略。

    poll

    外界调用此接口时,代表希望从subscription中获取item。可以看到FlowableRange就返回index,假设range是[0,10],那就依次会返回0,1,...

    其实一般与FusionMode有关的subscription都会提供这三个接口,后面我们会看到这三个接口分别起到什么重要的作用。

    RangeSubscription & RangeConditionSubscription

    这两货主要为FlowableRange实现了前面接口中的fastPathslowPath,根据图示可以看到本质都是调用了根据request(n)中请求的数量依次调用subscriber.onNext(index)

    FlowableFlattenIterable

    image.png

    这货与以前介绍过的flatMap类似。传入一个mapper,

    FlattenIterableSubscriber

    当其onSubscribe被调用时,该subscriber会观察与上游连接的subscription是否为QueueSubscription。是不是似曾相识,这个QueueSubscription在前面介绍FlowableRange时提过。如果与上游连接的subscriptionQueueSubscription,就会调用前面的接口1requestFusion(ANY)。看上游返回什么Mode,这里会返回两类,分别为SYNCASYNC

    sync

    假设上游返回SYNC,done = true。代表它们之间的交互为同步。

    if (m == SYNC) {
        fusionMode = m;
        this.queue = qs;
        done = true;
    
        actual.onSubscribe(this);
    
        return;
    }
    

    这里看到存在一个变量为queue,下游可以通过queue向上游获取数据

    aysnc

    假设上游返回ASYNC,代表上下游之间的交互为异步。subscription.request(prefetch)。调用接口2。根据前面的介绍会调用slowPath,即调用FlattenIterableSubscriber.onNext

    这里我们看onNext:上代码吧

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (fusionMode == NONE && !queue.offer(t)) {
            onError(new MissingBackpressureException("Queue is full?!"));
            return;
        }
        drain();
    }
    

    这里因为是async,所以不是done

    • ASYNC 进入drain()

    • NONE 如果队列已经满了抛出错误

    FlattenIterableSubscriber也和queue有关,它也提供之前介绍过的三个接口。

    在讲两个运算符结合之前,我们最后讲一下drain()到底干了什么吧!

    drain

    前面提到过,subscription有一个this.queue队列。drain会依次通过队列向上游请求item,通过iterable = mapper.apply(t);得到iterable,然后再遍历iterable,调用下游subscriber.onNext/onComplte,完成数据流动过程。

    两个操作符熔合

    终于到了激动人心的时刻。这两个操作符如果合在一起会产生什么神奇的效果呢?数据到底是怎么流动的呢?

    再放一次要介绍的例子:

    TestSubscriber<Integer> ts = new TestSubscriber<Integer>();
    
    Flowable.range(1, 5)
    
                  .flatMapIterable(mapper, 2)
    
                  .subscribe(ts);
    
    image.png

    当调用subscribe(ts)时首先调用FlowableFlatMapIterable.subscribe,此时自己看代码发现调用了source.subscibe(FlattenIterableSubscriber),即将上游FlowableRange与中游的FlattenIterableSubscriber结合。中游发现有人要给自己发数据了,就调用自己的onSubscribe,即FlattenIterableSubscriber.onSubscribe(BaseRangeSubscription),此时BaseRangeSubscription作为上游和中游之间的connection

    onSubscribe

    这时候发生的事情其实前面都介绍过了。但我自己都快忘了,重述一遍,此时中游的FlattenIterableSubscriber会通过connection BaseRangeSubscription请求Mode,这里BaseRangeSubscription返回SYNC,代表上游只支持同步模式。前面只简单介绍了SYNC,这里再重点介绍一下。

    
    if (m == SYNC) {
        fusionMode = m;
        this.queue = qs;
        done = true;
    
        actual.onSubscribe(this);
    
        return;
    }
    

    这里会继续调用realSubscriber.onSubscribe,我们先忽略下游也会有mode请求的情况,

    long mr = missedRequested.getAndSet(0L);
    if (mr != 0L) {
        s.request(mr);
    }
    

    此时我们的testSubscriber会通过中游与下游之间的connectionFlattenIterableSubscriber向中游请求数据。此时会调用前面介绍过的drain,而调用drain时,中游会主要通过上游与中游之间的connectionBaseRangeSubscription.poll即接口三要求上游同步返回数据给它,数据返回后再依次返回给下游。

    耶其实到这里就完成了整个数据的流动。是不是其实也不怎么复杂(...别欺骗自己了)

    到这里我们可以说,到底mode是什么了。拿住官方解释吧。

    /**
    * Request a synchronous fusion mode and can be returned by {@link #requestFusion(int)}
    * for an accepted mode.
    * <p>
    * In synchronous fusion, all upstream values are either already available or is generated
    * when {@link #poll()} is called synchronously. When the {@link #poll()} returns null,
    * that is the indication if a terminated stream.
    * In this mode, the upstream won't call the onXXX methods and callers of
    * {@link #poll()} should be prepared to catch exceptions. Note that {@link #poll()} has
    * to be called sequentially (from within a serializing drain-loop).
    */

    先介绍SYNC,说明当上游告诉下游它支持同步时,下游只会通过poll求取数据,取一个给一个,如果上游返回null,说明整个数据流已经被中断了。所以在这种模式下,上游是不会主动调用下游的onXXX的。

    ASYNC又是什么呢?

    ASYNC

    我们再走一遍async的流程吧!

    if (m == ASYNC) {
        fusionMode = m;
        this.queue = qs;
    
        actual.onSubscribe(this);
    
        s.request(prefetch);
        return;
    }
    

    看到async时,中游已经主动向上游去预先取数据了,所以有prefetch
    的概念。因此此时,上游就会去调用中游的onNext。看到这里可以发现asyncsync的部分不同了,sync只会通过poll去主动拉数据,而async是会去主动request数据让上游主动调用自己的onNext方法。

    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
        if (fusionMode == NONE && !queue.offer(t)) {
            onError(new MissingBackpressureException("Queue is full?!"));
            return;
        }
        drain();
    }
    

    onNext中看到依旧调用了drain,说明中游收到上游的通知时会再通过poll去拉取上游给它准备的数据。

    好吧,那现在让我们看看ASYNC的官方解释吧

    /**
    * Request an asynchronous fusion mode and can be returned by {@link #requestFusion(int)}
    * for an accepted mode.
    * <p>
    * In asynchronous fusion, upstream values may become available to {@link #poll()} eventually.
    * Upstream signals onError() and onComplete() as usual but onNext may not actually contain
    * the upstream value but have {@code null} instead. Downstream should treat such onNext as indication
    * that {@link #poll()} can be called. Note that {@link #poll()} has to be called sequentially
    * (from within a serializing drain-loop). In addition, callers of {@link #poll()} should be
    * prepared to catch exceptions.
    */

    在异步模式中,上游会通过onNext通知下游去取数据,此时下游就会通过poll去取上游给自己的准备的数据。

    可以看到,一个是下游主动同步去取,一个是上游发通知并携带数据给下游。下游其实是不知道数据什么时候会来的。

    注释其实还有另一层意思,如果上游给下游发了onNext(null),说明下游此时要通过poll主动去取。这个又是什么意思呢?这里的async例子其实不怎么好,因为FlowableRange只支持sync,所以我们换个例子再看看

    FlowabledoAfterNext

    sync

    TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.SYNC);
    
    Flowable.range(1, 5)
    .doAfterNext(afterNext)
    .subscribe(ts0);
    

    这里sync运用在中游与下游之间。可以看到,当modesync时,

    if (initialFusionMode != 0) {
        if (s instanceof QueueSubscription) {
            qs = (QueueSubscription<T>)s;
    
            int m = qs.requestFusion(initialFusionMode);//sync
            establishedFusionMode = m;
    
            if (m == QueueSubscription.SYNC) {
                checkSubscriptionOnce = true;
                lastThread = Thread.currentThread();
                try {
                    T t;
                    while ((t = qs.poll()) != null) {
                        values.add(t);
                    }
                    completions++;
                } catch (Throwable ex) {
                    // Exceptions.throwIfFatal(e); TODO add fatal exceptions?
                    errors.add(ex);
                }
                return;
            }
        }
    }
    

    while ((t = qs.poll()) != null) {
    values.add(t);
    }

    所以再次印证,同步都是靠下游主动去拉的。

    重点关注async:

    async

    TestSubscriber<Integer> ts0 = SubscriberFusion.newTest(QueueSubscription.ASYNC);
    
    Flowable.range(1, 5)
    .doAfterNext(afterNext)
    .subscribe(ts0);
    
    actual.onSubscribe(s);
    long mr = missedRequested.getAndSet(0L);
    if (mr != 0L) {
        s.request(mr);
    }
    
    image.png

    进入DoAfterSubscriber.request(mr)->UnicastProcessor.request(mr)->drain->a.onNext(null)

    if (establishedFusionMode == QueueSubscription.ASYNC) {
        try {
            while ((t = qs.poll()) != null) {
                values.add(t);
            }
        } catch (Throwable ex) {
            // Exceptions.throwIfFatal(e); TODO add fatal exceptions?
            errors.add(ex);
            qs.cancel();
        }
        return;
    }
    

    可以看到这里就是官方解释的验证。这里中游和下游是异步的,所以当下游发出request告诉中游自己想要n个item时,最终中游通过上游返回onNext(null),下游收到这个通知时才会主动去拉数据。显而易见,中游什么时候给数据下游其实是不知道的,这里onNext也没有像之前我们分析过的操作符一样把item传递给下游,只起了一个通知的作用。

    其实这里我们还是忽略了很多东西,为什么上游会返回onNext(null),这里其实我们已经假定上中游也是异步模式了。这是因为FlowableOnNext当收到下游要异步的请求时,会也向上游请求自己也要使用异步的模式。所以这里上游就直接返回了onNext(null),我们进一步研究代码会发现如果中游向上游请求sync模式,那上游会返回onNext(t)

    while (r != e) {
          boolean d = done;
    
          T t = q.poll();
          boolean empty = t == null;
    
          if (checkTerminated(failFast, d, empty, a, q)) {
              return;
          }
    
          if (empty) {
              break;
          }
    
          a.onNext(t);
    
          e++;
      }
    

    总结

    写到这里其实可以理解为什么要提供三个接口了,

    • poll 用于同步请求返回数据,或是异步通知下游来拿数据
    • request 用于当收到下游的请求时的操作,如果是同步直接会返回给下游数据,否则可能只是通知下游我有数据了你自己来拿吧。

    操作符背后真的包含很多思想,这节只介绍了asyncsync,boundary还没看懂。。 后面还会继续研究操作符熔合,看懂了再写吧

    如果哪里理解的不对,欢迎及时指正。

    相关文章

      网友评论

      本文标题:Rxjava2之Fusion图解深入理解篇

      本文链接:https://www.haomeiwen.com/subject/gkyhgxtx.html