美文网首页
RxJava源码分析(二)基本的数据流分析(有背压)

RxJava源码分析(二)基本的数据流分析(有背压)

作者: kakaxicm | 来源:发表于2018-08-12 11:40 被阅读0次

    引言

    上篇文章中,我们了解了RxJava基本的无背压数据流实现原理,本篇我们依然从案例着手,学习有背压下数据流响应实现。何为背压?大多数情况下,上游发射数据的速度大于下游处理数据的速度,背压策略就是控制数据流速,在RxJava中通过设置下游的处理能力实现“响应式拉取”解决背压问题。

    样例

    下面是同步订阅带背压的样例:

    private void testBackPressure() {
            //同步订阅事件,发送一个接收一个,不会出现被观察者发送事件速度 > 观察者接收事件速度的情况。
            // 可是,却会出现被观察者发送事件数量 > 观察者接收事件数量的问题
            Flowable<Integer> upstream = Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }, BackpressureStrategy.ERROR);//背压策略,下游接收不到数据的时候报MissingBackpressureException异常
    
            Subscriber<Integer> subscriber = new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
    
                    Log.e(TAG, "Flowable:onSubscribe");
                    //设置下游接收的事件个数,如果不设置
                    //这里只接收两个事件
                    //同步订阅的情况下,如果不设置接收能力,也会报背压异常
                    s.request(2);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "Flowable:onNext: " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.e(TAG, "Flowable:onError: ", t);
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, "Flowable:onComplete: ");
                }
            };
    
            upstream.subscribe(subscriber);
        }
    

    可见写法和无背压的模式基本一致,而里面出现的元素也和无背压一一对应:
    1.Flowable:被观察者,对应Observable;
    2.FlowableOnSubscribe:中间件,类似于“管道”,用于发射数据,对应ObservableOnSubscribe;

    1. FlowableEmitter:中间件,类似于“泵”,用于真正发射数据,对应ObservableEmitter;
    2. Subscription:数据流管道开关,类似于“阀门”,对应于Disposable,它可以控制下游的处理能力;
    3. Subscriber:被观察者,对应Observer。
      好了找到对应关系,我们猜想数据流的流向也是和无背压基本一致,事实上也是如此,因此本篇不再重点说明流向,而侧重点放到“响应式拉取”的实现上。由于和无背压的流程相似,不详细阐述他们之间的关系,不熟悉的请看上篇博客。下面先大概看看各元素的构成:

    Flowable

    public abstract class Flowable<T> implements Publisher<T> {
    ....
    }
    public interface Publisher<T> {
    
        /**
         * Request {@link Publisher} to start streaming data.
         * <p>
         * This is a "factory method" and can be called multiple times, each time starting a new {@link Subscription}.
         * <p>
         * Each {@link Subscription} will work for only a single {@link Subscriber}.
         * <p>
         * A {@link Subscriber} should only subscribe once to a single {@link Publisher}.
         * <p>
         * If the {@link Publisher} rejects the subscription attempt or otherwise fails it will
         * signal the error via {@link Subscriber#onError}.
         *
         * @param s the {@link Subscriber} that will consume signals from this {@link Publisher}
         */
        public void subscribe(Subscriber<? super T> s);
    }
    

    实现了Publisher接口的subscribe方法:

     @BackpressureSupport(BackpressureKind.SPECIAL)
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Subscriber<? super T> s) {
            ...
            try {
                s = RxJavaPlugins.onSubscribe(this, s);
                ....
                subscribeActual(s);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Subscription has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    依然调用的subscribeActual抽象方法,我们接着看create操作符返回的FlowableCreate,它实现了subscribeActual方法。

    FlowableCreate

    public final class FlowableCreate<T> extends Flowable<T> {
        //用户传入的FlowableOnSubscribe
        final FlowableOnSubscribe<T> source;
        //背压策略
        final BackpressureStrategy backpressure;
        public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
            this.source = source;
            this.backpressure = backpressure;
        }
     @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
            //根据背压策略,构造对应的发射器
            switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
            }
            //订阅时的观察方法,在使用的时候,需要在这个方法里面设置下游的处理事件个数
            t.onSubscribe(emitter);
            try {
                //通过emitter发射数据,用户实现
                source.subscribe(emitter);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                emitter.onError(ex);
            }
        }
    }
    

    和ObservableCreate类似,封装了FlowableOnSubscribe和背压策略,在subscribeActual中根据背压策略和传入的观察者设置不同的发射器,然后通过source.subscribe(emitter)发射数据,在这方法里,emitter封装了观察者Subscriber,最终调用它的接收数据方法。
    下面我们着重看看发射器的结构,它们既然和背压策略紧密联系,那么背压功能必然由它们实现。

    FlowableEmitter族

    FlowableEmitter接口

    先看它们的顶层接口:

    public interface FlowableEmitter<T> extends Emitter<T> {
      ...
      long requested();
      ...
    }
    

    其他方法和ObservableEmitter方法基本一致,这里多了requested方法,它用于返回当前可以接受事件的个数。另外发射器族还实现了Subscription接口:

    public interface Subscription {
            //设置下游接收能力,n为事件个数,多次调用时,容量累加
            public void request(long n);
           //断开管道
            public void cancel();
    }
    

    所以发射器拥有设置下游接收容量和断开管道的功能。
    接下来我们再看看背压数据流发射器的基类BaseEmitter:

    BaseEmitter

    abstract static class BaseEmitter<T>
        extends AtomicLong
        implements FlowableEmitter<T>, Subscription {
            private static final long serialVersionUID = 7326289992464377023L;
            //真正的被观察者
            final Subscriber<? super T> actual;
    
            final SequentialDisposable serial;
    
            BaseEmitter(Subscriber<? super T> actual) {
                this.actual = actual;
                this.serial = new SequentialDisposable();
            }
    
            @Override
            public void onComplete() {
                if (isCancelled()) {
                    return;
                }
                try {
                    //观察者onComplete
                    actual.onComplete();
                } finally {
                    serial.dispose();
                }
            }
    
            @Override
            public void onError(Throwable e) {
                if (e == null) {
                    e = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (isCancelled()) {
                    RxJavaPlugins.onError(e);
                    return;
                }
                try {
                    //观察者onError
                    actual.onError(e);
                } finally {
                    serial.dispose();
                }
            }
    
            @Override
            public final void cancel() {
                serial.dispose();
                onUnsubscribed();
            }
    
            void onUnsubscribed() {
                // default is no-op
            }
    
            @Override
            public final boolean isCancelled() {
                return serial.isDisposed();
            }
            
            //重点:调用层调用这个方法设置处理能力
            @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    //计数器+n
                    BackpressureHelper.add(this, n);
                    onRequested();
                }
            }
    
            void onRequested() {
                // default is no-op
            }
    
            @Override
            public final void setDisposable(Disposable s) {
                serial.update(s);
            }
    
            @Override
            public final void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public final long requested() {
                //返回当前可以处理的事件个数
                return get();
            }
    
            @Override
            public final FlowableEmitter<T> serialize() {
                return new SerializedEmitter<T>(this);
            }
        }
    

    继承AtomicLong类,说明发射器是原子操作的Long型数,本质是一个计数器。主要覆写了onError、onComplete等方法,其中request方法是设置Long型数,代表当前发射器的事件容量,调用者在接收数据的onSubscribe方法中必须调用这个方法,设置容量。至于其他方法的实现我们再看他的子类实现。

    NoOverflowBaseAsyncEmitter

    abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
    
            private static final long serialVersionUID = 4127754106204442833L;
    
            NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
                super(actual);
            }
    
            @Override
            public final void onNext(T t) {
                if (isCancelled()) {
                    return;
                }
    
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                //取当前剩余容量
                if (get() != 0) {
                     //被观察者onNext方法
                    actual.onNext(t);
                    //容量-1
                    BackpressureHelper.produced(this, 1);
                } else {
                    //剩余容量为0,超出处理能力交给子类实现
                    onOverflow();
                }
            }
            //溢出的数据再这个方法中处理,不同的背压策略有不同的实现
            abstract void onOverflow();
        }
    

    到目前为止,我们知道了request方法是做+计数,onNext方法是做减计数,在同步订阅方法中通过这两点实现响应式拉取,即:我能吃多少就只能吃多少,超过的事件交给onOverflow处理。接下来我们看看具体的几个发射器实现

    发射器的实现

    MissingEmitter

    MissingEmitter对应MISSING策略:

    static final class MissingEmitter<T> extends BaseEmitter<T> {
            private static final long serialVersionUID = 3776720187248809713L;
    
            MissingEmitter(Subscriber<? super T> actual) {
                super(actual);
            }
    
            @Override
            public void onNext(T t) {
                if (isCancelled()) {
                    return;
                }
    
                if (t != null) {
                    actual.onNext(t);
                } else {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
    
                for (;;) {
                    long r = get();
                    //当前计数器为0或者安全-1返回,则退出循环
                    if (r == 0L || compareAndSet(r, r - 1)) {
                        return;
                    }
                }
            }
    
        }
    

    发现它没有继承NoOverflowBaseAsyncEmitter,所以没有做溢出处理,在onNext方法中检查当前计数器的值,安全减1或者计数值为0时返回,可见它接收到事件能处理就处理,处理不了就睁一只鸭闭一只眼直接返回。

    ErrorAsyncEmitter

    static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
            private static final long serialVersionUID = 338953216916120960L;
    
            ErrorAsyncEmitter(Subscriber<? super T> actual) {
                super(actual);
            }
    
            @Override
            void onOverflow() {
                //事件溢出处理
                onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
            }
        }
    

    ErrorAsyncEmitter在事件溢出时直接抛出背压异常。

    DropAsyncEmitter

    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
    
    
            private static final long serialVersionUID = 8360058422307496563L;
    
            DropAsyncEmitter(Subscriber<? super T> actual) {
                super(actual);
            }
    
            @Override
            void onOverflow() {
                //事件溢出,就当没看见
            }
    
        }
    

    到目前为止,我们梳理了背压原理,其实本质就是在发射数据的时候设置了计数器,没接收一个事件计数器减一,背压策略就是处理数据溢出的情况。

    相关文章

      网友评论

          本文标题:RxJava源码分析(二)基本的数据流分析(有背压)

          本文链接:https://www.haomeiwen.com/subject/nwtsbftx.html