RxJava2.0你不知道的事

作者: jdsjlzx | 来源:发表于2017-04-01 16:02 被阅读8976次

    前言

    如果你对RxJava1.x还不是了解,可以参考下面文章。

    1. RxJava使用介绍 【视频教程】
    2. RxJava操作符
    • Creating Observables(Observable的创建操作符) 【视频教程】
    • Transforming Observables(Observable的转换操作符) 【视频教程】
    • Filtering Observables(Observable的过滤操作符) 【视频教程】
    • Combining Observables(Observable的组合操作符) 【视频教程】
    • Error Handling Operators(Observable的错误处理操作符) 【视频教程】
    • Observable Utility Operators(Observable的辅助性操作符) 【视频教程】
    • Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】
    • Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】
    • 其他如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
    3. RxJava Observer与Subcriber的关系 【视频教程】
    4. RxJava线程控制(Scheduler) 【视频教程】
    5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】

    开始

    Rxjava 已经于2016年11月12日正式发布了2.0.1版本。

    RxJava 2.0 已经按照Reactive-Streams specification规范完全的重写了。RxJava2.0 已经独立于RxJava 1.x而存在。

    RxJava2.0相比RxJava1.x,它的改动还是很大的,下面我将带大家了解这些改动。

    RxJava2.0与1.x的区别

    Maven地址

    为了让 RxJava 1.x 和 RxJava 2.x 相互独立,我们把RxJava 2.x 被放在了maven io.reactivex.rxjava2:rxjava:2.x.y 下,类放在了 io.reactivex 包下用户从 1.x 切换到 2.x 时需要导入的相应的包,但注意不要把1.x和2.x混淆了。

    这里写图片描述

    接口变化

    RxJava2.0 是遵循 Reactive Streams Specification 的规范完成的,新的特性依赖其提供的4个基础接口。分别是:

    • Publisher
    • Subscriber
    • Subscription
    • Processor

    在后边的介绍中我们会涉及到。

    Javadoc文档

    官方2.0的 Java 文档 http://reactivex.io/RxJava/2.x/javadoc/

    添加依赖

    Android端使用RxJava需要依赖新的包名:

    //RxJava的依赖包
    compile 'io.reactivex.rxjava2:rxjava:2.0.3'
    //RxAndroid的依赖包
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

    Nulls

    RxJava1.x中,支持 null 值,如下代码所示:

    Observable.just(null);
    Single.just(null);
    

    RxJava 2.0不再支持 null 值,如果传入一个null会抛出 NullPointerException

    Observable.fromCallable(() -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    
    Observable.just(1).map(v -> null)
        .subscribe(System.out::println, Throwable::printStackTrace);
    

    Observable and Flowable

    在本节开始之前,我们先了解下RxJava背压(Backpressure)机制的问题。

    什么是背压(Backpressure)

    在RxJava中,可以通过对Observable连续调用多个Operator组成一个调用链,其中数据从上游向下游传递。当上游发送数据的速度大于下游处理数据的速度时,就需要进行Flow Control了。如果不进行Flow Control,就会抛出MissingBackpressureException异常。

    这就像小学做的那道数学题:一个水池,有一个进水管和一个出水管。如果进水管水流更大,过一段时间水池就会满(溢出)。这就是没有Flow Control导致的结果。

    再举个例子,在 RxJava1.x 中的 observeOn, 因为是切换了消费者的线程,因此内部实现用队列存储事件。在 Android 中默认的 buffersize 大小是16,因此当消费比生产慢时, 队列中的数目积累到超过16个,就会抛出MissingBackpressureException。

    如果你想了解更多关于背压的知识,请参考:

    http://blog.csdn.net/jdsjlzx/article/details/52717636
    http://www.jianshu.com/p/2c4799fa91a4

    下面我们通过一段代码来“感受”一下背压。

    Observable.interval(1, TimeUnit.MILLISECONDS)
           //将观察者的工作放在新线程环境中
           .observeOn(Schedulers.newThread())
           //观察者处理每1000ms才处理一个事件
           .subscribe(new Subscriber<Long>() {
               @Override
               public void onCompleted() {
                   System.out.println("onCompleted");
               }
               @Override
               public void onError(Throwable e) {
                System.out.println("onError :"+ e);
               }
               @Override
               public void onNext(Long value) {
                
                try {
                       Thread.sleep(1000);
                   } catch (InterruptedException e) {
                       e.printStackTrace();
                   }
                System.out.println("onNext value :"+ value);
               }
           });
    

    Flow Control有哪些思路呢?大概是有四种:

    1. 背压(Backpressure);
    2. 节流(Throttling);
    3. 打包处理;
    4. 调用栈阻塞(Callstack blocking)。

    这里限于篇幅的问题,我们就不再一一介绍了,请移步:https://gold.xitu.io/post/58535b5161ff4b0063aa6b10

    如何让Observable支持Backpressure?

    在RxJava 1.x中,有些Observable是支持Backpressure的,而有些不支持。但不支持Backpressure的Observable可以通过一些operator来转化成支持Backpressure的Observable。这些operator包括:

    • onBackpressureBuffer
    • onBackpressureDrop
    • onBackpressureLatest
    • onBackpressureBlock(已过期)

    它们转化成的Observable分别具有不同的Backpressure策略。

    而在RxJava2.0 中,Observable 不再支持背压,而是改用Flowable 支持非阻塞式的背压。Flowable是RxJava2.0中专门用于应对背压(Backpressure)问题而新增的(抽象)类。其中,Flowable默认队列大小为128。并且规范要求,所有的操作符强制支持背压。幸运的是, Flowable 中的操作符大多与旧有的 Observable 类似。

    上面提到的四种operator的前三种分别对应Flowable的三种Backpressure策略:

    • BackpressureStrategy.BUFFER
    • BackpressureStrategy.DROP
    • BackpressureStrategy.LATEST

    onBackpressureBuffer是不丢弃数据的处理方式。把上游收到的全部缓存下来,等下游来请求再发给下游。相当于一个水库。但上游太快,水库(buffer)就会溢出。

    这里写图片描述

    而Consumer即消费者,用于接收单个值,BiConsumer则是接收两个值,Function用于变换对象,Predicate用于判断。这些接口命名大多参照了Java8,熟悉Java8新特性的应该都知道意思,这里也就不再赘述了。

    public interface Consumer<T> {
        void accept(T t) throws Exception;
    }
    

    新的ActionX、FunctionX的方法声明都增加了一个throws Exception,这带来了显而易见的好处,现在我们可以这样写:

    Flowable.just("qq.txt")
        .map(new Function<String, Integer>() {
            @Override
            public Integer apply(String value) throws Exception {
                File file = new File(value);
                file.createNewFile();
                return 99;
            }
        });
    

    而createNewFile方法显式的抛出了一个IOException,而在以前是不可以这样写的。

    Schedulers

    在2.0的API中仍然支持主要的默认scheduler: computation, io, newThread 和 trampoline,可以通过io.reactivex.schedulers.Schedulers这个实用的工具类来调度。

    2.0中不存在immediate 调度器。 它被频繁的误用,并没有正常的实现 Scheduler 规范;它包含用于延迟动作的阻塞睡眠,并且不支持递归调度。你可以使用Schedulers.trampoline()来代替它。

    Schedulers.test()已经被移除,这样避免了默认调度器休息的概念差异。那些返回一个”global”的调度器实例是鉴于test()总是返回一个新的TestScheduler实例。现在我们鼓励测试人员使用这样简单的代码new TestScheduler()。

    io.reactivex.Scheduler抽象类现在支持直接调度任务,不需要先创建然后通过Worker调度。

    操作符的差别

    2.0中大部分操作符仍然被保留,实际上大部分行为和1.x一样。

    关于操作符,引用JakeWharton的总结就是:

    All the same operators(you konw and love or hate and despise) are still there.

    Transformer

    RxJava 1.x 中Transformer实际上就是Func1<Observable,Observable>,换句话说就是提供给他一个Observable它会返回给你另一个Observable,这和内联一系列操作符有着同等功效。

    相关API如下:

    public interface Transformer<T, R> extends Func1<Observable<T>, Observable<R>> {
       // cover for generics insanity
    }
    
    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    

    实际操作下,写个方法,创建一个Transformer调度器:

    //子线程运行,主线程回调
    public Observable.Transformer<T, T> io_main(final RxAppCompatActivity context) {
            return new Observable.Transformer<T, T>() {
    
                @Override
                public Observable<T> call(Observable<T> tObservable) {
    
                    Observable<T> observable = (Observable<T>) tObservable
                            .subscribeOn(Schedulers.io())
                            .doOnSubscribe(new Action0() {
                                @Override
                                public void call() {
                                    DialogHelper.showProgressDlg(context, mMessage);
                                }
                            })
                            .subscribeOn(AndroidSchedulers.mainThread())
                            .observeOn(AndroidSchedulers.mainThread())
                            .compose(RxLifecycle.bindUntilEvent(context.lifecycle(), ActivityEvent.STOP));
    
                    return observable;
    
                }
            };
        }
    
    

    上面这个方法出自本人的Community框架,用法和源码详见:RxHelper.java

    在实际应用中,Transformer 经常和 Observable.compose() 一起使用。本人的Community框架也有使用,这里就不多介绍了。

    在RxJava2.0中,Transformer划分的更加细致了,每一种“Observable”都对应的有自己的Transformer,相关API如下所示:

    public interface ObservableTransformer<Upstream, Downstream> {
        ObservableSource<Downstream> apply(Observable<Upstream> upstream);
    }
    
    public interface CompletableTransformer {
        CompletableSource apply(Completable upstream);
    }
    
    public interface FlowableTransformer<Upstream, Downstream> {
        Publisher<Downstream> apply(Flowable<Upstream> upstream);
    }
    
    public interface MaybeTransformer<Upstream, Downstream> {
        MaybeSource<Downstream> apply(Maybe<Upstream> upstream);
    }
    
    
    public interface SingleTransformer<Upstream, Downstream> {
        SingleSource<Downstream> apply(Single<Upstream> upstream);
    }
    

    这里以FlowableTransformer为例,创建一个Transformer调度器:

    //子线程运行,主线程回调
        public FlowableTransformer<T, T> io_main(final RxAppCompatActivity context) {
            return new FlowableTransformer<T, T>() {
    
    
                @Override
                public Publisher<T> apply(Flowable<T> flowable) {
                    return flowable
                            .subscribeOn(Schedulers.io())
                            .doOnSubscribe(new Consumer<Subscription>() {
                                @Override
                                public void accept(Subscription subscription) throws Exception {
                                    DialogHelper.showProgressDlg(context, mMessage);
                                }
                            })
                            .subscribeOn(AndroidSchedulers.mainThread())
                            .observeOn(AndroidSchedulers.mainThread())
                            .compose(RxLifecycle.<T, ActivityEvent>bindUntilEvent(context.lifecycle(), ActivityEvent.DESTROY));
                }
            };
        }
    

    上面这个方法出自本人的CommunityRxJava2框架,用法和源码详见:RxHelper.java

    其他改变

    doOnCancel/doOnDispose/unsubscribeOn

    在1.x中,doOnUnsubscribe总是执行终端事件,因为SafeSubscriber调用了unsubscribe。这实际上是没有必要的。Reactive-Streams规范中,一个终端事件到达Subscriber,上游的Subscription会取消,因此调用 cancel()是一个空操作。

    由于同样的原因unsubscribeOn也没被在终端路径上调用,但只有实际在链上调用cancel时,才会调用unsubscribeOn。

    因此,下面的序列不会被调用

    doOnCancel

    Flowable.just(1,2,3)
            .doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, " doOnCancel " );
                }
            })
            .subscribe(new DisposableSubscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, " onNext : " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
    
                }
            });
    

    输出结果如下:

    onNext : 1
    onNext : 2
    onNext : 3
    onComplete isDisposed() = false
    

    然而,下面将会调用take操作符在传送过程中取消onNext

    
    Flowable.just(1,2,3)
            .doOnCancel(new Action() {
                @Override
                public void run() throws Exception {
                    Log.e(TAG, " doOnCancel " );
                }
            })
            .take(2)
            .subscribe(new DisposableSubscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, " onNext : " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, " onComplete isDisposed() = " + isDisposed());
    
                }
            });
    

    输出结果如下:

    onNext : 1
    onNext : 2
    doOnCancel 
    onComplete isDisposed() = false
    

    使用take操作符,调用了cancel方法,我们看一下take操作符的源码:

      @CheckReturnValue
      @BackpressureSupport(BackpressureKind.SPECIAL) // may trigger UNBOUNDED_IN
      @SchedulerSupport(SchedulerSupport.NONE)
      public final Flowable<T> take(long count) {
          if (count < 0) {
              throw new IllegalArgumentException("count >= 0 required but it was " + count);
          }
          return RxJavaPlugins.onAssembly(new FlowableTake<T>(this, count));
      }
    

    关键点就是这个FlowableTake类,这里限于篇幅的原因就不看源码了,大家可以自己看一下,然后找找是什么地方调用了cancel。

    同样的,如果你需要在终端或者取消时执行清理,考虑使用using操作符代替。


    以上就是RxJava2.0中的改动,下面我们重点介绍下RxJava2.0中的观察者模式。

    RxJava2.0中的观察者模式

    RxJava始终以观察者模式为骨架,在2.0中依然如此。

    在RxJava2.0中,有五种观察者模式:

    1. Observable/Observer
    2. Flowable/Subscriber
    3. Single/SingleObserver
    4. Completable/CompletableObserver
    5. Maybe/MaybeObserver

    后面三种观察者模式差不多,Maybe/MaybeObserver可以说是Single/SingleObserverCompletable/CompletableObserver的复合体。

    下面列出这五个观察者模式相关的接口。

    Observable/Observer

    public abstract class Observable<T> implements ObservableSource<T>{...}
    
    public interface ObservableSource<T> {
        void subscribe(Observer<? super T> observer);
    }
    
    public interface Observer<T> {
        void onSubscribe(Disposable d);
        void onNext(T t);
        void onError(Throwable e);
        void onComplete();
    }
    

    Completable/CompletableObserver

    //代表一个延迟计算没有任何价值,但只显示完成或异常。类似事件模式Reactive-Streams:onSubscribe(onError | onComplete)?
    public abstract class Completable implements CompletableSource{...}
    
    //没有子类继承Completable
    public interface CompletableSource {
        void subscribe(CompletableObserver cs);
    }
    
    public interface CompletableObserver {
        void onSubscribe(Disposable d);
        void onComplete();
        void onError(Throwable e);
    }
    

    Flowable/Subscriber

    public abstract class Flowable<T> implements  Publisher<T>{...}
    
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    

    Maybe/MaybeObserver

    //Maybe类似Completable,它的主要消费类型是MaybeObserver顺序的方式,遵循这个协议:onSubscribe(onSuccess | onError | onComplete)
    public abstract class Maybe<T> implements MaybeSource<T>{...}
    
    public interface MaybeSource<T> {
        void subscribe(MaybeObserver<? super T> observer);
    }
    
    public interface MaybeObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T t);
        void onError(Throwable e);
        void onComplete();
    }
    

    Single/SingleObserver

    //Single功能类似于Observable,除了它只能发出一个成功的值,或者一个错误(没有“onComplete”事件),这个特性是由SingleSource接口决定的。
    public abstract class Single<T> implements SingleSource<T>{...}
    
    public interface SingleSource<T> {
        void subscribe(SingleObserver<? super T> observer);
    }
    
    public interface SingleObserver<T> {
        void onSubscribe(Disposable d);
        void onSuccess(T t);
        void onError(Throwable e);
    }
    

    其实从API中我们可以看到,每一种观察者都继承自各自的接口(都有一个共同的方法subscrib()),但是参数不一样),正是各自接口的不同,决定了他们功能不同,各自独立(特别是Observable和Flowable),同时保证了他们各自的拓展或者配套的操作符不会相互影响。

    这里写图片描述

    下面我们重点说说在实际开发中经常会用到的两个模式:Observable/Observer和Flowable/Subscriber。

    Observable/Observer

    Observable正常用法:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
    
        }
    
        @Override
        public void onNext(Integer integer) {
    
        }
    
        @Override
        public void onError(Throwable e) {
    
        }
    
        @Override
        public void onComplete() {
    
        }
    });
    

    需要注意的是,这类观察模式不支持背压,下面我们具体分析下。

    当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM。

    在测试的时候,快速发送了100000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,遗憾的是没有OOM。

    所以,当我们使用Observable/Observer的时候,我们需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,供各位参考)。

    Flowable/Subscriber

    Flowable.range(0, 10)
            .subscribe(new Subscriber<Integer>() {
                Subscription subscription;
    
                //当订阅后,会首先调用这个方法,其实就相当于onStart(),
                //传入的Subscription s参数可以用于请求数据或者取消订阅
                @Override
                public void onSubscribe(Subscription s) {
                    Log.d(TAG, "onsubscribe start");
                    subscription = s;
                    subscription.request(1);
                    Log.d(TAG, "onsubscribe end");
                }
    
                @Override
                public void onNext(Integer o) {
                    Log.d(TAG, "onNext--->" + o);
                    subscription.request(3);
                }
    
                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    输出结果如下:

    onsubscribe start
    onNext--->0
    onNext--->1
    onNext--->2
    onNext--->3
    onNext--->4
    onNext--->5
    onNext--->6
    onNext--->7
    onNext--->8
    onNext--->9
    onComplete
    onsubscribe end
    

    Flowable是支持背压的,也就是说,一般而言,上游的被观察者会响应下游观察者的数据请求,下游调用request(n)来告诉上游发送多少个数据。这样避免了大量数据堆积在调用链上,使内存一直处于较低水平。

    当然,Flowable也可以通过create()来创建:

    Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }
        }, BackpressureStrategy.BUFFER);//指定背压策略
    

    Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。

    根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;

    当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable的时候,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。

    平滑升级

    RxJava1.x 如何平滑升级到RxJava2.0呢?

    由于RxJava2.0变化较大无法直接升级,幸运的是,官方提供了RxJava2Interop这个库,可以方便地将RxJava1.x升级到RxJava2.0,或者将RxJava2.0转回RxJava1.x。

    地址:https://github.com/akarnokd/RxJava2Interop

    总结

    可以明显的看到,RxJava2.0最大的改动就是对于backpressure的处理,为此将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。

    除此之外,就是我们最熟悉和喜爱的RxJava。

    相关文章

      网友评论

      • zpayh:机智的我创建了100000个bitmap,立马OOM了
        zpayh:@A_si 应该不是
        A_si:这个是大胖子吗

      本文标题:RxJava2.0你不知道的事

      本文链接:https://www.haomeiwen.com/subject/tjwiottx.html