美文网首页
Rxjava2.0的改动

Rxjava2.0的改动

作者: mcivicm | 来源:发表于2017-05-09 10:54 被阅读0次

RxJava2.0已经完全基于Reactive-Streams库重写,Reactive-Streams是从RxJava 1.x版本中分离出来一个库,为响应式系统开发库提供基本规格。

因为Reactive-Streams有 的着不同设计风格,它改变了很多RxJava中一些被熟知的类型。RxJava项目的wiki页概括了这些改变以及如何把1.x的代码转到2.x的代码。

目录:

Maven address and base package

Javadoc

Nulls

Observable and Flowable

Single

Completable

Maybe

Base reactive interfaces

Subjects and Processors

Other classes

Functional interfaces

Subscriber

Subscription

Backpressure

Reative-Stream compliance

Runtime hooks

Error handling

Scheduler

Entering the reactive world

Leaving the reactive world

Testing

Operator differences

Miscellaneous Changes

Maven地址和基本包名

RxJava 1.x和RxJava 2.x现已分开管理,RxJava 2.x的maven地址是io.reactivex.rxjava2:rxjava:2.x.y包名是io.reactivex.

Javadoc

官方的Javadoc网页托管在http://reactivex.io/RxJava/2.x/javadoc/

Nulls

RxJava 2.x不再接收null,下面的代码会马上产生NullPointerException给下游的流。

Observable.just(null);

Single.just(null);

Observable.fromCallable(()->nul)).subscribe(System::out::println,Throwable::printStackTrace);

Observable.just(1).map(v->null).subscribe(System::out::println,Throwable::printStrackTrace);

这意味着Observable<Void>不会发射任何值,而只会正常中断或者抛出一个异常。要实现该功能,可以定义一个以Object作为泛型的Observable,即Observable<Object>,这里的Object是一个无关紧要的类。一个比较好的例子就是定义一个只有一个值的枚举类型:

enum Irrelevant{

INSTANCE;

}

Observable<Object> source=Observable.create((ObservableEmitter<Object> emitter)->{

System.out.println("Side-effect 1");

emitter.onNext(Irrelevant.INSTANCE);

System.out.println("Side-effect 2");

emitter.onNext(Irrelevant.INSTANCE);

System.out.println("Side-effect 3");  

emitter.onNext(Irrelevant.INSTANCE);

});

source.subscribe(e->{/*这里我们并不关心发射的对象是什么,即使如此,被发射的对象也不能为null或者Void*/},Throwable::printStackTrace);

Observable and Flowable

RxJava 0.x 引入背压的时候没有单独定义一个基类,导致Observable概念复杂而多样(一词多义)。对于一些常见的数据源,比如UI事件,是不能背压的,但是使用Observable却不得不背压,从而导致UI事件也会触发MissBackpressureException异常。

为此,2.x版本定义Flowable来表示可背压的数据源,而Observable则表示不能背压的数据源。

好消息是操作符(大部分)维持原样,坏消息是在导包时需要特别注意,不然一不小心就会在需要背压时引入了不支持背压的Observable。

选择Observable还是Flowable?

避免出错需要考虑的因素:

何时使用Observable

·个数不超过1000的数据流:数量少以至于几乎不可能出现Out Of Memory异常

·GUI事件,比如鼠标事件,触点事件:这些数据源通常很少出现背压并且不频繁。可以考虑使用sampling/debouncing来处理频率接近或者小于1000Hz的数据源。

·在不支持Java Streams的平台使用Streams特性。

何时使用Flowable

·超过10k+的数据源,并且可以控制数据源生成的数据数量

·读取文件

·读取JDBC数据库

·网络IO请求

·阻塞或者拉取式数据源

Single

2.x版本Single响应式数据源,发射一个onSuccess或者onError事件,根据Reactive-Streams规格重写。Single数据源的消费者(rx.Single.SingleSubscriber<T>)已经从一个实现rx.Subscription接口的类变成一个具有三个方法的接口io.reactivex.SingleObserver<T>:

interface SingleObserver<T>{

   void onSubscribe(Disposable d);

   void onSuccess(T value);

   void onError(Throwable error);

}

Single数据源的onSuccess和onError的事件只发送一个。

Completable

Completable数据源大体保持不变。1.x版本在设计时就和Reactive-Streams规范符合得很好,所以没有用户层面的改动。

和其他类一样,rx.Completable.CompletableSubscriber迁移到io.reactivex.CompletableObserver,并且增加了方法onSubscribe(Disposable):

interface CompletableObserver<T> {

   void onSubscribe(Disposable d);

   void onComplete();

   void onError(Throwable error);

}

同样onCompleteonError事件只触发一次。

Maybe

RxJava 2.0.0-RC2介绍了一种新的响应式数据源,叫做Maybe。从概念上讲,Maybe是Single和Completable的结合体,它提供捕获从某些响应式数据源发射的信号,或0个或1个或error,的方法。

Maybe依赖一个MaybeSource作为基本接口类型,依赖一个MaybeObserver<T>作为信号接收的接口,并且遵从协议onSuccess,onError,onComplete事件只发送一次。因为至多只有一个元素被发射,所有Maybe没有背压的概念(因为长度未知的Flowable或者Observable没有缓冲膨胀的可能性)。

这意味着调用onSubscribe(Disposable)之后会紧跟着一个onXXX方法。和Flowable不同的是,如果只有一个信号被发射,只有onSuccess会被调用和onComplete不会。

Maybe这个新的响应式数据源和其他的数据源没什么两样,但是它提供的操作符是Flowable操作符的一个子集,这个子集处理0个或者1个的数据序列。

基本响应式接口

正如Flowable实现了Reactive-Streams的Publisher<T>接口一样,其他的基本响应式数据源也实现了类似的接口:

interface ObservableSource<T>{

   void subscribe(Observer<? super T> observer);

}

interface SingleSource<T>{

   void subscribe(SingleObserver<? super T> observer);

}

interface CompletableSource{

   void subscribe(CompletableObserver observer);
}

interface MaybeSource<T>{

   void subscribe(MaybeObserver<? super T> observer);

}

所以,很多需要传入响应式基本数据源的操作函数现在可以接受Publisher和XSource对象。

Flowable<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper);

Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper);

Publisher(Flowable实现了该接口)作为数据源时,可以在不把这些操作函数转化为Flowable的情况下组合(compose 在用于主动语态时,一般它所表示的“构成”或“组成”总包含着融合为一,而且主语或者是复数名词或者是集体名词)其他的Reactive-Streams操作函数(比如subScribeOn, observeOn等):

(这句话怎么理解呢?因为很多操作函数的返回值是Publisher, 而不是Flowable, 但是不需要手动把Publisher转成Flowable就能保证链式调用的连续性)

source.compose((Flowable<T> flowable)->

flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()));

如果一个操作函数返回一个响应式基本类型,那么将会返回全称(但是Xsource基本数据源不会有此现象,因为Xsource类型的数据源不提供这样的函数,比如Single就没有window函数):

Flowable<Flowable<Integer>> windows=source.window(5);

Subjects and Processors

在Reactive-Streams规格中,subject特性,即是消费者的同时又是供应商的特性,由接口org.reactivestreams.Processor实现。鉴于Observable和Flowable分开,Reactive-Streams背压机制的实现是基于FlowableProcessor<T>类(该类继承于Flowable,具有丰富的操作符)。关于Subject(即Reactive-Streams的FlowableProcessor),一个重要的改动是它们不再提供T->R的转换(即输入T,输出R)。(因为1.x版本中从未使用过该转换,之所以1.x版本中重载该转换,是因为该转换来自于.NET(原来是借鉴.NET),在.NET中,该函数能够接收多个不同类型的参数)。

类io.reactivex.subjects.AsyncSubject, io.reactivex.subjects.BehaviorSubject, io.reactivex.subjects.PublishSubject, io.reactivex.subjects.ReplaySubject和io.reactivex.subjects.UnicastSubject, 在2.x版本中不支持背压(作为2.x版本Observable家族的一部分)。

类io.reactivex.processors.AsyncSubject, io.reactivex.processors.BehaviorSubject, io.reactivex.processors.PublishSubject, io.reactivex.processors.ReplaySubject和io.reactivex.processors.UnicastSubject支持背压。BehaviorProcessorPublishProcessor不会协调下游消费者的请求,如果下游消费者消费速度跟不上,就会收到MissingBackpressureException异常。其他XProcessor类型遵从下游消费者的背压(跟不上时不会抛出异常,等待下游消费者消费完)。但除此之外,这些XProcessor订阅一个数据源时,它们以无限制的方式消费(即reqesting Long.MAX_VALUE)。

TestSubject

1.x版本的TestSubject被移除。它的功能通过TestScheduler, PublishProcessor/PublishSubject和observerOn(testScheduler)/scheduler参数实现。

TestScheduler scheduler=new TestScheduler();

PublishSubject<Integer> ps=PublishSubject.create();

TestObserver<Integer> ts=ps.delay(1000, TimeUnit.MILLISECONDS, scheduler)).test();

(delay方法属于Observable<T>,返回一个带有scheduler的Observable<T>,这个schedule里面维护了一个时间,这个时间是可以手动进行调节的)

ts.assertEmpty();

ps.onNext(1);

scheduler.advanceTimeBy(999,TimeUnit.MILLISECONDS);

ts.assertEmpty();

scheduler.advanceTimeBy(1,TimeUnit.MILLISECONDS);

ts.assertValue(1);

SerializedSubject

SerializedSubject不再是个公开类,取而代之的是通过Subject.toSerialized()和FlowableProcessor.toSerialized()方法间接获取。

其他类

类rx.observables.ConnectableObservable分开为两个类io.reactivex.observables.ConnectableObservable<T>和io.reactivex.flowables.ConnectableFlowable<T>

GroupedObservable

rx.observables.GroupedObservable也拆分为两个:io.reactivex.observables.GroupedObservable<T>和io.reactivex.flowables.GroupedFlowable<T>

1.x版本中,可以通过GroupedObservable.from()来创建该类的实例,但是2.x版本,所有实例的类都必须的类必须继承GroupedObservable,1.x版本中所有创建该类实例的工厂方法都不在使用;GroupedObservable变成一个抽象类。

可以继承该抽象类并重载subscribeActual方法来实现1.x版本中相似的特性。

class MyGroup<K,V> extends GroupedObservable<K,V>{

    final K key;

    final Subject<V> subject;

    public MyGroup(K key){

    this.key=key;

    this.subject=PublishSubject.create();

@Override

public T getKey(){

     return key;

}

@Override

protected void subscribeActual(Observer<? super T> observable){

    subject.subscribe(observer);

}

相关文章

  • Rxjava2.0的改动

    RxJava2.0已经完全基于Reactive-Streams库重写,Reactive-Streams是从RxJa...

  • Rxjava系列(六) RxJava2.0操作符详解

    Rxjava2.0概述 通过前面的文章介绍,读者对RxJava2.0应该有了初步的认识。RxJava2.0相对1....

  • RXjave总结

    文章 给初学者的RxJava2.0教程(一)给初学者的RxJava2.0教程(二)

  • RxJava

    教程 给初学者的RxJava2.0教程(一) 给初学者的RxJava2.0教程(二) 给初学者的RxJava2.0...

  • RxJava2.0的使用

    这里的讲解比较简单,易懂 给初学者的RxJava2.0教程(一) :基本工作原理给初学者的RxJava2.0教程(...

  • RxJava2.0源码初探

    RxJava2.0源码初探 RxJava2.0的源码相对于1.0发生了很大的变化, 命名方式也发生了很大变化, 下...

  • Rxjava2.0 发生订阅关系 的源码解析

    由于要做一场关于rxjava2.0 的内部分享,本人便怀着期待的心情去了解了下rxjava2.0,关于rxjava...

  • #RxJava2.0 操作符(2)—— Transforming

    RxJava2.0 操作符(2)—— Transform 转换符 Transforming 转换符 Buffer ...

  • 改动

    1、加删除页面 页面html改动部分(1)表格改动地方 (2)帖子改动部分 2、个人新增和改动部分 3、设置 9/...

  • 改动

    车水马龙的城市 心灰意冷 我在虐风四起的荒野 安身立命 孤独不听话 还有一些寂寞 下落不明 微风吹皱了湖水 夜色掩...

网友评论

      本文标题:Rxjava2.0的改动

      本文链接:https://www.haomeiwen.com/subject/fdrbtxtx.html