标准观察者设计模式
RxJava是一种特殊的观察者模式,首先我们先来看标准的观察者设计模式。在标准观察者模式中,存在两种对象,一种是观察者,一种是被观察者,“被观察者与“观察者之间是一对多的关系。当被观察者发出通知改变的时候,观察者才能察觉到。
Observerable.java
public interface Observerable {
private List<Observer> observers = new ArrayList<>();
public void addObserver(Observer observer) ;
public void removeObserver(Observer observer);
public void notifyObservers() ;
public void pushMessage(String msg);
}
Observer.java
public interface Observer{
void update();
}
RxJava Hook(钩子)
Hook技术又叫钩子函数,在系统没有调用函数之前,钩子就先捕获该消息,得到控制权。这时候钩子程序既可以改变该程序的执行,插入我们要执行的代码片段,还可以强制结束消息的传递。
RxJava中的hook的使用
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Throwable {
return observable;
}
});
来观察这么一个代码段:
public class MainActivity extends AppCompatActivity {
@InjectView(R.id.tv_text)
TextView textView;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Throwable {
System.out.println("apply : " + observable);
return observable;
}
});
testHook();
}
private void testHook() {
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Throwable {
e.onNext(null);
}
}).map(new Function<Object, Boolean>() {
@Override
public Boolean apply(Object o) throws Throwable {
return null;
}
}).subscribe(subscribe(new Observer<Boolean>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Boolean aBoolean) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
}
}
运行后Logcat
打印出
2021-01-18 14:18:53.452 7793-7793/com.example.anatationtest I/System.out: apply : io.reactivex.rxjava3.internal.operators.observable.ObservableCreate@1569e83
2021-01-18 14:18:53.452 7793-7793/com.example.anatationtest I/System.out: apply : io.reactivex.rxjava3.internal.operators.observable.ObservableMap@27f9800
可以看到ObservableCreate
和ObservableMap
都被hook方法捕获了,这样就可以实现RxJava的全局监听。观察create()
源码:
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
和map()
方法源码:
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
我们再看RxJavaPlugins.onAssembly()
方法:
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
在这里有个onObservableAssembly
,如果该变量不为空,则为Observable
对象应用apply(f,source)
,再往前寻找,找到该变量被赋值的地方,发现就是在我们前面通过setOnObservableAssembly()
函数设置全局监听,顾名思义,就是设置被观察者的装配。
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
RxJava 观察者设计模式
两者观察者设计模式对比
在标准观察者设计模式中,“被观察者”与“观察者”是一对多的关系,并且需要被观察者发出改变通知后,所有的观察者才能观察到。在RxJava观察者设计模式中,“被观察者”与“观察者”是多对一的关系,并且需要在起点和终点订阅一次后,才能发出改变通知,也可以称之为发布订阅模式。
在标准观察者模式中,被观察者中的容器直接存放观察者的引用,耦合度更高。RxJava中,被观察者与观察者通过抽象层发射器连接,降低了耦合度。
- Observer源码
- Observerable创建过程,源码分析
- subscribe订阅过程,源码分析
- map转换流程,源码分析
Observer
//Observer是一个泛型接口,包含四个抽象方法
public interface Observer<@NonNull T> {
//当Observer对象被Observerable对象subscribe时候马上调用
//d 为此次订阅的事件对象,如果调用dispose,则订阅会被杀死
void onSubscribe(@NonNull Disposable d);
//把被观察者从上游传递来的对象给观察者
void onNext(@NonNull T t);
//通知观察者,被观察者发送了错误,调用此方法将不会调用onNext()和onComplete()方法
void onError(@NonNull Throwable e);
//通知观察者,被观察者已经发送完事件,如果抛出onError()错误则不会调用
void onComplete();
}
Observerable创建过程
首先调用create()
方法,传入自定义source作为参数
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
接着先对传进来的source进行判空,如果为空,则直接抛出异常。如果不为空,则走onAssembly()
方法,该方法在上面已经分析过了,所以我们再来看new ObservebleCreate<>(source)
的创建过程。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
......
}
ObservableCreate
中创建了一个source
成员变量,并在创建的时候直接将传入的自定义source赋值给它。
subscribe订阅过程
observable.subscribe(new Observer(object){
......
})
我们来分析subcribe()
函数
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
//错误处理
......
}
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
}
前面几行代码是对observer
对象进行校验,如果校验没问题,最后调用subscribeActual()
方法。而该方法是一个抽象方法,那么是谁实现了该方法呢,是我们的ObservableCreate
,因此我们再来看它里面subscribeActual()
的实现。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
首先CreateEmitter
为我们传进来的自定义observer
创建一个事件发射,并将它包裹起来,然后调用observer.onSubscribe()
,这就是前文提到一调用subscribe()
,observer.onSubscribe()
就会调用的原因。
接着调用source.subcribe()
,把创建的事件发射器传进去。还记得source是什么吗,它就是我们一开始创建的自定义source,而该方法的抽象实现又回到了在我们代码最开始的地方。
Observable.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Object> e) throws Throwable {
e.onNext("");
}
})
然后就会调用到CreateEmitter.onNext()
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
由于CreateEmitter
中已经持有了observer
的对象,最后就会调用到observer.onNext(t)
,这样完整的订阅流程就形成了。
map()源码分析
public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
我们重点来ObservableMap
这个类。
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
//保存function
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
......
}
}
首先在构造函数中,将Function保存起来,map()
之后,返回的依旧是Observerable
对象,并将ObserverableCreate
对象保存为它的source,接下来会走到ObservableMap.subscribeActual()
中,然后为observer
对象t
包裹一层MapObserver
。这时候再调用ObserverableCreate.subscribeActual()
,后续的订阅流程就跟之前一样了。
在这里,我们重点分析MapObserver
中的处理。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
//mapper合法性校验
try {
//应用function转换
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//向下游传递
//实际为我们自定义的observer
downstream.onNext(v);
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
//异常处理
@Nullable
@Override
public U poll() throws Throwable {
T t = qd.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}
由前面的流程分析,会一直调用到CreateEmitter.onNext()
,然后又会进一步对MapObserver
进行拆包,进而走到它的MapObserver.onNext()
方法中,先对Function 对象 mapper
进行合法性校验,然后调用apply()
函数,这里的apply函数是抽象函数。具体实现在我们的链式调用中,最后得到返回值v,也就是经过变换后的对象,并调用downstream.onNext(v)
向下游传递。downsteam
则为被包裹住的observer
对象。
new Function<Object, Boolean>() {
@Override
public Boolean apply(Object o) throws Throwable {
return null;
}
}
为什么RxJava要这么写呢,从上文的分析中,我们可以看到在链式调用中,如果调用多次map()
,相当于为自定义observer封装了层层包裹,发布订阅的过程就相当于是封包->拆包的过程,代码逻辑清晰,避免了函数嵌套。这就是一种装饰模型,一开始我们创建了ObserverableCreate
,然后为它穿上一件件ObserverableMap
的外套,然后subscribe
的过程就是脱外套的过程。
总结
本篇文章通过对RxJava中Observer、Observable、subscribe源码分析,比较了其与标准观察者设计模式的差别,更深的学习了RxJava的思想和原理。
网友评论