介绍
只有调用了subscribe()方法,才能将Observable跟Observer关联起来,并触发Observable内处理器的执行。
执行代码
//初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("杨");
subscriber.onNext("月");
subscriber.onCompleted();
}
});
//初始化观察者Observer,视作结果接收器
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
//订阅
observable.subscribe(observer);
源码分析
1. 初始化被观察者Observable
Observable observable = Observable.create(数据处理器);
接下来会进入Observable#create方法
Observable#create
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
RxJavaHooks.onCreate(f)此处不做分析了,知道最终返回的还是数据处理器即可
接下来会进入真正的初始化方法
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
由此可知被观察者Observable持有数据处理器对象Observable.OnSubscribe。
2. 初始化结果接受器观察者Observer
Observer observer = new Observer<String>() {
...
}
3. 订阅
observable.subscribe(observer);
接下来回进入
Observable#subscribe
public final Subscription subscribe(final Observer<? super T> observer) {
if (observer instanceof Subscriber) {
return subscribe((Subscriber<? super T>)observer);
}
if (observer == null) {
throw new NullPointerException("observer is null");
}
return subscribe(new ObserverSubscriber<T>(observer));
}
Observable#subscribe
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
}
//通知观察者做准备工作
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
try {
//获取数据处理器Observable.OnSubscribe,并做数据处理工作
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
if (subscriber.isUnsubscribed()) {
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
} else {
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r;
}
}
return Subscriptions.unsubscribed();
}
}
上面方法的主要功能代码均已标注说明。RxJavaHooks.onObservableStart()方法是用来获取当前被观察者的数据执行器。然后调用数据处理器的call()方法,此方法就是外部用户自己实现的方法。call()方法传递的参数就是结果接收器观察者Observer。
数据处理器内的cal()l方法
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("杨");
subscriber.onNext("月");
subscriber.onCompleted();
}
上面方法中的subscriber就是结果接受器Observer,经过上面执行
onNext()``onCompleted()
方法会进入对应的Observer的方法内。
接着就会进入结果接收器Observer内方法体内
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","结束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
最终输出结果
结果: 杨
结果: 月
结束
网友评论