美文网首页
RxJava-----Flowable 源码分析

RxJava-----Flowable 源码分析

作者: 石器时代小古董 | 来源:发表于2018-04-11 11:50 被阅读0次
    参考资料 
    Pissay https://blog.piasy.com/AdvancedRxJava/2016/10/04/subjects-part-2/
    

    一、重点对象

    Flowable -----FlowableCreate
    Subscribe----FlowableOnBackpressureBuffer
    request-----AtomicLong 相当于一个计数,记录事件下发了多少次
    BackpressureHelper---控制请求个数
    drain:下发和整理事件
    上游发送对象的emitter和下游onSubscribe持有的subscription是一个对象,所以反复操作它的request

    BackPressure背压

    在异步模型中,如果上游产生数据速度过快而下游消费事件过慢。会出现数据堆积导致内存不断增加而溢出的问题。为了解决这种问题RxJava提出了节流以及背压的策略。它是一种流速控制的策略。

    1.响应式拉去实现流速控制
    在普通的RxJava模型中,上游主动推送事件给下游,下游的被动接收数据(下游的onNext方法是被动触发的)。而在响应式拉取模型中,由下游来请求上游发送事件。

    二、简单流程分析

     Flowable.create(new FlowableOnSubscribe<String>() {
    
                @Override
                public void subscribe(@NonNull FlowableEmitter<String> e) throws Exception {
                      //doSomething
                }
            }, BackpressureStrategy.BUFFER).subscribe(new Subscriber<String>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(20);
                }
    
                @Override
                public void onNext(String o) {
                    //doSomething
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
            });
    

    FlowableCreate对象

    1.create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) 负责产生一个Flowable对象,它和Obserable的功能近似也有subscribe方法完成对下游观察者事件的订阅。不同的是它提供了Backpressure策略的支持。所以在性能上低于Observable,因为内部为了完成背压操作添加了许多其他操作。
    2.create实际产生一个FlowableCreate对象,这个对象会持有我们创建的FlowableOnSubscribe和背压策略

        final FlowableOnSubscribe<T> source;
        final BackpressureStrategy backpressure;
        public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
            this.source = source;
            this.backpressure = backpressure;
        }
    

    3.FlowableCreate负责根据背压策略mode来决定使用什么样的发射器emitter(Subscription),这一步是在订阅方法触发时完成的

    1. t.onSubscribe(emitter)会把发射器对象交给subscribe对象持有。
     @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
            switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
            }
            t.onSubscribe(emitter);
            try {
                source.subscribe(emitter);
            } catch (Throwable ex) {
              .....
            }
        }
    

    三、LatestAsyncEmitter分发器

    我们采用的是LATEST策略:如果缓存池满了,会将后续的数据丢掉,但是不管缓存池的状态如何,LATEST都会将最后一条数据强行放入缓存池中。
    1.LatestAsyncEmitter分发器继承的是BaseEmitter,而BaseEmitter实际上是一个AtomicLong
    2.request方法内部其实是对Emitter维护的计数进行修改
    3.emitter内部维护了下游的Subscriber对象,用来调用下游的onNext方法传递事件

    继承AtomicLong为了优化性能。AtomicLong实际维护的是request个数,这个操作可能是在异步线程操作的,如果使用volatile关键词来维护,而volatile为防止指令重排会加入内存栅栏,频繁操作会有性能损耗,所以由AtomicLong来维护。

      abstract static class BaseEmitter<T>
        extends AtomicLong
        implements FlowableEmitter<T>, Subscription {
            final Subscriber<? super T> actual;
            final SequentialDisposable serial;
    
            BaseEmitter(Subscriber<? super T> actual) {
                this.actual = actual;
                this.serial = new SequentialDisposable();
            }
          @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    BackpressureHelper.add(this, n);
                    onRequested();
                }
            }
        }
       // BackpressureHelper.add(this, n) 内部实现
             for (;;) {
                //如果request维护的是一个Long.MAX_VALUE的值,不做任何操作
                long r = requested.get();
                if (r == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
               //将新值n和旧值相加
                long u = addCap(r, n);
               //更新requested维护的值
                if (requested.compareAndSet(r, u)) {
                    return r;
                }
            }
      //addCap
      public static long addCap(long a, long b) {
            long u = a + b;
            if (u < 0L) {
                return Long.MAX_VALUE;
            }
            return u;
        }
    

    三、LatestAsyncEmitter的发送事件方法

    只有在下游调用了request方法修改了BaseEmitter维护的引用计数,且触发了drain方法时才会下发事件吗,否则会因为在drain函数中判断了引用计数是否为0而终止发送
    下游函数在onSubcrible函数中拿到BaseEmitter对象,调用它的request请求开始发送数据

       @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    BackpressureHelper.add(this, n);
                    onRequested();
                }
            }
      //在LatestAsyncEmitter类中
       @Override
            void onRequested() {
                drain();
            }
    

    done:是否忽略该事件
    queue:负责存储下发的事件
    drain:对事件是否下发做处理

    上游在调用onNext时,事件被queue缓存起来,但是如果下游没有调用request函数进行修改计数,在drain函数中也会被拦截

    //LatestAsyncEmitter的onNext方法
                if (done || isCancelled()) {
                    return;
                }
    
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                queue.set(t);
                drain();
    

    get():拿到emitter维护的计数,就是下游请求发送多少个数据
    e:代表当前处理了多少个数据的计数
    wip:保证同一时刻只有一个线程操作drain函数,它是AtomicInteger类。getAndIncrement会返回AtomicInteger维护的数据后再进行加1
    wip.addAndGet(-missed):相当于wip.addAndGet(wip.get()-1)
    这段代码会通过get()方法拿到下游请求了多少个数据,以及e用来计录已经处理了多少个数据。接着会判断订阅关系是否已经取消,如果取消就不发送。
    否则会不断下发数据直到e != r 即将所有下游请求的数据都发送完毕
    当所有数据都发送完毕后,会调用 BackpressureHelper.produced(this, e)修改维护的request计数,触发onComplete以及onError
    调用 if (missed == 0)来判断是否有新的任务,如果missed为0代表所有任务都执行完毕,如果大于0代表有新的任务,需要再次执行for(;;)的代码

            void drain() {
                if (wip.getAndIncrement() != 0) {
                    return;
                }
    
                int missed = 1;
                final Subscriber<? super T> a = actual;
                final AtomicReference<T> q = queue;
    
                for (;;) {
                    long r = get();
                    long e = 0L;
    
                    while (e != r) {
                        if (isCancelled()) {
                            q.lazySet(null);
                            return;
                        }
    
                        boolean d = done;
    
                        T o = q.getAndSet(null);
    
                        boolean empty = o == null;
    
                        if (d && empty) {
                            Throwable ex = error;
                            if (ex != null) {
                                error(ex);
                            } else {
                                complete();
                            }
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(o);
    
                        e++;
                    }
    
                    if (e == r) {
                        if (isCancelled()) {
                            q.lazySet(null);
                            return;
                        }
    
                        boolean d = done;
    
                        boolean empty = q.get() == null;
    
                        if (d && empty) {
                            Throwable ex = error;
                            if (ex != null) {
                                error(ex);
                            } else {
                                complete();
                            }
                            return;
                        }
                    }
    
                    if (e != 0) {
                        BackpressureHelper.produced(this, e);
                    }
    
                    missed = wip.addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    

    思路总结

    使用Flowable和Subsrcibe对象完成订阅时,并不会像与Observable订阅完成后立即在subscribe方法中下发数据,而是在下游调用request时,通过触发drain函数(函数内部持有Subsrcibe,调用该对象的onNext方法)来去启动下发数据的流程,实现响应式拉取。拉取的数量由下游决定,通过更新BaseEmitter维护的任务计数来,来修改BaseEmitter处理事件的个数

    相关文章

      网友评论

          本文标题:RxJava-----Flowable 源码分析

          本文链接:https://www.haomeiwen.com/subject/rgjnhftx.html