美文网首页
Rxjava2 简析Flowable背压(4)

Rxjava2 简析Flowable背压(4)

作者: PuHJ | 来源:发表于2019-04-02 10:09 被阅读0次

    一、简介

    前面几章都是介绍Observable,而Observable类是实现无背压方式的。而有背压的方式就不能使用Observable而是Flowable。其实这两个类没有太大的区别,尤其是操作符的处理这块。

    关于Flowable和Observable有几个相似作用的类。

    • Flowable对应Observable
    • FlowableEmitter对应ObservableEmitter
    • Subscriber对应Observer
    • Subscription对于Disposable

    关于这几个相同概念的类,背压方式的增加了一些额外的功能。

    先来看下背压方式的简单实现:

    // 1、创建Flowable对象
    Flowable<Integer> flowable = Flowable
                    .create(new FlowableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                            Log.e(TAG, "subscribe: " + emitter.requested());
    
                            for (int i = 0; i < 128; i++) {
                                Log.e(TAG, "subscribe: "+i );
                                emitter.onNext(i);
                            }
                        }
                    }, BackpressureStrategy.DROP);
            // 2、创建Subscriber观察者对象
            Subscriber<Integer> subscriber = new Subscriber<Integer>() {
                Subscription mSubscription = null;
    
                @Override
                public void onSubscribe(Subscription s) {
                    Log.e(TAG, "onSubscribe");
                    mSubscription = s;
                    s.request(1);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext: " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
                    Log.e(TAG, "onError: ", t);
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete");
                }
            };
            // 3、发生订阅关系
            flowable.subscribe(subscriber);
    

    从使用角度来说,Flowable的整体框架和Observable没有区别,仅仅是换了不同的类,但是实现的功能大体一致。

    • 1、创建Flowable对象
    • 2、创建Subscriber观察者对象
    • 3、发生订阅关系

    关于出现了压力后,也有不同的策略处理。Flowable提供了以下几种策略:

    public enum BackpressureStrategy {
        /**
         * OnNext events are written without any buffering or dropping.
         * Downstream has to deal with any overflow.
         * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
         */
        MISSING,
        /**
         * Signals a MissingBackpressureException in case the downstream can't keep up.
         */
        ERROR,
        /**
         * Buffers <em>all</em> onNext values until the downstream consumes it.
         */
        BUFFER,
        /**
         * Drops the most recent onNext value if the downstream can't keep up.
         */
        DROP,
        /**
         * Keeps only the latest onNext value, overwriting any previous value if the
         * downstream can't keep up.
         */
        LATEST
    }
    
    • BUFFER 当发送的事件有来不及处理的时候,会放在缓冲区里面,这个缓冲区会无限的增加,直到发生OOM
    • ERROR 当FlowableEmitter发射器在emitter.requested() == 0的时候发送就会抛出异常
    • DROP Rxjava默认的缓冲区为128,如果有来不及处理的事件,就会放到缓冲区,128个放满后,接下来的事件就会抛弃。
    • LATEST 与DROP策略类似,他会抛弃最开始的数据,缓冲最后的数据。

    二、基本类的介绍

    1)、Subscription

    /**
     * A {@link Subscription} represents a one-to-one lifecycle of a {@link Subscriber} subscribing to a {@link Publisher}.
     * <p>
     * It can only be used once by a single {@link Subscriber}.
     * <p>
     * It is used to both signal desire for data and cancel demand (and allow resource cleanup).
     *
     */
    
    public interface Subscription {
    
        public void request(long n);
    
        public void cancel();
    }
    

    对于Subscription的解释是,它是和Subscriber对象是一对一的关系的,以及他是个控制类,控制事件流的流向。用户可以用该对象去拉取相应的数据。

    Subscription类是对应Disposable类的,Disposable类原来的作用就是取消事件流的,Subscription保留了该方法。但同时增加了拉取方法request。该对象会在调用观察者Subscriber的时候传入。

    2)、Subscriber

    Subscriber是观察者,对应着Observable中的Observer类。

    public interface Subscriber<T> {
        /**
         * Invoked after calling {@link Publisher#subscribe(Subscriber)}.
         * <p>
         * No data will start flowing until {@link Subscription#request(long)} is invoked.
         * <p>
         * It is the responsibility of this {@link Subscriber} instance to call {@link Subscription#request(long)} whenever more data is wanted.
         * <p>
         * The {@link Publisher} will send notifications only in response to {@link Subscription#request(long)}.
         * 
         * @param s
         *            {@link Subscription} that allows requesting data via {@link Subscription#request(long)}
         */
        public void onSubscribe(Subscription s);
    
        public void onNext(T t);
    
        public void onError(Throwable t);
    
        public void onComplete();
    }
    

    Subscriber相比较Observer类,在onSubscribe传入的不是Disposable对象,而是Subscription对象,使用Subscription对象控制事件流。

    3)、FlowableEmitter

    FlowableEmitter类是发射器类,对应ObservableEmitter类。

    public interface FlowableEmitter<T> extends Emitter<T> {
    
        /**
         * Sets a Disposable on this emitter; any previous Disposable
         * or Cancellation will be unsubscribed/cancelled.
         * @param s the disposable, null is allowed
         */
        void setDisposable(Disposable s);
    
        /**
         * Sets a Cancellable on this emitter; any previous Disposable
         * or Cancellation will be unsubscribed/cancelled.
         * @param c the cancellable resource, null is allowed
         */
        void setCancellable(Cancellable c);
    
        /**
         * The current outstanding request amount.
         * <p>This method is thread-safe.
         * @return the current outstanding request amount
         */
        long requested();
    
        /**
         * Returns true if the downstream cancelled the sequence.
         * <p>This method is thread-safe.
         * @return true if the downstream cancelled the sequence
         */
        boolean isCancelled();
    
        /**
         * Ensures that calls to onNext, onError and onComplete are properly serialized.
         * @return the serialized FlowableEmitter
         */
        FlowableEmitter<T> serialize();
    }
    

    FlowableEmitter类有他独特的方法,主要的方法是long requested();可以用该方法来感知当前的下游的情况,可以使下游和上游产生联系。

    FlowableEmitter本身是继承Emitter的,它具有发射的功能。在发射的之前可以通过requestd方法判断下游还可以处理多少,这样就完成了响应式拉取的核心东西。

    3)、Flowable被观察者

    Flowable是整个观察者模式中的被观察者概念。作为被观察者它是有个订阅功能。

    Flowable是继承自Publisher类,该类的作用就是定义一个订阅方法。其中Subscriber是观察者。

    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    

    Flowable的结构和Observable是差不多的,现在就来看下一份精简的代码。

    public abstract class Flowable<T> implements Publisher<T> {
    
        @BackpressureSupport(BackpressureKind.SPECIAL)
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
            ObjectHelper.requireNonNull(source, "source is null");
            ObjectHelper.requireNonNull(mode, "mode is null");
            return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
        }
    
        @BackpressureSupport(BackpressureKind.SPECIAL)
        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Subscriber<? super T> s) {
            ObjectHelper.requireNonNull(s, "s is null");
            try {
                s = RxJavaPlugins.onSubscribe(this, s);
    
                ObjectHelper.requireNonNull(s, "Plugin returned null Subscriber");
    
                subscribeActual(s);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Subscription has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
        protected abstract void subscribeActual(Subscriber<? super T> s);
    
    }
    

    源码一看和Observable没什么区别,通过Create或者其他类似方式创建一个Flowable对象。在Create中和其他操作符中保存上一个的Observable对象,在subscribe中用上一个Observable对象调用下游传入的Subscriber,这样就形成了一个链式结构。

    三、响应式拉去原理

    背压策略

    白话描述响应式拉取原理:观察者中的onSubscribe方法被调用的时候,会传入一个Subscription对象,该对象的request(int n)方法就是发送一个命令,下游可以处理n个数据。保存的变量就会增加n。上游的发送器中也可以访问到这个保存的变量,通过判断这个变量是否0就知道下游现在的情况了,从而触发onNext()方法继续发送事件。

    现在就开始从源码角度看了:

    • 1、注册的时候走到FlowableCreate中的subscribeActual方法
      @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
    
            switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
            }
    
            t.onSubscribe(emitter);
            try {
                source.subscribe(emitter);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                emitter.onError(ex);
            }
        }
    

    可以看出这里传入不同的策略,会生成对应的BaseEmitter策略子类。

    • 第一步还是执行相应的 t.onSubscribe(emitter);传入的Emitter就是Subscription的子类。

    • 第二步还是source.subscribe(emitter);通知用户开始发射事件流。

    • 2、第二步触发事件流

    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
          if (emitter.requested()>0){
                emitter.onNext(1);
          }
    }
    

    source.subscribe(emitter)执行中,通常会用emitter去触发emitter.onNext(int),这样会触发事件流,也就到了DropAsyncEmitter中的onNext方法。

    DropAsyncEmitter#onNext源码:

            @Override
            public final void onNext(T t) {
                if (isCancelled()) {
                    return;
                }
    
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
    
                if (get() != 0) {
                    actual.onNext(t);
                    BackpressureHelper.produced(this, 1);
                } else {
                    onOverflow();
                }
            }
    

    前面都是检测合法性,重要的是下面的判断,如果get()!=0,说明下游可以处理数据,那就发送一个数据,否则则丢弃不管了(该事件到这就停止了),也就是Drop策略。get()!=0则处理actual.onNext(t);并将记录的处理数据减一。

    再看下生产者这边,也就是在BaseEmitter中的request(int)方法:

            @Override
            public final void request(long n) {
                if (SubscriptionHelper.validate(n)) {
                    BackpressureHelper.add(this, n);
                    onRequested();
                }
            }
    

    先判断传入的n是否大于0是否合法,合法就更改可以处理的保存的记录数,生产者到这就完成了。

    这里逻辑就基本清晰了,生产者(request(int))和消费者(onNext()),他们操作的是同一个内存数据。

    生产者消费者模型

    • 触发onNext()是消费者,代表消费一个数据,前提是有数据可以消费。
    • request(int)是生产者,生产数据。

    由于生产者和消费者可以在不同的线程操作,可能会带来线程不安全,所以采用了AtomicLong线程安全的Long来保存可消费的数据。

    关于对数据内存的操作,被封装成了单独的类,下面是精简版本

    public final class BackpressureHelper {
        /** Utility class. */
        private BackpressureHelper() {
            throw new IllegalStateException("No instances!");
        }
    
         // 判断是否超出了范围
        public static long addCap(long a, long b) {
            long u = a + b;
            if (u < 0L) {
                return Long.MAX_VALUE;
            }
            return u;
        }
    
        // 生产数据
        public static long add(AtomicLong requested, long n) {
            for (;;) {
                long r = requested.get();
                if (r == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                long u = addCap(r, n);
                if (requested.compareAndSet(r, u)) {
                    return r;
                }
            }
        }
    
    
        // 消费数据
        public static long produced(AtomicLong requested, long n) {
            for (;;) {
                long current = requested.get();
                if (current == Long.MAX_VALUE) {
                    return Long.MAX_VALUE;
                }
                long update = current - n;
                if (update < 0L) {
                    RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
                    update = 0L;
                }
                if (requested.compareAndSet(current, update)) {
                    return update;
                }
            }
        }
    }
    

    以add(AtomicLong requested, long n)生产者为例。传入的参数是requested代表原来的数据的索引,n为需要再生产个数。本质上就是将request+n,但这样是线程不安全的。

    首先使用for (;;) 循环,这里没有采用锁机制,而是采用自旋锁,将生产的数据和理论的数据进行对比,如果不是则重新操作,一致就说明这些操作是线程安全的,采用原理的比较设置为最新的数即可。

    同理消费者也是。

    四、小结

    关于响应式拉取,本质上就是生产者和消费者模型。AtomicLong代表着数据源,是个被操作的对象;FlowableEmitter为发射器是消费者;Subscription为控制器,内部有request(int)方法,为生产者。

    而BaseEmitter则继承了AtomicLong、FlowableEmitter和Subscription这三个类,也就是说明在BaseEmitter是个集大成者,将生产者消费者模型中的元素都放在了一起,产生了联系,这样就可以操作了。

    其中BackpressureHelper是个代理处理数据源的类,他采用自旋锁机制提高了CPU的利用率,也保证了安全。因为这里的线程冲突场景不可能特别多,加锁的话会占用上千的CPU时钟周期,而自旋锁一般状态也就消耗几个时钟周期。

    相关文章

      网友评论

          本文标题:Rxjava2 简析Flowable背压(4)

          本文链接:https://www.haomeiwen.com/subject/vhubbqtx.html