简单的写一点自己理解的源码分析
最简例1:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//e 就是包装后的观察者对象,请往后看
e.onNext("1");
e.onComplete();
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
响应式
-Observable 被观察者
-Observer 观察者/订阅者
Observable.create创建了一个Observable对象,参数是ObservableOnSubscribe接口的实现,并在Observable里保存ObservableOnSubscribe的引用source。这个引用对象为方法,所以使用接口形式实现。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");//非空判断
//onAssembly hook相关可忽略
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//创建的ObservableCreate就是一个Observable对象实现了subscribeActual(Observer<? super T> observer)方法
public final class ObservableCreate<T> extends Observable<T>{
final ObservableOnSubscribe<T> source;//源引用
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {//重要!!!订阅时执行
// 包装observer 当传入observer时CreateEmitter开始发送数据
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);//onSubscribe(Disposable d) CreateEmitter也实现了Disposable
try {
//重要!!!使观察者和被观察者相关联
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}}
CreateEmitter实现了ObservableEmitter 译为被观察者的发射器,当被观察者订阅观察者的时候(以代码流程直译的,应该理解为观察者订阅被观察者)也就是Observable.subscribe的时候,源source会调用subscribe将订阅的观察者observer作为参数传进去,使观察者和被观察者建立关系,此时数据开始传递。
//装饰类包装observer,添加了isDisposed标识是否可用相关的东西
//实现了ObservableEmitter,所以可以作为ObservableOnSubscribe的参数。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//先调用onError ,onComplete不会被执行
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
//先调用onComplete 再调用onError 会crash
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
}
//作为Observable.create的参数
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
//是一个装饰接口,继承Emitter,在源码里包装了一次Observer
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
调用Observable.subscribe(Observer<? super T> observer) 完成订阅
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
subscribeActual(observer);
}
例2:Observable.map(Function<? super T, ? extends R> mapper)
//将String变换成Integer
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("1");
e.onComplete();
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});;
首先看map方法干了什么
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//不用想ObservableMap也应该是一个Observable对象
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//AbstractObservableWithUpstream继承Observable 里面只有一个成员ObservableSource<T> source的引用
//Observable 实现了ObservableSource所以这2个可以看成是一个东西,这个引用就是保存上游的Observable 引用
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//MapObserver为Observer即订阅者 t:下游的Observer | function:map()参数
source.subscribe(new MapObserver<T, U>(t, function));
}
}
ObservableMap的subscribeActual()
参考上面的简单例子Observable .subscribe参数为Observer即订阅者,所以MapObserver肯定实现了Observer,那么source.subscribe这个方法调用的时候就会调用上游源Observable 的subscribeActual()方法,将MapObserver包装成ObservableEmitter,那么当调用e.onNext("1")的时候,即调用了MapObserver.onNext("1"); 看下MapObserver的onNext做了什么
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);//actual为下游的Observer引用
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//mapper 就是map()里传入的Function接口的实现。实现变换
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//下游Observer继续执行onNext
actual.onNext(v);
}
}
Function 接口
public interface Function<T, R> {
//很简单,给我一个T还你一个R
R apply(@NonNull T t) throws Exception;
}
到此,源Observable 和 ObservableMap已经做好准备,ObservableMap的MapObserver也已经实现,就等最下游Observable .subscribe调用了,调用后反向执行Observable 的subscribeActual()方法,然后数据正向开始传递。
之前见有人说Observable 在订阅前的每一个点(.)都是创建了一个Observable 对象。这么看来还是很贴切的,rx的各种变换和线程切换也就不难理解了。具体后面看完再做总结。
网友评论