在上一篇文章RxJava2笔记(三、订阅线程切换)中,我们分析了订阅线程是如何切换的,即调用subscribeOn()来切换订阅线程时都执行了哪些操作。在本文我们将继续介绍观察者线程切换,也就是将线程由子线程切换回UI线程。
继续在前面的基础上修改代码,在订阅线程切换方法后调用observeOn(AndroidSchedulers.mainThread())将线程切换回主线程:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());
emitter.onNext(1);
emitter.onNext(2);
try {
TimeUnit.MILLISECONDS.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
emitter.onNext(3);
emitter.onComplete();
}
})
//将线程由UI线程切换到子线程执行IO请求
.subscribeOn(Schedulers.io())
//将线程切换回UI线程,方面后续操作更新UI界面
.observeOn(AndroidSchedulers.mainThread())
.subscribe(observer);
看下运行结果:
I/MainActivity: onSubscribe--运行线程:main
I/MainActivity: subscribe--运行线程:RxCachedThreadScheduler-1
I/MainActivity: onNext: 1 --运行线程:main
I/MainActivity: onNext: 2 --运行线程:main
I/MainActivity: onNext: 3 --运行线程:main
I/MainActivity: onComplete--运行线程:main
可以看到,subscribe方法运行在子线程中(也就是订阅线程运行在名为RxCachedThreadScheduler-1的一个子线程中,上文提到该线程是由RxJava实现的一个工厂类创建的),而observer运行在名为main的线程中,这个main线程就是UI线程。
看完了输出结果,接下来就看看这个observeOn(AndroidSchedulers.mainThread())
是如何将线程切换到UI线程的,点进去看下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//当出现异常时,默认无延迟发送错误。bufferSize()是缓冲区大小,RxJava设置了一个默认大小,为128。
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//包装类,保存上游observable
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
熟悉的套路,跟调用subscribe.on方法时类似,只是多了一个验证缓冲区大小不为空的代码,这些我们都略过,直接看ObservableObserveOn这个类:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//如果传入的调度器是TrampolineScheduler,则不切换线程,在当前线程调度
//但是调度的任务并不是马上执行,而是等待当前任务执行完毕再执行
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建工作者worker
Scheduler.Worker w = scheduler.createWorker();
//上游的subscribe,该方法会触发上游的subscribeActual,
//ObserveOnObserver也是一个包装类,保存下游的observer
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//......代码省略
}
熟悉的装饰器模式:
- 1、ObservableObserveOn继承了AbstractObservableWithUpstream,其继承的ObservableSource类型的source成员变量用于保存上游的observable。
- 2、AndroidSchedulers.mainThread()为本次传入的scheduler,负责将线程切换到UI线程。
- 3、下游调用subscribe方法是触发=>当前observable的subscribeActual方法=>触发上游observable的subscribe方法=>传入参数包装类ObserveOnObserver(包装了下游的观察者observer)。
这里简要介绍下步骤3:
这里的source是上游的observable对象,source.subscribe()
方法实际调用的是上游observable对象的subscribeActual方法,并将下游observer对象的包装类ObserveOnObserver作为参数传递进去,在上游observable对象的subscribeActual方法内,调用ObserveOnObserver包装类中的onSubscribe,onNext等方法,进而调用下游observer的onSubscribe,onNext等方法。
接下来看下ObserveOnObserver这个下游observer包装类:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
//下游observer
final Observer<? super T> actual;
//调度工作者
final Scheduler.Worker worker;
//当订阅任务执行出错时,是否延迟发送错误消息,默认为false,也就是不延迟
final boolean delayError;
//缓冲区大小,缓存上游发送的事件
final int bufferSize;
//存储上游observable发出的数据队列
SimpleQueue<T> queue;
//存储管理下游observer的订阅状态disposable
Disposable s;
//订阅任务执行出错时,存储错误信息
Throwable error;
//订阅任务是否终止
volatile boolean done;
//订阅任务是否被取消
volatile boolean cancelled;
//任务执行模式----同步还是异步
int sourceMode;
//是否输出融合(通常情况下该选项为false)
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onSubscribe(Disposable s) {
//当前的disposable为null,上游subscribe产生的disposable不为null,则验证通过
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//如果订阅时获取的disposable对象s是QueueDisposable类型的
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
//新建QueueDisposable队列并将订阅时获取的disposable对象s强转为QueueDisposable,然后赋值给queue
QueueDisposable<T> qd = (QueueDisposable<T>) s;
//获取任务运行模式
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
//判断运行模式,并且调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
//为true 使得接下来的onXXX等方法均不会执行
done = true;
actual.onSubscribe(this);
//worker直接调度任务
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
//在异步模式下,等待onXXX方法中的worker调度
return;
}
}
//否则创建一个支持单一生产者单一消费者的队列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
//调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
actual.onSubscribe(this);
}
}
//......代码省略
}
onSubscribe方法调用后,就开始执行onXXX方法了,首先是onNext方法,这个方法可以反复调用:
@Override
public void onNext(T t) {
//订阅模式是同步模式或者执行过onComplete/onError方法时,此时done为true,直接返回
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
//将上游数据源发射的数据添加到缓存队列中
queue.offer(t);
}
//开始worker调度任务
//(这里面调用了handler,将数据发送到主线程所在的消息队列,进而更新UI界面,这里稍后分析)
schedule();
}
其次是onError和onComplete,这两个方法只能执行一次并且是互斥的:
@Override
public void onError(Throwable t) {
//如果任务状态已经是终止状态,再执行该任务是就会抛出异常
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
//设置任务状态为终止状态
done = true;
//worker任务调度
schedule();
}
@Override
public void onComplete() {
//如果任务是终止状态,直接返回
if (done) {
return;
}
//设置任务状态为终止状态
done = true;
//worker任务调度
schedule();
}
这里onNext,onComplete,onError最后都调用schedule()来调度任务:
//调用worker.schedule(this)开始任务调度
void schedule() {
//这里通过getAndIncrement() == 0原子性的保证了worker.schedule(this)在调度完之前不会再次被调度
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
上面在执行worker.schedule(this)时传入了this,也就是当前对象ObserveOnObserver,ObserveOnObserver类实现了Runnable接口,因此worker.schedule(this)调度的任务就是自己run()实现方法中的任务:
@Override
public void run() {
//outputFused通常情况下为false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
drainFused()通常情况下不会执行,我们只需要关注drainNormal()方法即可,在查看该方法之前,先看下drainNormal()内部调用的一个验证方法checkTerminated(boolean d, boolean empty, Observer<? super T> a)
,该方法主要是检测任务是否已终止:
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
//如果订阅被取消,清空数据缓存队列
if (cancelled) {
queue.clear();
return true;
}
//这个d就是done
if (d) {
//done为true时,有两种情况,在onNext调度完毕后执行onComplete或onError
Throwable e = error;
if (delayError) {
//如果是延迟发送错误的情况,必须等到queue(缓存上游observable发出的数据)为空的情况下才能发送错误(有错误的情况下)
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
//终止worker任务调度
worker.dispose();
return true;
}
} else {
//不延迟发送错误时,直接调用
if (e != null) {
//如果任务执行出错,即调用了onNext方法,清空queue
queue.clear();
//调用下游observer的onError
a.onError(e);
//终止worker调度
worker.dispose();
return true;
} else
if (empty) {
//任务正常执行,未出现错误
//调用下游observer的onComplete,并终止worker调度
a.onComplete();
worker.dispose();
return true;
}
}
}
//否则返回false,任务还未终止
return false;
}
任务结束情况分为以下两种
- 1、订阅被取消,即cancelled==true,此时任务为终止状态。
- 2、任务运行标记done==true,即onNext调用完毕,调用onComplete正常结束;或者在onNext调用过程中出现错误,调用了onNext,此时是异常结束,会发送异常信息。
说完了这个方法,我们继续看drainNormal()这个方法:
void drainNormal() {
//这个变量只是一个控制变量,用来确保drainNormal()方法能被原子性调用
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
//外层死循环
for (;;) {
//根据数据缓存队列是否为空检查任务是否已终止
//为空就会直接跳出外层循环,方法执行结束,内层的循环也不会执行
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
//内层死循环,负责具体的任务处理
for (;;) {
boolean d = done;
T v;
try {
//从队列中获取数据
v = q.poll();
} catch (Throwable ex) {
//任务执行出现错误,抛出致命错误信息,结束循环,终止本次任务
Exceptions.throwIfFatal(ex);
//终止订阅
s.dispose();
//清空缓存队列
q.clear();
a.onError(ex);
//停止worker调度
worker.dispose();
return;
}
boolean empty = v == null;
//判断从队列中取出的数据是否为空,即判断队列是否为空
if (checkTerminated(d, empty, a)) {
return;
}
//队列中没有了数据,直接退出
if (empty) {
break;
}
//调用下游observer的onNext(onComplete和onError均在checkTerminated方法里调用)
a.onNext(v);
}
//这里主要是确保在同一时间只有一个worker.schedule(this)正在执行。
//missed变量在方法最开始初始化为1,这里missed会被重置为0,这样下面的missed==0成立,当前任务结束。
//addAndGet(-missed)方法也会将AtomicInteger内部的VALUE值设置为0。
//同时,run()方法中的判定方法getAndIncrement() == 0成立,继续执行下一个worker.schedule(this)。
//如果程序没有走到这里,那么missed==0也就不成立,相应的run()方法中的getAndIncrement() == 0不成立,也就不会执行下一个worker.schedule(this)。
//这样就原子性的确保了同一时间只有一个worker.schedule(this)正在执行,即同一时间只有一个drainNormal()方法在执行。
missed = addAndGet(-missed);
//程序走到这里,表示当前任务正常结束,退出循环。
//run()方法继续执行,getAndIncrement() == 0成立,开始执行下一个worker.schedule(this)。
if (missed == 0) {
break;
}
}
}
到这里,ObserveOnObserver这个下游observer包装类也就介绍的差不多了,简要总结下:
- 1、构造方法接收了外部创建的工作者worker,负责任务调度。
- 2、内部定义了一个缓存队列,缓存上游observable发射的数据。
- 3、ObserveOnObserver的父类继承了AtomicInteger类,这样在多线程环境中,可以通过AtomicInteger的CAS操作确保线程安全。
- 4、ObserveOnObserver类实现了Runnable接口,在run()方法内,使用worker进行任务调度,并通过getAndIncrement()==0来确保worker任务调度的原子性操作。
- 5、drainNormal()为实际调度执行的方法,其内部声明了一个局部变量missed,并初始化为1。然后开始死循环,在外层循环末尾处(内层循环正常结束,消息队列中的数据都被取了出来并处理完毕,此时消息队列为空,结束内层循环),调用missed = addAndGet(-missed)后,ObserveOnObserver内部的VALUE值被重置为0,最后判定missed==0成立,程序跳出死循环,drainNormal()方法执行完毕。最后run()方法继续执行,此时getAndIncrement()==0成立,开始下一个worker任务调度。
如果在循环途中因为某些原因(任务已执行完毕或出现异常)而中途退出循环没能执行到循环末尾,那么其内部的VALUE值就不为0,getAndIncrement()==0不成立,不会执行worker调度,因此整个订阅也就结束了。
在上一篇文章中,订阅线程最终是通过调度器来执行具体切换过程的。同样的,对于观察者线程切换执行的也是类似的过程。前面分析道,ObservableObserveOn构造方法接收我们传入的调度器scheduler,并通过scheduler创建工作者worker,将其传入到ObserveOnObserver的构造方法中,最后在run()方法中执行具体的任务调度。因此观察者的线程切换肯定是发生在worker的调度过程中。
先从observeOn(AndroidSchedulers.mainThread())
的参数AndroidSchedulers.mainThread()开始,点进去看下:
public final class AndroidSchedulers {
private static final class MainHolder {
//构造一个与UI线程关联的Handler,并将其作为参数构造HandlerScheduler调度器
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
//返回与主线程相关的scheduler调度器
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
//自己指定线程Looper自定义观察者线程切换
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
AndroidSchedulers.mainThread()最终返回HandlerScheduler对象,HandlerScheduler也是继承自Scheduler,其构造方法接收一个Handler类型的参数,这个Handler通过Looper.getMainLooper()与UI线程关联起来,这样通过Handler发送的消息就能被UI线程接收,从而更新UI界面。具体返回HandlerScheduler对象的步骤与上文所介绍的订阅线程切换生成IoScheduler一致,不再详述。
熟悉Handler的童鞋都知道,Handler在安卓的消息机制中占有重要的地位,它贯穿着整个安卓的体系,在很多地方我们都能见到它的身影。在刚开始接触安卓开发的时候我们都写过类似下面的代码:
private Handler handler = new Handler(){
@Override
public void handleMessage(Message msg) {
super.handleMessage(msg);
//更新UI线程代码
//获取子线程发送的message消息里面的请求数据,更新UI界面
//操作省略
}
};
public void getData(){
new Thread(new Runnable() {
@Override
public void run() {
//1、网络请求数据并返回
//2、获取Message对象,并将请求到的数据用Message包裹起来
//3、调用handler的sendMessage等方法发送Message
}
}).start();
}
随着现在各种各样网络请求框架的出现,大大简化了我们网络请求更新UI的操作,上面的代码我们很少再去写了,但并不意味着它就不重要了,尤其是Handler。其实很多的网络请求框架都只不过是将上面的操作封装起来了而已,RxJava虽然与网络请求无关,但在观察者线程切换里面同样也是将上面的过程封装起来,方便我们使用。
我们接着来看HandlerScheduler这个类:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
HandlerScheduler(Handler handler) {
this.handler = handler;
}
//......代码省略
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
//......代码省略
}
HandlerScheduler继承自Scheduler,并实现了createWorker()方法生成任务调度worker,这里返回的是一个HandlerWorker对象,前面提到的worker.schedule(this)中的worker实际上就是这个HandlerWorker。
private static final class HandlerWorker extends Worker {
//保存外部传进来的Handler,这里保存的是与UI线程关联的Handler,具体参见上面介绍
private final Handler handler;
//事件订阅状态标志位
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
//worker.schedule(this)最终调用的方法,这里delay为0,表示handler无延迟发送消息
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
//如果订阅已终止,返回带有终止状态的disposable
if (disposed) {
return Disposables.disposed();
}
//对任务做一些自己的处理(默认情况下没做任何处理)
run = RxJavaPlugins.onSchedule(run);
//包装类
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
//获取message,设置其target成员为handler,callback成员为scheduled
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
//将消息发送到UI关联的消息队列中,此时handler中的queue是UI线程中的queue
//(参考new Handler(Looper.getMainLooper()))
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
// Re-check disposed state for removing in case we were racing a call to dispose().
//再次检查订阅是否被终止,若已被终止,移除handler中的callback,并返回带有终止状态的disposable
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
//订阅终止
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
在HandlerWorker中,将任务run用ScheduledRunnable包装起来,在设置到Message的callback中,生成一个Message,熟悉handler的都知道,handler通过dispatchMessage(Message msg)方法来进行消息的分发处理,这个方法的处理顺序如下:
- 1、判断message中的callback(该callback是一个Runnable)是否为空,若不为空,调用handleCallback(msg),在handleCallback(msg)中,直接执行message.callback.run()。若为空,进入步骤2。
- 2、判断handler中的callback(该callback是Handler内部的一个内部接口,由我们自己实现)是否为空,若不为空,执行callback.handleMessage(msg)。若为空,进入步骤3。
- 3、若步骤1和步骤2中的判断均不成立,执行handleMessage(msg),也就是我们前面写的样例代码。
在这里由于Message中的callback不为空,因此执行执行callback中的run()方法,也就是前面提到的ObserveOnObserver(实现了Runnable接口)内部的run()方法,进而调用drainNormal()方法,最终调用下游observer的onNext。
生成Message后,调用handler的sendMessageDelayed方法发送消息(这里delay参数为0,因此是立即发送),由于这里的handler是与UI线程关联在一起的,因此ObserveOnObserver(实现了Runnable接口)内部的run()方法就被发送到了UI线程中的消息队列中,最终通过handler的dispatchMessage(Message msg)方法调用handleCallback(msg),最后调用message.callback.run()执行run()方法里面的代码,完成UI线程更新。
在文章的最后,我们来看下这个run任务的包装类ScheduledRunnable:
private static final class ScheduledRunnable implements Runnable, Disposable {
private final Handler handler;
private final Runnable delegate;
private volatile boolean disposed;
//保存外部传入的handler和runnable任务
ScheduledRunnable(Handler handler, Runnable delegate) {
this.handler = handler;
this.delegate = delegate;
}
@Override
public void run() {
try {
//执行run方法,即传入的ObserveOnObserver(实现了Runnable接口)内部的run()方法
delegate.run();
} catch (Throwable t) {
IllegalStateException ie =
new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
RxJavaPlugins.onError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}
//订阅终止,从handler中移除该runnable
//(实际上是从UI线程内部的队列中将包装这个runnable的message移除,如果这个message还未处理的话)
@Override
public void dispose() {
disposed = true;
handler.removeCallbacks(this);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
至此,整个观察者线程切换也就介绍完了,最后我们再来梳理下思路:
- 1、observeOn(AndroidSchedulers.mainThread())返回一个ObservableObserveOn对象,该对象内部(source)保存了上游传递过来的observable。
- 2、下游执行subscribe时,实际上执行的是ObservableObserveOn对象内部的subscribeActual方法,该方法内部首先判断调度器的类型(具体参看上面介绍),这里传入的调度器类型是HandlerScheduler。
- 3、将下游observer对象用ObserveOnObserver包装起来,生成ObserveOnObserver对象。
- 4、执行source.subscribe方法,并将步骤3中生成的ObserveOnObserver对象作为参数传递进去。
- 5、步骤4中的subscribe方法实际上执行的是上游observable的subscribeActual方法,在该方法中ObserveOnObserver内部的onSubscribe方法首先被执行,在这个onSubscribe方法内部调用下游observer的onSubscribe方法,建立订阅关系。
- 6、执行完onSubscribe方法后,ObserveOnObserver内部的onNext和onComplete或者onNext依次被执行(onNext可被反复调用)。
- 7、在ObserveOnObserver内部的onNext方法中,将上游发射的数据缓存到队列中,然后调用schedule方法,通过HandlerScheduler生成的worker对象(HandlerWorker)开始调度任务,通过步骤1中调用AndroidSchedulers.mainThread()时生成的关联UI线程的handler将Runnable任务发送到UI线程的消息队列中。
- 8、ObserveOnObserver实现了Runnable接口,在其实现方法run()中,最终调用了drainNormal()方法,该方法内有两个嵌套的循环,且都是死循环。外层循环结束的条件是当次订阅任务被终止(外部取消或者任务执行完毕或任务执行出错),内层循环结束的条件是当次订阅任务执行完毕或执行出现错误。在内层循环内部,不断的从缓存队列中取出数据,最后调用下游observer的onNext方法。在步骤7中,Runnable任务最后被发送到了UI线程内的消息队列中,因此这个run()方法也是运行在UI线程,最终在UI线程中我们可以用这些数据更新UI界面。
- 9、ObserveOnObserver内部的onComplete和onError执行过程与onNext类似,区别是onComplete和onNext只会调用一次并且是互斥调用(参见上面源码分析)。同样,最终也会调用drainNormal()方法,只不过在drainNormal()方法的外层死循环内,首先调用的是checkTerminated方法,用来判断当前订阅任务是否已被终止,下游observer的onComplete和onError就是在这个方法里调用的。
下一章RxJava2笔记(五、订阅流程梳理以及线程切换次数有效性)将对前面做一个流程梳理,以此来结束RxJava的学习。
网友评论