Rxjava基于观察者模式,一个完整的观察者模式要有三个角色对象:观察者,被观察者,事件。
而且在异步观察者模式中,一般会有三个相关的线程:被观察者线程(事件产生的线程),观察者线程(处理事件的线程),订阅关系产生的线程(一般都是主线程)。
一般情况下,在同步的观察者模式中(或者我们不进行任何线程的切换),事件产生的线程和事件处理线程还有订阅关系发生的线程是同一个线程(这个线程就是订阅关系产生的线程)。
RxJava的观察者模式是一种扩展的观察者模式,一个被观察者可以发送若干的事件,这些事件组成了一个事件队列。我们可以针对每一个事件或者或者整个事件队列进行处理,处理完之后再发送给观察者。
下面来介绍一下Rxjava中这三个角色对象
-
Observer或者Subscriber,这是观察者对象,用来处理事件。
Observer处理事件有三个方法:
onNext()
,onCompleted()
,onError
,onNext()
用来处理普通事件,onCompleted()
在事件队列已经发送完普通事件的时候来调用,onError()
在事件队列发生错误的时候来调用onCompleted()
和onError()
是互斥的,事件队列正常发送完普通事件的时候,应该调用onCompleted()
,出错的时候应该调用onError()
Subscriber相比于Observer增加了两个方法,
onStat()
和unsubscribe()
,-
onStart
在订阅关系产生,但是还没有发送事件的时候调用,我们可以在这里做一些初始化的工作。但是要注意,onStart
方法的线程是在订阅关系产生的线程,也就是说如果订阅关系是在主线程产生的,onStart
就工作在主线程中,如果订阅关系是在工作线程产生的,那么onStart
就工作在工作线程。如果我们对于初始化的线程有要求,比如要求初始化必须在主线程进行,那么我们可以使用
doOnSubscribe()
方法,该方法也是在订阅关系产生但是还没有发送事件的时候调用,但是该方法可以指定自己要运行在哪一个线程中。 -
unsubscribe()
该方法是Subscription
接口的方法,该接口是对 订阅关系 的一个抽象,代表了一个订阅关系,Subscriber
实现了这个接口,并且在subscribe()
方法中返回了这个接口,借助这个接口,当我们不在需要订阅被观察者的时候,可以通过该接口的unsubscribe
方法解除订阅关系,从而防止内存泄漏。
-
-
Observable 被观察者对象,用来产生并发送事件
-
事件对象,我们在创建被观察者对象的时候,要指定被观察者能够产生什么样子的事件,也就是事件的类型
接下来看一下如何产生观察者对象和被观察者对象
-
观察者对象的生成
正常的new方法来生成观察者对象
Subscriber subscriber = new Subscriber(String s){ @Override public void onNext(String s){ } @Override public void onCompleted(){ } @Override public void onError(Throwable e){ } }
这就生成了一个观察者对象,该观察者感兴趣的事件类型是String
-
被观察者对象的生成
Rxjava提了了几个不同的方法来方便我们生成被观察者对象
-
Observable.create静态方法
Observable observable = Observable.create(new Observable.OnSubscribe(){ @Override public void call(Subscriber subscriber){ subscriber.onNext("hello"); subscriber.onNext("world"); subscriber.onCompleted(); } });
Observable.create方法接受Observable的内部类Onsubscribe作为参数,并把该OnSubscriber存储起来,当订阅关系真正发生的时候,通过OnSubscriber来调用观察者的对应方法,实现消息的发送。
(Observable为什么不直接存储Subscriber的引用?因为这个时候订阅关系还没有真正发生,拿不到啊)
-
Observable.just(T...)
just静态方法,通过传入一系列的事件对象T来生成一个Observable对象,当订阅关系发生的时候,就把一系列的事件对象发送到Subscriber的
onNext
方法中 -
Observable.from(Iterator it)
from静态方法,通过传入一个迭代器,把迭代器中的内容作为一系列的事件对象,从而生成一个Observable对象,当订阅关系发生的时候,把迭代器中的内容作为事件发送给Subscriber的
onNext
方法。
-
-
订阅关系的产生
通过
observable.subscribe(subscriber)
来发生订阅关系
那我们看一下在发生关系的时候都做了哪些事情,引用 扔物线中的一段代码
public Subscription subscribe(Subscribe subscribe){
subscribe.onStart();
onSubscribe.call(subscribe);
return subscribe;
}
所以,还记得我们说Subscriber实现了Subscription接口,以方便在将来unsubscribe
吗?
还记得前面我们说Subscriber的onStart
方法在 订阅关系 产生的线程中执行吗?
还记得我们Observable的静态方法create()需要传入一个OnSubscribe对象作为参数吗?
通过这段代码就都清楚了。
还有什么的要说的吗?
就是我们每次在发生订阅关系的时候,都需要一个被观察者和一个观察者,而观察者一般都要实现onNext
,onCompleted
,onError
这三个方法,我们在开发中一般并不是三个方法都关心,比如只关心onNext
方法,RxJava给我们提供了subscribe
方法的几个重载版本,来使我们更方便的实现订阅。
比如
observable.subscribe(new Action1<String>(){
@Override
public void call(String s){
}
});
先说一下ActionX<T>接口,这个接口有一个方法call(),X为N,则call方法就有N个参数,并且ActionX的call方法是没有返回值的。
比如Action1<String>就说明call
方法有一个参数,该参数的类型是String。而onNext
方法也是无返回值,也是只有一个参数,所以在上面的代码中,RxJava就会帮助我们生成一个Subscriber,并且在这个为我们生成的Subscriber的onNext
方法中,调用这个Action1的call
方法。类似于Python中的闭包。
网友评论