RxJava 2.x知识笔记

作者: 正规程序员 | 来源:发表于2017-12-01 20:48 被阅读183次

    观察者模式的运用

    传统的Java观察者模式可以参考此篇博客:Java观察者模式案例简析

    RxJava 是基于Java的观察者模式开展的。构建被观察者(Observable/Flowable)、观察者(Observer/Subscriber),并通过建立两者的订阅关系实现观察,在事件的传递过程中可以对事件进行各种处理。

    在rxjava 1.x、rxjava 2.x里,Observable是被观察者,Observer是观察者,正常逻辑是观察者通过subscribe订阅Observable的事件处理,当Observable发射事件时Observer接收数据。但为了保持流式API风格,观察者订阅被观察者的代码顺序设计有一些调整。
    如:

    Observable.subscribe(Observer);
    

    RxJava 1.x 和RxJava 2.x的主要区别

    在RxJava 2.x中的观察者模式有两种。而Flowable作为被观察者是专门支持背压的。这也是RxJava 1.x 和RxJava 2.x的主要区别。当然还有一些区别是操作符、接口的不兼容更新。

    • Observable ( 被观察者 ) / Observer ( 观察者 )
    • Flowable (被观察者)/ Subscriber (观察者)


      image.png

    RxJava1.x 平滑升级到RxJava2.x

    由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.x,或者将RxJava2.x转回RxJava1.x。
    RxJava2Interop

    经典流式API调用风格

      Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                }
            })
    //                ...省略很多在发射过程中的流式处理代码
                    .subscribe(new Observer<Integer>() {
                        private Disposable mDisposable;
    
                        @Override
                        public void onSubscribe(Disposable d) {
                            mDisposable = d;
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.d("onNext", "" + integer);
                            //新增的Disposable可以做到切断的操作,让Observer观察者不再接收上游事件。
                            if (integer == 3) {
                                mDisposable.dispose();
                                Log.d("onNext", "已停止接收事件");
                            }
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    打印结果:

    11-28 13:00:42.195 29930-29930/? D/onNext: 1
    11-28 13:00:42.195 29930-29930/? D/onNext: 2
    11-28 13:00:42.195 29930-29930/? D/onNext: 3
    11-28 13:00:42.195 29930-29930/? D/onNext: 已停止接收事件
    

    Rxjava 线程调度

    subscribeOn() 指定的就是发射事件的线程,observerOn 指定的就是订阅者接收事件的线程。

    多次指定发射事件的线程只有第一次指定的有效,也就是说多次调用 subscribeOn() 只有第一次的有效,其余的会被忽略。
    但多次指定订阅者接收线程是可以的,也就是说每调用一次 observerOn(),下游的线程就会切换一次。

    Rxjava 2.x常用操作符

    1,Function<T, R> ——将输入的value类型T转换成输出的value类型R。通常结合Map操作符。

    /**
     * A functional interface that takes a value and returns another value, possibly with a
     * different type and allows throwing a checked exception.
     *
     * @param <T> the input value type
     * @param <R> the output value type
     */
    public interface Function<T, R> {
        /**
         * Apply some calculation to the input value and return some other value.
         * @param t the input value
         * @return the output value
         * @throws Exception on error
         */
        @NonNull
        R apply(@NonNull T t) throws Exception;
    }
    

    2,Map——将一个Observable被观察者通过特定函数的执行,转换成另一种Observable被观察者。
    在 2.x 中和 1.x 中作用几乎一致,不同点在于:2.x 将 1.x 中的 Func1 和 Func2 改为了 Function 和 BiFunction。

      /**
         * Returns an Observable that applies a specified function to each item emitted by the source ObservableSource and emits the results of these function applications.
         * 
        @SchedulerSupport(SchedulerSupport.NONE)
        public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
            ObjectHelper.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
        }
    

    3,Consumer——接收一个单独的数据,类似于一个简化版的观察者observer。

    /**
     * A functional interface (callback) that accepts a single value.
     * @param <T> the value type
     */
    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(@NonNull T t) throws Exception;
    }
    

    4,distinct——去重操作符。即先有的数字保留,重复的数字去除并保留原先顺序的方式输出。

      Observable.just(2, 1, 2, 3, 4, 2, 3)
                    .distinct()
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.d("accept", "" + integer);
                        }
                    });
    

    输出

    11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 2
    11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 1
    11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 3
    11-28 11:59:17.511 7052-7052/com.zjrb.sjzsw D/accept: 4
    

    5,concat—— 可以做到不交错的发射两个甚至多个 Observable 的发射事件,并且只有前一个 Observable 终止(onComplete) 后才会订阅下一个 Observable。比如可以采用 concat 操作符先读取缓存再通过网络请求获取数据。

    案例说明:

      Observable observable = Observable.just(1, 2, 3, 4, 5, 6)
                    .map(new Function<Integer, Integer>() {
                        @Override
                        public Integer apply(@NonNull Integer integer) throws Exception {
                            return integer + 1;
                        }
                    });
            Observable.concat(Observable.just(-1, -2, -3, -4, -5, -6), observable)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.d("accept", "" + integer);
                        }
                    });
    

    打印输出:看吧,两个Observable是按照顺序依次无交错执行的。

    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -1
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -2
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -3
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -4
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -5
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: -6
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 2
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 3
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 4
    11-29 01:48:50.430 31564-31564/com.zjrb.sjzsw D/accept: 5
    11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 6
    11-29 01:48:50.431 31564-31564/com.zjrb.sjzsw D/accept: 7
    

    注:熟悉操作符的目的在于,不同场景中都能随时想到有对应的工具可用。

    背压

    背压:指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略。
    因为事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。这就是背压产生的必要性。

    RxJava2.0中,Flowable是能够支持Backpressure的Observable,是对Observable的补充(而不是替代)。所以Observable被观察者支持的API,Flowable也都支持,并且Flowable的API里也都强制支持背压。

    背压经典代码

    Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                    if (e.requested() != 0) {
                        for (int i = 0; i < 10; i++) {
                            e.onNext(i + 1);
                            Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                        }
                        e.onComplete();
                    }
                }
            }, BackpressureStrategy.LATEST)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            subscription = s;
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "接收 ——" + s);
                            if (s == 9){
                                subscription.cancel();
                            }
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d(TAG, "接收错误——" + t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    

    外部调用subscription请求配合配额11个。

      if (subscription != null) {
          subscription.request(11);
      }
    

    在BackpressureStrategy.LATEST背压策略下,上游发射10个事件,下游由外部调用请求发布配额指令,当下游接收到第9个事件时暂停上游发布(此操作会清空上游事件源)。

    背压策略

    • BackpressureStrategy.MISSING
      此策略下,上游发射的数据不做缓存也不丢弃,下游处理溢出的问题。简单说就是没有背压。
    • BackpressureStrategy.ERROR
      此策略下,在上游发射速度过快并超出下游接收速度时,抛出MissingBackpressureException异常。
    • BackpressureStrategy.BUFFER
      此策略下,把上游发射过来的所有数据全部缓存在缓存区,不做丢弃,待下游接收。
    • BackpressureStrategy.DROP
      此策略下,相当于一种令牌机制(或者配额机制),下游通过request请求产生令牌(配额)给上游,上游接到多少令牌,就给下游发送多少数据。当令牌数消耗到0的时候,上游开始丢弃数据。BackpressureStrategy.LATEST的策略和此类似。
    • BackpressureStrategy.LATEST
      此策略和BackpressureStrategy.DROP的策略类似,但在令牌数为0的时候有一点微妙的区别:onBackpressureDrop直接丢弃数据,不缓存任何数据;而onBackpressureLatest则缓存最新的一条数据,这样当上游接到新令牌的时候,它就先把缓存的上一条“最新”数据发送给下游。
    /**
     * Represents the options for applying backpressure to a source sequence.
     */
    public enum BackpressureStrategy {
        /**
         * OnNext events are written without any buffering or dropping.
         * Downstream has to deal with any overflow.
         * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
         */
        MISSING,
        /**
         * Signals a MissingBackpressureException in case the downstream can't keep up.
         */
        ERROR,
        /**
         * Buffers <em>all</em> onNext values until the downstream consumes it.
         */
        BUFFER,
        /**
         * Drops the most recent onNext value if the downstream can't keep up.
         */
        DROP,
        /**
         * Keeps only the latest onNext value, overwriting any previous value if the
         * downstream can't keep up.
         */
        LATEST
    }
    

    Flowable案例代码

    Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                    e.onNext(1);
                    e.onNext(2);
                    e.onNext(3);
                    e.onComplete();
                    e.onNext(4);
                }
            }, BackpressureStrategy.ERROR)
                    //下面两行代码执行线程切换,达到异步效果
    //                .subscribeOn(Schedulers.io())
    //                .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            subscription = s;
    //                        subscription.request(1);
                        }
    
                        @Override
                        public void onNext(Integer s) {
    //                        subscription.request(1);
                            Log.d(TAG, "接收——" + s);
    //                        subscription.cancel();
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d(TAG, "接收错误——" + t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    

    这里指定背压策略是BackpressureStrategy.ERROR,这种策略下执行此段代码会报如下错误。

    D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    

    因为,上下游是同步的。上游发射了事件但是下游没有接收,就会造成阻塞(即便上游的事件队列长度只有3个 < 128)。为了避免ANR,就要提示MissingBackpressureException异常。

    如果恢复第12、13行处理线程切换的代码,表示上下游位于不同线程,是异步状态。此种情形下,上游发射数据后就不会报MissingBackpressureException异常,但虽然上游能正常发射数据,下游同样接收不到数据。

    这里涉及到一个知识点:

    Flowable默认事件队列大小为128。BackpressureStrategy.BUFFER策略下事件队列无限大,和没有采取背压的Observable ( 被观察者 ) / Observer ( 观察者 )类似了。

    注:在处理同一组数据时,Observable ( 被观察者 ) / Observer ( 观察者 )比BackpressureStrategy.BUFFER策略下的Flowable (被观察者)/ Subscriber (观察者)性能更优,内存消耗更少。

    在上下游异步的情况下,上游会先把事件发送到长度为128的事件队列中,待下游发送请求数据指令后从事件队列中拉取数据。这种“响应式拉取”的思想用于解决上下游流速不均衡的情况。

    上述代码中,第19、24行代码是表示下游接收前、接收后发送请求配额指令给上游。也可以通过subscription.request(n);在外围调用发送n个请求配额给上游以获取数据。

    API自带的被观察者的背压策略

    API内的其他被观察者,API也为我们提供了背压策略方法:

    • onBackpressureBuffer()
    • onBackpressureDrop()
    • onBackpressureLatest()
      示例代码如下:
       Flowable.interval(1, TimeUnit.MICROSECONDS)
                    .onBackpressureDrop()  //加上背压策略
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Long>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            Log.d(TAG, "onSubscribe");
                            subscription = s;
                            s.request(Long.MAX_VALUE);
                        }
    
                        @Override
                        public void onNext(Long aLong) {
                            Log.d(TAG, "onNext: " + aLong);
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.w(TAG, "onError: ", t);
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "onComplete");
                        }
                    });
    

    如果不加背压策略,则会报错:

    D/rxjava2: onSubscribe
    W/rxjava2: onError: 
               io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
                   at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:87)
                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:428)
                   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:278)
                   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:273)
                   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133)
                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607)
                   at java.lang.Thread.run(Thread.java:761)
    

    响应式编程

    当上下游在同一个线程中的时候,在下游调用request(n)就会直接改变上游中的requested的值,多次调用便会叠加这个值,而上游每发送一个事件之后便会去减少这个值,当这个值减少至0的时候,上游若继续发送事件便会抛异常了。

    案例情景一:同步环境下,在下游发出10个请求配额情况下,上游发射130个事件。

          Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 130; i++) {
                            e.onNext(i + 1);
                            Log.d(TAG, "requested—left—" + e.requested());
                        }
    //                    e.onComplete();
                }
            }, BackpressureStrategy.ERROR)
                    //下面两行代码执行线程切换,达到异步效果
    //                .subscribeOn(Schedulers.io())
    //                .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            subscription = s;
                            subscription.request(10);
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "接收 ——" + s);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d(TAG, "接收错误——" + t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    

    打印LOG日志:

    D/rxjava2: 接收 ——1
    D/rxjava2: requested—left—9
    D/rxjava2: 接收 ——2
    D/rxjava2: requested—left—8
    D/rxjava2: 接收 ——3
    D/rxjava2: requested—left—7
    D/rxjava2: 接收 ——4
    D/rxjava2: requested—left—6
    D/rxjava2: 接收 ——5
    D/rxjava2: requested—left—5
    D/rxjava2: 接收 ——6
    D/rxjava2: requested—left—4
    D/rxjava2: 接收 ——7
    D/rxjava2: requested—left—3
    D/rxjava2: 接收 ——8
    D/rxjava2: requested—left—2
    D/rxjava2: 接收 ——9
    D/rxjava2: requested—left—1
    D/rxjava2: 接收 ——10
    D/rxjava2: requested—left—0
    D/rxjava2: 接收错误——io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    D/rxjava2: requested—left—0
    

    下游不再发送请求配额时,上游的配额令牌就为0。此时上游还有事件强行发个的话,就会出现异常。这里可以验证上面说的结论。

    案例情景二:异步环境,在下游发出10个请求配额情况下,上游发射128个事件。

    只是多了切换线程的代码:

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    

    打印LOG日志:

    D/rxjava2: requested—left—127
    D/rxjava2: requested—left—126
    D/rxjava2: requested—left—125
    ...
    D/rxjava2: requested—left—2
    D/rxjava2: requested—left—1
    D/rxjava2: requested—left—0
    D/rxjava2: 接收 ——1
    D/rxjava2: 接收 ——2
    D/rxjava2: 接收 ——3
    D/rxjava2: 接收 ——4
    D/rxjava2: 接收 ——5
    D/rxjava2: 接收 ——6
    D/rxjava2: 接收 ——7
    D/rxjava2: 接收 ——8
    D/rxjava2: 接收 ——9
    D/rxjava2: 接收 ——10
    

    异步的情况下,上游先把事件序列内的事件发射完毕,下游才开始接收。如果上游的时间序列超过默认的128个,则上游事件发射到第129个就会报MissingBackpressureException异常,下游就接收不到事件了。这就涉及到下一个知识点了。

    当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们下游调用request(1000)时,实际上改变的是下游线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,这个调用会在合适的时候自动触发。

    何时自动触发呢?我们一起看下:

     Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                    Log.d(TAG, "e.requested() == "+e.requested());
                        for (int i = 0; ; i++) {
                            boolean flag = false;
                            //这里做了一个循环,使发射循环处于保活状态,并在适当时机继续发射事件
                            while (e.requested() == 0){
                                if (!flag){
                                    Log.d(TAG, "e.requested() == "+e.requested());
                                    flag = true;
                                }
                            }
                            e.onNext(i + 1);
                            Log.d(TAG, "已发送" + (i + 1) + "个——剩下" + e.requested());
                        }
                }
            }, BackpressureStrategy.ERROR)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Subscriber<Integer>() {
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            subscription = s;
                        }
    
                        @Override
                        public void onNext(Integer s) {
                            Log.d(TAG, "接收 ——" + s);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            Log.d(TAG, "接收错误——" + t);
                        }
    
                        @Override
                        public void onComplete() {
                        }
                    });
    

    外部手动调用request向上游请求发射配额

     if (subscription != null) {
         subscription.request(96);
         Log.d(TAG,"下游请求了96个");
      }
    

    打印Log日志:

    D/rxjava2: e.requested() == 128
    D/rxjava2: 已发送1个——剩下127
    D/rxjava2: 已发送2个——剩下126
    ...
    D/rxjava2: 已发送127个——剩下1
    D/rxjava2: 已发送128个——剩下0
    D/rxjava2: e.requested() == 0
    D/rxjava2: 下游请求了96个
    D/rxjava2: 接收 ——1
    D/rxjava2: 接收 ——2
    ...
    D/rxjava2: 接收 ——95
    D/rxjava2: 接收 ——96
    D/rxjava2: 已发送129个——剩下95
    ...
    D/rxjava2: 已发送223个——剩下1
    D/rxjava2: 已发送224个——剩下0
    D/rxjava2: e.requested() == 0
    

    1,2,3,4行表示启动之后,上游自动想上游的事件序列缓存区发射128个事件。
    8,9...13行表示下游手动请求96个发射配额时接收的事件。此时会自动触发上游继续发送事件,如14,15...17,18行,上游会自动再次发射96个事件(95-0+1=96)。

    如果你下游请求95个发射配额的话,上游不会自动触发事件发射的(这个应该是底层设置的触发阀吧)。

    因此得出结论:当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值。

    在某一些场景下,可以在发送事件前先判断当前的requested的值是否大于0,若等于0则说明下游处理不过来了,则需要等待,

    注意:是在onNext事件里,onComplete和onError事件不会消耗requested。

    本章完~

    相关文章

      网友评论

        本文标题:RxJava 2.x知识笔记

        本文链接:https://www.haomeiwen.com/subject/yitgbxtx.html