//被观察者
Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Jenchar");
subscriber.onCompleted();
}
});
//观察者
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.w(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.w(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.w(TAG, "onNext");
}
};
//订阅
stringObservable.subscribe(observer);
通过Observable的create静态方法传入OnSubscribe的实例,OnSubscribe继承Action1并且实现了其call方法,
传入的泛型为OnSubscribe<String>的泛型,Action1<T>中传入了Subscriber<T>,泛型作为T被层层传递
public interface Action1<T> extends Action {
void call(T t);
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
// cover for generics insanity
}
从Action1<Subscriber<? super T>>这个接口可以看出泛型已经指定为Subscriber<? super T>,这个Subscriber泛型怎么传递进来的呢?
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
// validate and proceed
if (subscriber == null) {
throw new IllegalArgumentException("subscriber can not be null");
}
if (observable.onSubscribe == null) {
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
}
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
}
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try {
// allow the hook to intercept and/or decorate
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
//省略
}
return Subscriptions.unsubscribed();
}
}
以上这段源码为链式结构最后的执行方法,可以看到subscriber最终是通过 RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)传入,让我们看看这个方法里面怎么操作
public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
if (f != null) {
return f.call(instance, onSubscribe);
}
return onSubscribe;
}
最终返回的还是onSubscribe对象,再调用继承了Action1的call方法传入了该subscriber指定了泛型,形成了观察者的传递,subscriber实现Observer接口。
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
public abstract class Subscriber<T> implements Observer<T>, Subscription
以下代码省略
还有一点值得注意,在RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);之前还会将subscriber转成SafeSubscriber,这一步的主要作用就是保证onComplete和onError方法只执行一次,并且在onNext方法中如果trycatch到不在判断范围内的异常不会抛出而是会进行捕捉调用onError终止
@Override
public void onNext(T t) {
try {
if (!done) {
actual.onNext(t);
}
} catch (Throwable e) {
// we handle here instead of another method so we don't add stacks to the frame
// which can prevent it from being able to handle StackOverflow
Exceptions.throwOrReport(e, this);
}
}
public static void throwOrReport(Throwable t, Observer<?> o) {
Exceptions.throwIfFatal(t);
o.onError(t);
}
public static void throwIfFatal(Throwable t) {
if (t instanceof OnErrorNotImplementedException) {
throw (OnErrorNotImplementedException) t;
} else if (t instanceof OnErrorFailedException) {
throw (OnErrorFailedException) t;
} else if (t instanceof OnCompletedFailedException) {
throw (OnCompletedFailedException) t;
}
// values here derived from https://github.com/ReactiveX/RxJava/issues/748#issuecomment-32471495
else if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof ThreadDeath) {
throw (ThreadDeath) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
网友评论