先来看看rxjava一般用法
Observable.create<String> {
it.onNext("next")
it.onComplete()
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe({
}, {
}, {
})
我这里是用kotlin写的,这样写可能不太好懂,可以按下面这样来解读
var sources = object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("下一步")
emitter.onComplete()
}
}
var observable = Observable.create(sources)
var observable1 = observable.subscribeOn(Schedulers.io())
var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
var observer = object :Observer<String>{
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
}
override fun onError(e: Throwable) {
}
}
observable2.subscribe(observer)
这样子,可能会比较容易理解,现在我我们先忽略里面的线程调度方法,就如下面这样
var sources = object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("下一步")
emitter.onComplete()
}
}
var observable = Observable.create(sources)
var observer = object :Observer<String>{
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
}
override fun onError(e: Throwable) {
}
}
observable.subscribe(observer)
先来看observable是如何创建的
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//这个是用来判断是否为null,如果source为null,则会抛出异常
ObjectHelper.requireNonNull(source, "source is null");
//先判断onObservableAssembly是否为null,为null直接返回ObservableCreate
//第一次创建,onObservableAssembly为null,所以返回的是括号中的内容
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
直接返回ObservableCreate对象,看这个对象的构造方法
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
//传进来一个source,并赋值
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
//.....
}
所以Observable.create就是创建一个ObservableCreate对象,并把source保存到ObservableCreate中,其中source就是前面创建的,继续看subscribe方法,知道observable为ObservableCreate对象,所以在ObservableCreate中找,但是ObservableCreate并没有这个方法,去看ObservableCreate的父类
//Observable类,ObservableCreate的父类
public final void subscribe(Observer<? super T> observer) {
//判断observer是否为null,为null抛出异常
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//判断onObservableSubscribe是否为null,为null直接返回observer自身
//到目前为止都没有看见onObservableSubscribe的赋值,先默认为null
observer = RxJavaPlugins.onSubscribe(this, observer);
//判断处理后的observer是否为null
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//调用子类的subscribeActual方法,即ObservableCreate.subscribeActual
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribeActual是一个抽象方法,所以又回到ObservableCreate中
@Override
protected void subscribeActual(Observer<? super T> observer) {
//创建CreateEmitter类,并将observer传过去
//CreateEmitter是ObservableCreate的内部类,用来处理onNext等方法的
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用observer的onSubscribe方法
//也就是传过来的observer的onSubscribe方法,说明已经绑定了
observer.onSubscribe(parent);
try {
//调用source的subscribe方法,并将CreateEmitter过去
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
从上面可以知道source就是前面创建的
//这里就是真实的sources
var sources = object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
emitter.onNext("下一步")
emitter.onComplete()
}
}
var observable = Observable.create(sources)
//....
所以就回调到这里的sources的subscribe方法,其中emitter就是ObservableCreate的内部类CreateEmitter,当我们调用emitter.onNext()方法,就是调用CreateEmitter.onNext()方法,来看做了什么
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//构造方法传过来的observer,
//就是我们调用 observable.subscribe(observer)传过来的observer
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
//调用onNext方法
//判断onNext传过来的参数是否为null,为null,调用自身onError
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//判断是否已经解除绑定,
//所以如果调用了onComplete再调用onNext,observer是不会收到消息的
if (!isDisposed()) {
//调用observer.onNext(t),并将参数传过去
//所以,到这就能回到我们的observer的onNext方法中,并获取到参数
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
//判断是否解绑,没有就调用observer.onError
//再调用dispose()方法解绑
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
//判断是否解绑,没有就调用onComplete()然后再调用 dispose()解绑
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
//......
从上面可以知道,如何从sources调用onNext方法能传到observer的onNext方法,并接收到参数。
网友评论