如需下载源码,请访问
https://github.com/fengchuanfang/Rxjava2Tutorial
文章原创,转载请注明出处:
Rxjava2入门教程二:Observable与Observer响应式编程在Rxjava2中的典型实现
在RxJava中,函数响应式编程具体表现为一个观察者(Observer)订阅一个可观察对象(Observable),通过创建可观察对象发射数据流,经过一系列操作符(Operators)加工处理和线程调度器(Scheduler)在不同线程间的转发,最后由观察者接受并做出响应的一个过程
ObservableSource与Observer是RxJava2中最典型的一组观察者与可观察对象的组合,其他四组可以看做是这一组的改进版或者简化版。
Observable
抽象类Observable是接口ObservableSource下的一个抽象实现,我们可以通过Observable创建一个可观察对象发射数据流。
demo1_1.jpg上例中,调用Observable.create创建一个可观察对象,并发送“Hello World”,然后通知发送完成
Observer
创建一个观察者Observer来接受并响应可观察对象发射的数据流
demo1_2.jpg在onNext方法中接收到可观察对象发射的数据"Hello World",并做出响应——打印到控制台。
Observer订阅Observable
demo1_3.jpg一旦Observer与Observable建立了订阅关系,Observer与Observable便成为了一个整体,Observer便可对Observable中的行为作出响应。
Emitter/Observer
通过Observable.create创建可观察对象时,我们可以发现具体执行发射动作的是接口ObservableEmitter的实例化对象,而ObservableEmitter<T> 继承自 接口Emitter<T>,查看源码接口Emitter的具体代码如下:
public interface Emitter<T> {
//用来发送数据,可多次调用,每调用一次发送一条数据
void onNext(@NonNull T value);
//用来发送异常通知,只发送一次,若多次调用只发送第一条
void onError(@NonNull Throwable error);
//用来发送完成通知,只发送一次,若多次调用只发送第一条
void onComplete();
}
onNext:用来发送数据,可多次调用,每调用一次发送一条数据
onError:用来发送异常通知,只发送一次,若多次调用只发送第一条
onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条
onError与onComplete互斥,两个方法只能调用一个不能同时调用,数据在发送时,出现异常可以调用onError发送异常通知也可以不调用,因为其所在的方法subscribe会抛出异常,若数据在全部发送完之后均正常可以调用onComplete发送完成通知;其中,onError与onComplete不做强制性调用。
接口Observer中的三个方法(onNext,onError,onComplete)正好与Emitter中的三个方法相对应,分别对Emitter中对应方法的行为作出响应。
Emitter调用onNext发送数据时,Observer会通过onNext接收数据。
Emitter调用onError发送异常通知时,Observer会通过onError接收异常通知。
Emitter调用onComplete发送完成通知时,Observer会通过onComplete接收完成通知。
步骤简化
去掉中间变量可以对之前的代码简化为以下形式:
demo2.jpg在响应式编程的基础上,加上函数式编程,真正的函数响应式编程可以将代码简化成以下形式:
demo3.jpg其中,just操作符是经过封装后,专门用来发射单条数据的,可以是一个数据,一条字符,一个对象,一整个数组,一整个集合。
Consumer可以看做是对观察者Observer功能单一化之后的产物——消费者,上例中的Consumer通过其函数accept只接收可观察对象发射的数据,不接收异常信息或完成信息。
如果想接收异常信息或完成信息可以用下面的代码:
第二个参数Consumer规定泛型<Throwable>通过函数accept接收异常信息。
第三个参数Action也是对观察者Observer功能单一化之后的产物--行动,通过函数run接收完成信息,作出响应行动。
发送数据序列
Observable可以发送单条数据或者数据序列,上面的事例都是发送单条数据'Hello World"的情形,那么怎样发送数据序列呢?
可以通过最基础的方法:
通过在方法subscribe中循环遍历String类型的集合list中的元素,然后通过emitter.onNext(str)将他们逐一发送;如果发送过程中捕获到异常,通过emitter.onError(e)发送异常信息;最后如果数据正常发送完毕调用 emitter.onComplete()发送完成通知,Observer中通过onNext接收emitter发送的每一条信息并打印到控制台(emitter发送几次,Observer便接收几次),通过onError(Throwable e)接收异常信息,onComplete()接收完成信息。
同样可以通过操作符对其进行简化,如下;
其中fromIterable(list)也是一个封装好的操作符,可以将一个可迭代对象中的每一个元素逐一发送
Disposable
在之前的例子中,可以看到Observer接口中还有一个方法没有说
public void onSubscribe(Disposable d) {
}
这个方法中有个Disposable类型的参数,
onSubscribe表示在订阅时,当观察者Observer订阅可观察对象Observable,建立订阅关系后,会触发这个方法,并且会生成一个Disposable对象,其实无论观察者Observer以何种方式订阅可观察对象Observable,都会生成一个Disposable,不管有没有onSubscribe(Disposable d)方法,如下:
查看Disposable接口的源码,如下:
public interface Disposable {
void dispose();
boolean isDisposed();
}
Disposable是观察者Observer与可观察对象Observable建立订阅关系后生成的用来取消订阅关系和判断订阅关系是否存在的一个接口。
只有当观察者Observer与可观察对象Observable之间存在订阅关系时,Observer才能接收Observable发送的数据或信息。如果Observer在接收Observable的信息的过程中,取消了订阅关系,则Observer只能接收订阅关系取消之前Observable发送的数据,对于订阅关系取消之后Observable发送的数据,Observer将不会再接收。
运行下面的代码,当Observable接收到第5条数据时,取消订阅关系。
控制台日志如下:
I/System.out: 发送0
I/System.out: 接收0
I/System.out: 发送1
I/System.out: 接收1
I/System.out: 发送2
I/System.out: 接收2
I/System.out: 发送3
I/System.out: 接收3
I/System.out: 发送4
I/System.out: 接收4
I/System.out: 发送5
I/System.out: 接收5
I/System.out: 发送6
I/System.out: 发送7
I/System.out: 发送8
I/System.out: 发送9
可以发现取消订阅关系之前,Observable发送一条数据,Observe接收一条,取消订阅关系之后,Observe将不再接收Observable发送的数据。
网友评论