传统的观察者模式
观察者模式.png类图.png
RxJava 四个要素
- 被观察者
- 观察者
- 订阅
- 事件
- 创建被观察者
subscriber就是观察者
- 创建被观察者
创建被观察者.png
- 创建观察者
- 订阅
核心
核心.png操作符
map
map操作符.pngimage.png
//调用lift方法,创建一个OperatorMap对象作为参数
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return lift(new OperatorMap<T, R>(func));
}
public final class OperatorMap<T, R> implements Operator<R, T> {
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer) {
this.transformer = transformer;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o) {
//创建新的观察者对象
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
try {
//使用传入的观察者调用onNext方法回调Func1的方法
o.onNext(transformer.call(t));
} catch (Throwable e) {
Exceptions.throwOrReport(e, this, t);
}
}
};
}
}
//lift方法中创建新的被观察者对象,相当于代理,负责接收原始的被观察者的事件
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//拿到map方法中创建的观察者对象
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
总结:
map操作符是通过创建一个Observable<R>响应实现的是Observable<T>中的观察者Subscriber<? super T>
这里有点绕,hook.onLift(operator).call(o)这行代码是调用OperatorMap方法中的call方法传入新创建的Subscriber观察者,在新创建的观察者中的onNext犯法中执行的是原观察者的Func1中的call回调。
flatMap
这个其实就是把多个map串联起来统一处理
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
return merge(map(func));
}
RxJava 线程调度
Schedulers
Schedulers.png 线程控制.png
subscribeOn 被观察者处理观察者的call回调(发出事件)
observeOn 观察者的回调处理
总结:
subscribeOn只能调用一次,因为subscribeOn作用域是全局的每次创建新的Observable,subscribeOn指定Observable的线程执行位置。Observable只有一个。
observeOn指定是它之后的subscriber观察者回调线程执行位置。subscriber是多个。
网友评论