前言
前两篇文章中,分别对RxJava2的基本路程与链式调用分别做了阐述,如有遗忘,此为传送门
其实,从Rx调用链上,线程调度仅是遵守运转机制的一环,但因其便捷、高频的特点,并在项目中很可能需要切换到自己的线程里,故将其选出,理解如何实现。
正文
案例
ob.observeOn(Schedulers.newThread())
由于流程、链式均做了阐述,因此案例直接定位到线程调度线程。上面的案例中,下游被切换到了新的线程里进行响应。
调度器与工作线程
Rx中,由调度器负责提供工作线程,而工作线程则负责具体的运转。以下为两者的简要信息。
public abstract class Scheduler {
......
public abstract Worker createWorker();
}
public abstract static class Worker implements Disposable {
......
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
- Scheduler.createWorker(): 获取用来执行任务的线程
- Worker.schedule() : 具体的调度逻辑
当前案例中,ob.observeOn()拿到的Observable为ObservableObserveOn。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
// 一般会执行到这里
// 通过Scheduler拿到执行线程任务的Worker
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
在Rx构建调用链构建时,将会来到subscribeActual(),且对于当前案例来说,事件推送到对应的下游节点才会进行线程调度,因此,调度信息由ObserveOnObserver保存,具体信息如下:
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
}
当事件推送到当前节点,即ObserveOnObserver将进行处理,取onNext()来看
@Override
public void onNext(T t) {
......
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
ObserveOnObserver将作为Runnable的交给工作线程Worker处理,当ObserveOnObserver作为线程任务获得运转时机后,drainFused()或drainNormal()根据具体情况向调用链上的下一节点推送相应事件,代码不贴。而线程调度也就完成了。
为何这么短
当前节点下,关注的事情是Rx如何进行线程调度,而核心为,将当前节点作为Runnable运行在相应的线程里,步骤为:
- 由Scheduler提供工作线程Worker
- 在当前节点的响应时机,将此节点作为Runnable交给Worker处理,在run()时机将事件推送知下一节点
至于Worker是如何处理Runnable,何时获得执行时机,这就是Rx机制之外的问题了,因为已能保证接下来的响应,运行在Worker线程里,即完成了线程调度,如下图:
线程调度.jpg
下一篇:响应式拉取(未发布,待续)
网友评论