对于RxJava来说,简洁的线程切换操作是它优秀的地方之一。所以了解它的线程调度原理是完全有必要,这个既能帮助我们理解其中的奥妙,同时如果自己在开发当中需要做类似的需求,可以作为一个参考。同时,RxJava的线程调度原理也是RxJava当中比较难以理解的一部分。
1.概述
说线程调度原理比较难以理解,其实我觉得就是涉及到的类比较多,然后他们之间的关系又难以梳理清楚,所以显得这部分的代码高深莫测。在这里,我先对线程调度涉及到的比较重要的类做一个小小的说明。
类名 | 含义 |
---|---|
Schedulers | 此类里面提供了我们需要使用的调度器对象 |
Scheduler | 线程调度器,Observer或者Observable通过此类进行线程调度,每一个线程对应一个调度器 |
Worker | 真正执行任务的类,每个调度器都需要获取一个Worker对象来执行任务 |
Runnable | 在整个调度的过程中,一个任务会包装成一个Runnable来执行,包括线程池和Handler |
整个线程调度的核心在Scheduler
和Worker
类中。接下来,我们重点分析这两个类,看这两个类怎么担任线程调度的重任的。
2. Scheduler的原理分析
Scheduler
是一个抽象类,我们来看看它的源码:
public abstract class Scheduler {
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public abstract Worker createWorker();
}
createWorker
方法是一个非常重要的方法,后续的Scheduler
都必须在这个方法里面返回一个Worker
。所以每一个Scheduler
都对应着一个Worker
。
其次scheduleDirect
方法也是非常重要,当一个观察者被subscribe
时,会调用到此方法来。当然这时候可能还不清楚是怎么调用到这里来的,我们也不用太害怕,这里我们只需要知道会调用这个方法来的就行,后续我会详细的追踪源码来解释。
如果此时的Scheduler
是一个子线程的Scheduler
,例如接下来要讲的NewThreadScheduler
,那么此时提交到Worker
去执行,肯定在NewThreadScheduler
所在线程执行。同理,如果是Android主线程的Scheduler
,肯定就在主线程里面执行了。
我们发现在这个方法里面,先是调用了createWorker
创建一个Worker
对象,然后后续经过一系列的包装,将任务包装成为一个Runnable,然后调用Worker
的schedule
方法来执行任务。
接下来,我们看几个Scheduler
的子类。
(1).NewThreadScheduler
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
我简略了一下NewThreadScheduler
的代码,让它看起来更加的清晰。整个过程,我们发现在createWorker
方法里面返回了一个 NewThreadWorker
对象,其他感觉也没什么东西,是不是感觉咱们分析错了。当然不是,核心在Worker
里面,当然此时不是分析Worker
的时机。我们只需要记住整个NewThreadScheduler
工作流程。
(2).HandlerScheduler
我们再来看看HandlerScheduler
的源码:
final class HandlerScheduler extends Scheduler {
private final Handler handler;
private final boolean async;
HandlerScheduler(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, unit.toMillis(delay));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
}
在Android当中,如果想要在主线程中工作,Handler
是必不可少的。所以,我们在HandlerScheduler
中看到Handler
也是理所当然的。
不过,这里我们发现,HandlerScheduler
的scheduleDirect
方法并没有去调用createWorker
方法来获取一个Worker
对象,而是直接将交给了handler来执行。这是为什么?
在这里,我们得区分scheduleDirect
方法的调用时机,scheduleDirect
不是每个Scheduler
里面都会被调用,而是这有上游的Scheduler
才会被调用,也就是subscribeOn
里面的Scheduler
,在observeOn
里面的Scheduler
走的是另一个流程。
所以,当HandlerScheduler
的scheduleDirect
方法被调用时,此时肯定是上游在工作,这样直接提交给Handler来执行了就是了,不必要去绕圈子。
3. Worker的工作原理
现在我们来看看Worker的相关源码:
public abstract static class Worker implements Disposable {
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
}
我将很多繁琐的代码省去了,Worker
的代码看上就是这样的,其中,我们发现就有一个 schedule
方法。这个方法之前在Scheduler
的scheduleDirect
方法里面我们看到过,在scheduleDirect
方法里面传递一个Runnable
来执行。
我们来看一个Worker的子类,看看是怎么执行的。
(1).NewThreadWorker
先来看看NewThreadWorker
的源码:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
}
好像代码有点长😅,但是不急,我们一一的来分析。
首先在构造方法里面,创建了一个线程池的对象,关于线程池相关的知识,这个属于Java基础知识,这里就不进行讲解,我默认大家都懂哈😂。
然后就是一系列的schedule
调用,最终会调用到scheduleActual
方法里面去。scheduleActual
方法里面看是做了很多的操作,其实根本目的就是调用ScheduledExecutorService
的submit
方法提交一个任务而已。
对的,NewThreadWorker
就是这么简单,是不是之前将它想的太难了?哈哈哈😂
整个Scheduler
和Worker
的调用差不多就是这样的,但是是不是觉得,现在还是一脸懵逼,看到这里,我们还是不知道RxJava究竟是怎么进行线程调度的。
不急,接下来,我们将一个例子入手,由浅入深的带领大家熟悉线程调度的奥妙。我们熟悉了Scheduler
和Worker
,对其他的才会很快了解。
4. subscribeOn的工作流程
先来举一个栗子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
}
}).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
是不是觉得这个栗子非常的熟悉,像极了前世的恋人🤓。熟悉就好,就怕大家淡忘了自己心中那个人。
这个使用方法,其他的大佬不知道讲解了多少,在这里我就不重复了,直接开门见山的分析subscribeOn
方法。
subscribeOn
方法主要是跟ObservableSubscribeOn
类有关。我们来看看ObservableSubscribeOn
类:
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
}
首先,ObservableSubscribeOn
的构造方法里面多了一个Scheduler
参数,这个参数到底是谁的对象得取决于在subscribeOn
时。上面的栗子,我们传递的是schedulers.newThread()
,所以这里的scheduler
就是NewThreadScheduler
的对象。
在subscribeActual
方法里面,我们发现:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
先来看看SubscribeTask
是什么东西:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask
从根本来说就是一个Runnable,当这个Runnable被执行的时候,也就是run
方法被调用时,我们发现此时调用了subscribe
方法,此时任务就正式启动。假如,我们在向后台获取数据,此时就已经开始获取了。
然后,我们发现调用了Scheduler
的scheduleDirect
,还记得这个方法在做什么吗?这个方法我们在前面讲解Scheduler
时已经讲解,在这个方法里面显示调用onCreateWorker
方法来获取一个Worker
对象,然后将我们的任务提交给Worker
来执行。
这就是整个subscribeOn
的流程,是不是很简单?
5. observeOn的工作流程
observeOn
方法主要是表示下游的方法执行所在的线程,跟subscribeOn
方法差不多,observeOn
主要跟ObservableObserveOn
,我们来看看ObservableObserveOn
的相关代码:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
还是先来看看构造方法,同样的,scheduler
也是一个调度器,这里由于Android的主线程,所以是HandlerScheduler
;delayError
表示是否延迟抛出异常;bufferSize
表示缓冲区的大小,如果上游的发射速度比较快的话,这里有可能出现背压问题。
然后看看subscribeActual
方法里面,我们发现,在这里,调用了onCreateWorker
方法来获取一个Worker
对象,然后subscribe
了一个ObserveOnObserver
对象,传进去了一些相关参数。看来真正的核心在ObserveOnObserver
这里面。
不过在这里,我们发现ObservableObserveOn
跟ObservableSubscribeOn
的subscribeActual
方法还是有很大的区别,在ObservableObserveOn
的subscribeActual
方法没有调用Scheduler
的相关方法。之前,我们说了,scheduleDirect
方法只在上游执行,从这里就可以看出来。
我们来看看ObserveOnObserver
相关代码,ObserveOnObserver
比较重要的是onNext
方法,看看在方法里面是怎么将数据分发到主线程去执行的。
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
onNext
方法还是比较简单,先将数据放在队列里面去,然后在调用schedule
方法。我们再来看看schedule
方法:
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
这里getAndIncrement() == 0
的意思比较好理解,表示当前队列中只有一个数据,就调用Worker
的schedule
方法。到这里,我们根本没有看到调用Observer的onNext方法,是不是分析错了?肯定不是的,我们来看看Worker
的schedule
方法。这里的Worker
是HandlerWorker
,所以我们看一下HandlerWorker
的schedule
方法:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
看到写了这么多,其实就一句话,将传递进来的Runnable
交给Handler
来执行。那个这个Runnable
是谁?就是ObservableObserveOn
自己,Handler
会执行Runnable
的run
方法。
哈哈,现在得回来ObservableObserveOn
的run
方法:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里的两个drain方法着实将我吓着了。其实,认真想一想,跟BufferAsyncEmitter
里面的drain
方法差不多,这里就不深入的讲解,反正这里就是将数据发送到下游的真正操作,而且是在observeOn
方法里面传递的那个线程执行。
6.总结
RxJava的线程调度这部分的知识着实是不好理解,所以尽管我在这里说的天花乱坠,还是得需要我们亲自来看看这部分的代码。在这里,我对这部分的知识做一个简单的总结。
1.每种类型线程对一个Scheduler
,而每个Scheduler
对应一个Worker
,真正操作都是Worker
来完成的。
2.子线程执行的根本就是将我们的subscribe
方法调用放在一个Runnable
里面去执行,类似于代理机制,又像装饰者模式。
网友评论