1.基本结构
我们先来看看,RxJava基本代码:
//被观察者Observable observable = Observable.create(newObservable.OnSubscribe() {@Overridepublicvoidcall(Subscriber subscriber){ subscriber.onNext("Hello World"); subscriber.onNext("Hello Keye"); subscriber.onNext("Hello jikexueyuan"); subscriber.onCompleted(); subscriber.onError(); }});//观察者Subscriber subscriber =newSubscriber() {@OverridepublicvoidonCompleted(){ Logger.d("onCompleted()"); }@OverridepublicvoidonError(Throwable e){ Logger.d("onError:"+ e.getMessage()); }@OverridepublicvoidonNext(String s){ Logger.d("onNext():"+ s); } };//观察者订阅被观察者observable.subscribe(observer);
在调用subscribe()方法后,observer作为参数传入OnSubscribe的call()方法中,observable通过call()方法,来调用observer的onNext(),onCompleted(),onError()方法。从而实现了事件的传递。
2.流程分析
来看看源码是如何实现的:
创建Observable时:
Observable.create(onSubscribe);//Observable的create()方法publicstaticObservablecreate(OnSubscribe f){returnnewObservable(hook.onCreate(f));}//Observable的构造方法protectedObservable(OnSubscribe f){this.onSubscribe = f;}//RxJavaObservableExecutionHook的onCreate()方法publicOnSubscribeonCreate(OnSubscribe f){returnf;}
这里直接调用构造方法,来创建一个Observable对象。构造方法中传入一个OnSubscribe对象,赋值给新建的Observable对象中的OnSubscribe。
扩展:追溯源码发现,Observable的生命周期方法中都有RxJavaObservableExecutionHook的身影,它的onCreate()方法只是传入参数再返回,没有其他操作。这里可以理解为把ExecutionHook注入到Observable的生命周期中,它默认无任何操作,可以通过RxJavaPlugin.getInstance()方法,自定义hook进行相关扩展。我们暂时用不到,先不管它。
创建Subscriber时:
publicabstractclassSubscriberimplementsObserver,Subscription{...}publicinterfaceObserver{voidonCompleted();voidonError(Throwable e);voidonNext(T t);}
Subscriber是一个继承了Observer(观察者)接口的抽象方法,它重写它了的onNext(),onCompleted(),onError()方法,并进行了一些扩展;
publicinterfaceSubscription{//取消订阅voidunsubscribe();//是否取消了订阅booleanisUnsubscribed();}
另外Subscriber还继承了Subscription接口,方便取消订阅和查询订阅状态。
订阅subscribe()时:
privatestaticSubscriptionsubscribe(Subscriber subscriber, Observable observable){// new Subscriber so onStart itsubscriber.onStart(); observable.onSubscribe.call(subscriber);returnhook.onSubscribeReturn(subscriber); }}
这里只写出了源码的核心部分,可以看到subscribe()传入了Subscriber和observable;
首先,调用subscriber的onStart()方法,在订阅之前执行一些准备代码。
之后,Observable中的onSubscribe对象调用call(),传入subscriber作为参数。
最后,返回一个Subscription对象;之前提到过hook对象是没有任何操作的,也就是说返回的是一个实现Subscription接口的subscriber,便于取消和查询订阅。
网友评论