RxJava框架内部采用观察者模式,基于事件流的链式调用、逻辑简洁、使用简单,在Android开发中被广泛的使用。
简单用例:
//创建被观察者
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
});
//创建被观察者
Observer observer = new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
};
//绑定观察者和被观察者
observable.subscribe(observer);
- 创建被观察者。
- 创建观察者。
- 观察者订阅被观察者,连接双方。
一、创建被观察者
首先需要调用Observable的create()方法创建Observable对象。
//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
Observable的create方法需要传递参数ObservableOnSubscribe。ObservableOnSubscribe是一个接口。
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
而ObservableOnSubscribe接口中的subscribe方法参数是ObservableEmitter, ObservableEmitter继承了Emitter接口。
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Emitter的英文翻译是发射器,在这里我们可以理解为事件的生产者,调用Emitter#onNext()方法发送事件给观察者。Emitter的onNext()、onError()、onComplete()与观察者Observer的onNext()、onError()、onComplete()方法一一对应。
Observable#create()
再回到Observable.create()方法
这里要说一个Rxjava操作符的套路,Observable.create()会返回ObservableCreate对象,返回对象就是类+方法名,比如Observable.zip()方法就会返回ObservableZip对象。
//Observable.java
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
新建ObservableCreate对象,然后调用RxJavaPlugins.onAssembly()方法返回Observable类型参数。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
- ObservableCreate构造函数需要传递一个ObservableOnSubscribe对象,ObservableCreate内部保存ObservableOnSubscribe为全局变量source。
- ObservableCreate继承了Observable,创建ObservableCreate对象就是创建了Observable对象。
RxJavaPlugins#onAssembly()
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
//onObservableAssembly默认为空
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
RxJavaPlugins中的onObservableAssembly变量默认为空,所以onAssembly()方法是直接把参数source返回出去。
小结:创建被观察者主要经过两个步骤;
(1) 创建ObservableOnSubscribe实例对象;
(2) 把ObservableOnSubscribe对象传递给ObservableCreate的构造方法,创建ObservableCreate对象,把ObservableOnSubscribe对象作为全局变量source保存起来。
(3)把第二步创建的ObservableCreate对象返回出去。
二、创建观察者
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
Observer只是一个接口,创建观察者就是创建Observer的实例。
三、让观察者订阅被观察者,连接双方。
observable.subscribe(observer);
1. 调用Observable的subscribe()方法
//Observable.java
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
//非空判断
ObjectHelper.requireNonNull(observer, "observer is null");
try {
...
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
Observable.subscribe()方法也有一个套路,subscribe()最后都会调用Observable的派生类的subscribeActual()方法。
2. 调用subscribeActual(observer)方法。
subscribeActual()是Obserable的抽象方法,而上文有讲到创建Obserable.create最后得到ObservableCreate对象。所以subscribeActual()会调用ObservableCreate对象的subscribeActual()方法。
//ObservableCreate.java
@Override
protected void subscribeActual(Observer<? super T> observer)
//创建CreateEmitter实例对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//回调observer的onSubscribe()方法
observer.onSubscribe(parent);
try {
//回调ObservableOnSubscribe的subscribe()方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
2.1 首先通过传递observer参数创建CreateEmitter的实例。看一下CreateEmitter这个类
static final class CreateEmitter<T>extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
...
if (!isDisposed()) {
observer.onNext(t);//回调observer的onNext()方法
}
}
@Override
public void onError(Throwable t) {
//调用tryOnError回调observer的onError()方法
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
...
if (!isDisposed()) {
try {
observer.onError(t);//回调observer的onError()方法
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();;//回调observer的onComplete()方法
} finally {
dispose();
}
}
}
}
observer传给CreateEmitter之后,赋值给全局变量observer。CreateEmitter继承了ObservableEmitter接口,是ObservableEmitter的实现,CreateEmitter中onNext()、onError()、onComplete()方法被调用时会调用观察者observer的同名方法。
2.2 调用observer的onSubscribe方法,说明observable.subscribe(observer)方法执行时,观察者observer的onSubscribe()方法就会马上被回调。
2.3 执行source.subscribe(parent)方法,source为ObservableOnSubscribe的实例,parent为CreateEmitter的实例,那么ObservableOnSubscribe的subscribe()方法会被回调。
Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
e.onNext(0);
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
});
source.subscribe(parent)方法的执行,会通过调用ObservableEmitter的e变量执行onNext方法,也就是执行了CreateEmitter的onNext()方法,CreateEmitter的onNext方法内部会调用observer.onNext()方法。
到这里,RxJava的订阅流程就分析完成了。
Rxjava订阅流程.png总结:
1、创建ObservableOnSubscribe对象。
2、把ObservableOnSubscribe对象传递给ObservableCreate的构造方法,创建ObservableCreate对象(即Obserable对象),把ObservableOnSubscribe对象作为全局变量source保存起来。
3、创建Observer对象。
4、执行observable.subscribe(observer)连接观察者和被观察者,会调用ObservableCreate的subscribeActual方法。
5、把Observer对象传递给CreateEmitter的构造方法,创建CreateEmitter对象。
6、回调observer的onSubscribe()方法。
7、回调ObservableOnSubscribe的subscribe(ObservableEmitter<T> e)方法,参数为CreateEmitter对象;那么 e.onNext(0)方法就会调用CreateEmitter中的onNext()方法,然后回调observer的onNext()方法。
网友评论