一、什么是RxJava?
引用GitHub上的说明:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
意思大概是:RxJava是Reactive Extensions的java 虚拟机的实现,Reactive Extensions就是通过可被观察的序列代码编写异步和基于事件驱动程序的类库。
它扩展了观察者模式以支持数据和/或事件序列,并增加了操作符,使您可以声明性地组合序列,同时抽象出对低级线程,同步,线程安全性,并发数据结构和非线程等事物的关注阻塞I / O
二、RxJava的使用
三、源码基础流程分析
最简单调用案例:
最简单调用基本调用流程:首先生成一个Observable实例,然后调用它的subscribe与观察者进行绑定,绑定的过程最终会执行ObservableOnSubscribe接口实例的subscribe(emitter)方法,通过调用emitter.onNext(12)就会回到观察者Consumer的accept(data),这里的data就是onNext(12)中的12,这里暂时没涉及到线程切换,所以所有代码都在调用者线程中执行。
Observable被观察者的创建:先看看Observable.create(ObservableOnSubscribe)方法,传参就是ObservableOnSubscribe接口的实例,
Observable.create(ObservableOnSubscribe source)方法先将ObservableOnSubscribe实例source传入ObservableCreate(source)进行一层装饰,ObservableCreate类本身就是Observable的子类,这样ObservableCreate直接持有ObservableOnSubscribe接口的实例source。RxJavaPlugins.onAssembly(source)对ObservableCreate实例用hook函数进行处理,如果没有hook函数直接返回source,这样就获得的被观察者,它是ObservableCreate的实例。
被观察者与观察者的绑定:通过调用Observable.subscribe(consumer)进行绑定,Observable里重载了subscribe()方法,会调用
Observable的subscribe()方法onNext就是我们传进来的参数,其他参数默认,Consumer只是一个简单的接口并不是我们所需要的观察者,真正的观察者是LambdaObserver,LambdaObserver实现了Observer接口
public interface Observer {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
通过分析LambdaObserver源码我们得知,调用LambdaObserver的onSubscribe()、onNext()、onError()其实就是调用LambdaObserver(onNext, onError, onComplete, onSubscribe)对应四个入参的方法,至此,我们的观察者也出生了,那么是时候与真正的绑定了,最终调用重载的subscribe(ls)方法如下
最终调用的subscribe(observer)方法绑定这个方法的就重要的一行代码就是subscribeActual(observer),subscribeActual()是Observable的一个抽象方法,实现在其子类中,本例中创建的被观察者就是前面提到的ObservableCreate类,所以最终调用ObservableCreate的subscribeActual(observer)方法,
调用被观察者ObservableCreate实例的subscribeActual()最终调用source.subscribe(parent),这里的source就是Observable.create(source)的入参,
这样observable.subscribe()最终会执行Observable.create()的入参的subscribe()方法。这个例子中没有切换线程,所以只在调用者线程中执行subscribe(emitter)方法的。
接下类分析下emitter.onNext(12),前面说过这行代码会通知观察者,即会执行上面的Consumer的accept(Integer data)方法。onNext(12)到底做了什么?先找到emitter对象的类,回到ObservableCreate类中,emitter就是这里的内部类CreateEmitter,CreateEmitter类实现了接口ObservableEmitter,关键的是CreateEmitter通过LambdaObserver间接持有了observable.subscribe(consumer)的入参consumer,调用LambdaObserver的onNext()其实就是调用consumer的accept()方法。
到此,最简单的例子分析完成,从观察者,被观察者的创建,到两者的关联(执行subscribe()),最后被观察者主动通知观察者它的工作处理完了,这就是响应式编程。
三、RxJava2链式调用操作符源码分析
RxJava提供了链式调用代码,以及很多操作符,通过不同的操作完成所需功能。操作符很多不能每个都分析,下面选取map()来分析。操作符的使用请参考(二)。map一般用来对数据的转换处理,如网络请求的数据解析。
map将Integer转String这里map将整型Integer转为String型,从上面可以知道,我们只是将Function接口的实例传给了map,Function接收两个泛型,第一个是原始数据类型,第二个是转换的目标类型。
map源码看着这两行是不是与前面我们创建observable的create()的类似。这里创建了ObservableMap对象,ObservableMap也是Observable的子类,入参为this与mapper,这个this其实就是Observable.create()创建的ObservableCreate的对象,ObservableMap构造函数只是简单的持有了这两个入参,与ObservableCreate的构造函数如出一辙。经过map的处理返回的是ObservableMap对象,由之前的ObservableCreate对象转化为了ObservableMap对象,其实就是包了一层装饰。
map()返回的Observable对象然后调用subscribe()方法绑定观察者,流程前面分析得很清楚了,最终也是会调用ObservableMap的subscribeActual(Observer t)
ObservableMap的subscribeActual(Observer t)这个subscribe()其实就是Observable对象的,这样就又重新跑了没有map()的最简单的例子的流程,这样以来两个Observable(ObservableCreate和ObservableMap)都关联了观察者。
这里将Observer转换为了MapObserver对象,MapObserver关键是重写了onNext()方法,在onNext()方法中对数据进行Function().apply()转化,当我们调用emitter.onNext(12)的时候就会跑到这里,最后将转化后的数据传回给观察者。
如果在看看其他的操作符源码,如just、range、merge等,你会发现它们是类似的处理,这里用到的是装饰模式,每个操作符都对ObservableOnSubscribe的加了一层装饰。
四、优秀的线程切换
RxJava的一行代码即可在UI线程与子线程之间轻松切换,从此再也不用handler、thread、runOnUiThread(),使得代码更加简洁,易于维护。那么RxJava内部到底是如何处理线程的切换的呢?下面就详细而简单的分析下。
RxJava的可以通过observeOn()(改变观察者的执行线程)与subscribeOn()(改变被观察者的执行线程)来控制,两个操作符的入参都是Scheduler对象.
线程切换AndroidSchedulers.mainThread()切换到android的主线程,Schedulers.io()切换到子线程,本例中观察者的代码(即Consumer.accept())在android的ui线程中执行,被观察者的代码(即ObservableOnSubscribe.subscribe(emitter))放到了io子线程执行,通过简单的两行代码即可完成线程的切换,不像用thread、activity.runOnUi()、handler、异步任务这么繁琐。observeOn和subscribeOn就做了两件事,一、Scheduler的创建,本例中创建的两个scheduler分别是HandlerScheduler和IoScheduler;二、对observable装饰一层(装饰者模式),分别对应的两个被观察者是ObservableObserveOn和ObservableSubscribeOn,因为subscribeOn后与observeOn,所以最后得到的是ObservableSubscribeOn的对象。经过这两个操作符的修饰后就调用subscribe(consumer)绑定观察者了,先执行ObservableSubscribeOn的subscribeActual(Observer)
这个scheduler就是Schedulers.io()返回的IoScheduler,调用scheduler.scheduleDirect(new SubscribeTask(parent)),这个方法的入参是SubscribeTask实现了Runnable接口
SubscribeTask到了这一步就可以猜都scheduler.scheduleDirect(new SubscribeTask(parent))就是在子线程中执行SubscribeTask,从而让source.subscribe(parent)在子线程中执行,这样就将被观察者的执行代码放到了io子线程.
那么又有个问题,IoScheduler是如何切换到子线程的呢?接着我们看看scheduler.scheduleDirect()到底做了什么。
ioScheduler.scheduleDirect()这里先Worker类的对象,然后调用它的schedule()方法执行传进来的Runnable。createWorker()是个抽象方法具体实现在子类IoScheduler中,
IoScheduler的createWorker()找到EventLoopWorker的schedule()方法,我们发现真正schedule的是threadWorker.scheduleActual()。
NewThreadWorker的scheduleActual()先对Runnable包一层,ScheduleRunnable实现了Callable接口,然后调用executor来运行,这样就将SubscribeTask放到了线程中执行,executor是ScheduledExecutorService的实例,终于清楚RxJava2的线程切换时通过java线程池来实现的了。
通过上面的分析我们就明白subscribeOn(Schedulers.io())的目的是将每层的Observable的subscribeActual()放到了io子线程中执行,最终会执行Observable.create(ObservableOnSubscribe)入参的subscribe()。
接下来分析observeOn(AndroidSchedulers.mainThread()),这行代码对上层装饰成ObservableObserveOn,由前面分析subscribeOn(Schedulers.io())得知ObservableObserveOn的subscribeActual()切换到了子线程。
切换观察者线程那么如何将观察者的执行线程切换到Ui线程呢?AndroidSchedulers.mainThread()获得的是HandlerScheduler,所以这个scheduler是HandlerScheduler的实例,创建了自己的Worker,通过ObserveOnObserver可知线程切换同样是通过Worker.schedule(Runnable)来实现的。
ObserveOnObserver的schedule()查看HandlerScheduler的源码,可以发现这个scheduler是通过handler来实现线程控制了,这个handler是通过new Handler(Looper.getMainLooper())创建,所以它执行的Runnable就切换到了Ui线程,这样观察者的执行线程就在主线程中执行。
subscribeOn在操作链上最好只调用一次,如果多次调用,依然只有第一次生效,observeOn可以在链上调用多次,它主要是用来指定下一个操作在哪一个线程上执行
当多次调用observeOn的时候为什么会是指定下一个操作在哪个线程?当我们调用subscribeOn(Consumer)的时候,会一层一层依次调用每层observable的subscribeActual(observer),每层subscribeActual(observer)会对observer装饰一层,这样一层一层从下往上的嵌套,当调用的是observeOn()就会给observer套一层切换到指定的线程的功能,最后调用嵌套出来的observer的onNext()会从外到内(装饰模式)调onNext(),这样下层(即下一个操作符)的操作就会在前面指定的线程执行,因为上层的observer已经切换到了指定线程。
分析完subscribeOn(Schedulers.io())和observeOn(AndroidSchedulers.mainThread())我们可以知道RxJava的线程控制是通过Scheduler来实现,通过Scheduler的createWorker()来获取worker,worker决定执行线程,真正由Worker的schedule()来运行Runnable.
五、结论
RxJava提供了丰富的操作符链式编程,使代码简洁,易于维护,线程切换功能极其强大,可任意指定观察者发生的线程以及被观察者的线程,随意调整极其强大~
网友评论