RxJava 里面采用了观察者的设计模式,因此存在观察者与被观察者,被观察者发送消息通知观察者,观察者接收到消息更新操作,下面先看一下基础的 Rx 代码:
public static void testRx() {
//被观察者
Observable.create(
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("发送一条消息");
}
})
//ObservableCreate.subscribe
.subscribe(
//观察者
new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("---", "onSubscribe");
}
@Override
public void onNext(String s) {
Log.e("---", "onNext:" + "收到:" + s);
}
@Override
public void onError(Throwable e) {
Log.e("---", "onError");
}
@Override
public void onComplete() {
Log.e("---", "onComplete");
}
});
}
Observer(观察者)
我们来看看 Observer 里面做了什么,点击进去 Observer 这个类里面发现这是一个接口,当我们 new 出来后,走到的就是我们自定义的接口方法中,观察者的代码并不复杂,只是一个接口回调:
public interface Observer<T> {
//订阅时的回调方法
void onSubscribe(@NonNull Disposable d);
//接受到消息的回调方法
void onNext(@NonNull T t);
//出错时的消息回调方法
void onError(@NonNull Thr1owable e);
//订阅完成之后的回调方法
void onComplete();
}
Observable(被观察者)
接下来看看 Observable 做了什么,前面的基础代码是调用了 Observable 的 create 方法,我们进去到 create 方法里面:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
在里面发现返回了一个 RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)),这里先介绍一下 RxJavaPlugins.onAssembly 这个方法。
RxJavaPlugins.onAssembly
当我们去看 RxJava 源码的时候我们会发现很多地方都出现了这个方法,这是一个类似hook 的方法,通过这个方法可以监听到 Rx 的行动轨迹(自己概括的),我们以 create 的 onAssembly 来举例,当 Rx1.0时直接把我们上面传进来的 ObservableOnSubscribe (自定义source)返回了,这里会疑惑为何要多此一举,当看到 2.0 时就会发现多了一个 onObservableAssembly,可以看到里面的 onObservableAssembly 一般情况下是个空值,所以一般情况下还是直接返回 ObservableOnSubscribe(自定义source) ,当需要监听这一步的生命轨迹时,可以通过设置对应的 onObservableAssembly 进行监听,到时就会先走 onObservableAssembly 的 apply 方法:
//Rx1.0
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
return source;
}
//Rx2.0
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
//设置对应的监听器
public static void setOnObservableAssembly(@Nullable Function<? super Observable, ? extends Observable> onObservableAssembly) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
RxJavaPlugins.onObservableAssembly = onObservableAssembly;
}
===============================================
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
//这里传进来的被观察者就是我们刚才传进去的自定义source(ObservableOnSubscribe )
//会先走到 ObservableAssembly 的apply 回调,然后再走下一步操作
@Override
public Observable apply(Observable observable) throws Exception {
Log.e("---", "Hook 住");
return observable;
}
});
我们把这个方法简称为监听器,通过这个监听器,只要 RxJava 里面有它存在的地方都可以通过上面的这个方法进行监听。
讲完监听器,继续看刚才的源码,发现 new 了一个 ObservableCreate,从源码可以看到这个 自定义source 继承了 Observable 被观察者,接着来看看 ObservableCreate 做了什么:
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
//这里的observer 指的是订阅的观察者,
protected void subscribeActual(Observer<? super T> observer) {
//将传入的观察者封装成一个 CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//发生订阅时,调用观察者的 onSubscribe 方法
observer.onSubscribe(parent);
try {
//这个source就是上面 new 出来的那个自定义source,调用的 subscribe
//就是调用的自定义source 的subscribe 方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//省略其他代码
//.....
}
可以看到在 ObservableCreate 里面重写了一个 subscribeActual 方法,在这个方法里面,接收到了观察者,先是把观察封装了一层(CreateEmitter)然后再调用观察者的 onSubscribe 方法,最后是调用前面自定义source 的 subscribe 方法。
subscribe(订阅)
从上面一开始的代码的看到 create 操作之后就会进行 subscribe(订阅)操作,因为是链式调用,因此这一步实际上是由 ObservableCreate 这个对象的 subscribe 操作,点进源码看看 subscribe 操作做了什么:
//这里的 observer (观察者)就是上面代码new 出来的传进来的
public final void subscribe(Observer<? super T> observer) {
//空判断
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//空判断
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//调用 subscribeActual 方法,把new 的观察者传进去
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
.....
}
}
==================================================
protected abstract void subscribeActual(Observer<? super T> observer);
从 subscribe 源码可以看到传进来的 observer(观察者)就是我们自己 new出来的 observer。先是对传进来的 observer 判空,然后调用 subscribeActual 方法,可以看到这个 subscribeActual 是个抽象方法,上面说到 subscribe(订阅)操作是由 ObservableCreate 调用的,因此这里的 subscribeActual 就回调到前面代码ObservableCreate 类里重写的 subscribeActual 方法,这里再重新看一下:
@Override
protected void subscribeActual(Observer<? super T> observer) {
//打包传进来的观察者成为一个 CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用传进来观察者的 onSubscribe 方法,把打包好的数据传进去
observer.onSubscribe(parent);
try {
//这里的这个 source 就是我们 new 出来的自定义source,把打包的数据
//传进去,调用自定义source的subscribe方法
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
====================================================
//自定义source
Observable.create(
//自定义source
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("发送一条消息");
}
})
从上面代码可以看到,先是把传进来的观察者包装成一个 CreateEmitter,然后调用观察者的 onSubscribe 方法,这也是为什么一订阅就会回调 onSubscribe 方法的原因,接着会调用前面自定义 source 的 subscribe 方法,上面已经贴出了自定义 source 的 subscribe 方法,可以看到在 自定义 source 的 subscribe 方法中调用了刚才打包的 CreateEmitter 的 onNext 方法,也就是调用到刚才打包的 CreateEmitter 的 onNext 方法:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//通过构造函数保存观察者
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//调用到我们 new 出来的观察者的 onNext 方法
observer.onNext(t);
}
}
·····
}
从上面的源码可以看到,当调用到打包的 CreateEmitter 的 onNext 方法时,最终会调用到我们上面 new 出来的 observer (观察者对象)的 onNext 方法。这也是为什么在链式调用时,可以在 new 出来的 observer (观察者对象)中的 onNext 方法中处理消息的原因。
Map操作符
之前已经有文章介绍过 map 操作符的用法,对于这一类操作符就是对传进来的 observer (观察者对象)进行打包再传给上游 Observable(被观察者)。当发生订阅,接受到消息后则是先经过 map 操作符的的 onNext 在到我们new 出来的 observer (观察者对象)的 onNext 方法,这也就是为什么说又把 RxJava 叫成一个卡片式编程的原因:
RxJava流程图线程切换
上一章说过 RxJava 通常会和 Retrofit 配套使用,在Android4.4以后需要把耗时操作放到异步线程去做,因此就需要切换线程,在子线程获取数据,在主线程更新界面UI。主要涉及以下两个操作符:subscribeOn(给上游分配线程),observeOn(给下游分配线程):
Observable.create(
//自定义source
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("发送一条消息");
}
})
//ObservableCreate.map
.map(
new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
Log.e("---", "Map:拦截一条消息");
return null;
}
})
//ObservableMap.subscribeOn
.subscribeOn(Schedulers.io())
//AndroidSchedulers.mainThread() 是 RxAndroid 下的 因此必须导入RxAndroid
.observeOn(AndroidSchedulers.mainThread())
//ObservableMap.subscribe
.subscribe(
new Observer<Bitmap>() {
········
········
});
Schedulers.io()
在调用 subscribeOn 时,会传入 Schedulers.io(),我们先来看看这个 Schedulers.io() 是个什么东西:
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
=================================================
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
发现这里又是前面提到的那个监听器,注意这里使用了策略模式,根据参数进行不同的操作,到这里 new 了一个 IOTask() ,我们跟进去这里做了什么:
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
==============================================
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
@Override
public void start() {
CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
if (!pool.compareAndSet(NONE, update)) {
update.shutdown();
}
}
这里返回了一个 IoScheduler实例,到这里发现是一个线程池的操作,现在回到前面的 subscribeOn 方法,我们跟进去 subscribeOn 做了什么:
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
=============================
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
================================
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
================================
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
==================================
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//这里的 createWorker 调用的就是前面 IoScheduler 的 createWorker 方法
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
一开始可以看到又是监听器,跟进监听器可以看到又是那个熟悉的重写方法 subscribeActual,看到这里联想到刚才说的流程不难发现这是在订阅时回调的,可以看到 new 了一个 SubscribeTask 实现了 Runnable 接口,在 run 方法里面传给了上游。
接着前面是调用了scheduler.scheduleDirect(),这里的 scheduler 就是我们前面传进去的Schedulers.io(),也就是刚才找到最后的 IoScheduler,跟踪到最后在 scheduleDirect 方法里调用了一个 createWorker 方法,这是个抽象方法,因此调用到的是前面 IoScheduler 的 createWorker 方法。通过 IoScheduler 生成了 Worker 对象,接着调用 IoScheduler 生成的 Worker 对象 的 schedule 方法:
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//我们要跟紧这个 action 就是前面那个实现了 runnable 接口的方法
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
================================
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
//以下都是对前面这个的 runnable 方法的包装 / 判空
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//如果没有延迟马上执行调用 submit 方法
if (delayTime <= 0) {
//注意这里的 executor 就是 IoScheduler 里定义的线程池
//在这里执行 前面实现了runnable 接口方法,因为这里是通过submit 方法,因此可以接收到线程执行后的返回接口
f = executor.submit((Callable<Object>)sr);
} else {
//如果有延迟调用到 schedule 方法传入 延迟时间及单位
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
通过上面源码可以发现最后是通过调用 IoScheduler 里面的线程池里的 submit / schedule 方法去执行前面实现了 runnable 接口的方法,从而实现切换线程。
observeOn
上面说了subscribeOn 的源码,知道了是通过线程池的方式给上游分配异步线程,下面仍然通过相同的方式来看源码,首先仍然是看到 AndroidSchedulers.mainThread() 里做了什么:
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
=======================================
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
==========================================
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
这里一开始可以看到仍然是一个监听器,跟进去后发现是返回的是一个 MainHolder.DEFAULT,再跟进去后发现是 new 了一个 HandlerScheduler,注意到这里传进去的是一个 Handler(Looper.getMainLooper()),这时候心里猜想是通过Handler 来切换线程的,这里通过 Looper.getMainLooper() 的方式新建 Handler 确保了新建的Handler 是100% 运行在主线程的。接下来跟进去这个 HandlerScheduler 看看做了什么:
//这里保存新建的 Handler
HandlerScheduler(Handler handler) {
this.handler = handler;
}
======================
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
//省略部分代码
·······
可以看到,首先是保存了传进去的主线程 Handler,然后看到了前面 IoScheduler 里熟悉的 scheduleDirect 方法,我们先跟踪 AndroidSchedulers.mainThread() 到这里,接下来跟进 observeOn 的源码:
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
====================
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
//空判断
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
==============================
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
可以看到先是返回了一个 observeOn 方法,跟进去后先是对我们传进去的 Scheduler 进行判空,这里的 Scheduler 就是刚才我们跟踪找到的 HandlerScheduler。最后是new 了一个 ObservableObserveOn 方法,进去后又看到了我们熟悉的 subscribeActual 方法。前面说过个 Scheduler 是 HandlerScheduler,因此走的是 else 方法,走 HandlerScheduler 的 createWorker 方法创建一个 Worker 对象。
前面说过在发生订阅时,会触发这个方法,在往下游 Observer(观察者)传递的时候是一个拆包的过程,因此会先经过这个 observeOn 的 onNext 方法:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
========================
void schedule() {
if (getAndIncrement() == 0) {
//这里的 worker 就是我们前面找到的 HandlerScheduler,注意这里的this,
//observerOn 实现了 runnable接口,因此这里传的是 this
worker.schedule(this);
}
}
================================
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
//空判断
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
//包装传进来的 ObserverOn 对象,这个 ScheduledRunnable 实现了 Runnable 接口
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//通过 handler 发送数据,这个 handler 就是前面传Looper.getMainLoop新建的Handler
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
====================================
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
//这个 delegate 就是前面传进来的 ObserverOn
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
}
这里先把 ObserverOn 进行了一层封装 ScheduledRunnable,通过源码可以看到,当 Handler 一发送消息时,就会执行 ScheduledRunnable 的 run 方法,也就是执行到 ObserverOn 中的 run 方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
==========================
void drainFused() {
······
//这个 actual 就是下游的 Observer(观察者)
actual.onNext(null);
··········
}
这里省略一些代码,直接看重要的主干,走到 run 方法后,可以看到调用了下游 observser(观察者)的 onNext 方法,到这里整个流程串起来,前面的猜想通过Handler 来切换线程成功验证。
总结
- RxJava 通过流程看起来是一个U型结构,一切的导火线是从 subscribe (订阅)这个操作开始,他是触发所有操作的核心,在往上游被观察者时传递 Observer(观察者)的时候是一个一层层打包的过程,类似于给包上一层一层的洋葱。
- 在最顶层发生订阅后再一层层地拆包发送给下游的 Observer (观察者)。
- 线程切换问题也是一个类似的打包过程,不同的是 subscribeOn 使用的是线程池进行线程切换,而 observerOn 使用的是 Handler 进行线程切换。
- 对于标准的观察者设计模式来说,是一个 “被观察者”,多个 “观察者”,并且需要 “被观察者” 发出改变通知后,所有的 “观察者” 才能观察到。
- 在 RxJava 观察者设计模式中,是多个“被观察者”,一个“观察者”,并且需要 起点 和 终点 在 “订阅” 一次后,才发出改变通知,“终点(观察者)” 才能观察到。
- 在标准的观察者设计模式中:当发出通知改变时,会遍历Observable里面的容器,此容器里面有10个Observer,就会通知10个Observer。
- 在 RxJava观察者设计模式中:分发事件时,会拿到发射器,通过发射器关联 到 我们自定义的 Observer,发射器调用 到 我们自定义的Observer。
能力有限,希望对你有所帮助~~
网友评论