Rxjava2主要的作用你可以直接理解为,就是为了异步事件而生。也就是在某个线程中去请求某个事情,但是搞完事情之后又不想在这个线程中负责任,于是需要把责任推给隔壁的那个家伙。来具体看看:
一.使用步骤:
步骤很简单,就是三步。大家都知道RxJava的设计模式是观察者模式,所以它的实现分为:创建Observeable(被观察者),创建Observer(观察者),将这两者关联起来(通过 subscribe)
二.具体每个步骤的重要方法
因为Observable里边发送的方法很多,我这里不多说因为有许多相关的文章。我的目的是把这些东西给你串联起来。然后基于重要的方法去分析它内部具体实现的源码。
Obserable的方法:
1.create() 首先肯定是进行创建这个对象,Obserable使用的是个链式结构,所以所有的方法你都可以“.”到底
- subscribeOn(Scheduler scheduler)用来指定你搞事情的线程
3.observeOn(Scheduler scheduler)用来制定你要负责任的线程(处理UI事件肯定是UI线程)
4.subscribe(Observer obs)用来将观察者和被观察者关联起来
观察者主要是回调方法:
1.onNext()用来接受数据
2.onComplete()整个传输过程完成之后的回调
3.onError()出错了的回调,这个方法跟onComplete()是互斥方法(也就是有我没他)
三.就这点儿东西,现在上代码
得到Observeable对象
fun createObservabel( message:Message):Observable<Message>{
var obervered:Observable<Message> = Observable.create<Message> {
emitter->
emitter.onNext(message)
message.name = "wangyi2"
emitter.onNext(message)
emitter.onComplete()
}
return obervered;
}
得到Oberver对象
fun createObserver():Observer<Message>{
var oberver: Observer<Message> = object :Observer<Message>{
override fun onComplete() {
Log.i("wangyi","complete")
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: Message) {
Log.i("wangyi",t.name+"--age--"+t.age)
}
override fun onError(e: Throwable) {
}
}
return oberver;
}
进行Observeable和Observer绑定
fun subscribe(){
var message:Message= Message()
message.name = "wangyi1"
message.age = 3
var observerable = createObservabel(message)
var observer= createObserver();
observerable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread())
observerable.subscribe(observer)
}
好了,使用上边就这些东西。接下来咱们就是对具体源码的进行分析。
四.源码
源码分析
我个人分析源码,其实就是弄清楚这些回调方法是什么时候调用如何调用调用ok来看第一步,Observeable.create()
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
这里就是简单的进行了一个判null操作,继续看 onAssembly()这个方法。
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
你会发现这里就是对source进行一个判null的操作,对整体流程没有什么大的影响。于是咱们回头看ObservableCreate,这个就是一个Observable对象。其实到这里这个方法就看完了,就是创建了ObservableCreate一个Observable类型的对象。ObservableCreate有很多我们后期会一点点的看到的方法,不要急,咱们继续。
关于Observer这个对象其实没有什么可以分析的,因为它就是一个接口,一个用来后续回调的接口。
subscribe方法的分析
仔细一看大部分都是判断null的代码(个人也确实方案Java中空指针,这也是为啥我要使用kotlin的原因)
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
其中最主要的就是 subscribeActual(observer)这个方法,我们进一步跟进去发现它是一个抽象方法。于是咱们具体得看ObservableCreate对这个方法的实现。
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
其中CreateEmitter这个静态内部类很重要,这个静态内部类里边调用了Observer的所有回调方法。哈哈,好爽吧,这里就看到了光明,后续的回调都会CreateEmitter来实现。其实这就是不指定监听线程和回调线程的情况。
RxJava线程切换
直接去参考这篇文章吧,写的很详细了https://www.jianshu.com/p/a9ebf730cd08
网友评论