看了很多学习Rxjava2的Demo,觉得最好的是这个。
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
这里分析下RxJava2 线程是怎么切换的。分析RxJava2最麻烦的地方在于它的封装类太多,我们只抓主线就行了。首先看RxJava2 线程切换的例子:
getObservable()
.subscribeOn(Schedulers.io())//切换到io线程
.observeOn(AndroidSchedulers.mainThread(), true)//切换到主线程
.subscribe(getObserver());
一、subscribeOn分析
我们从subscribeOn开始分析,传入的参数是Schedulers.io(),源码是:
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
RxJava2源码中有大量和RxJavaPlugins相关的代码,我们先不管RxJavaPlugins是干什么的,可以简单认为RxJavaPlugins只是wrap了一层,比如简单认为RxJavaPlugins.onIoScheduler(IO)返回就是IO,这样对主流程没影响。
IO是什么,追踪源码看到:
IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
public final class IoScheduler extends Scheduler{
........
}
原来IO就是 IoScheduler,先看到这里,再回到subscribeOn方法本身。
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>
(this, scheduler));//返回一个ObservableSubscribeOn对象
}
进入ObservableSubscribeOn类,
public final class ObservableSubscribeOn<T> extends
AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source,
Scheduler scheduler) {
super(source);//source就是最初的Observable
this.scheduler = scheduler;//scheduler就是最初传进来的IoScheduler
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new
SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
....................
}
可见ObservableSubscribeOn只是把最初的Observable和IO封装了下。
这里说下本文开头例子的执行流程,当getObservable()最后执行subscribe(getObserver());时,会走到上面的subscribeActual方法,为什么会走到这里可以查源码相关部分,这里不分析。来看下subscribeActual做了什么。方法里重点在这句
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);//重点在这里,封装成了Runnable,发送数据
}
}));
上面的IoScheduler执行了scheduleDirect方法,并且把source.subscribe(parent);这个发送数据动作封装成了个Runnable。看到这里,猜到IoScheduler里有个线程来执行这个Runnable,这样就把线程给切换了。
来看看IoScheduler的scheduleDirect方法,IoScheduler类没有这个方法,scheduleDirect方法存在于基类Scheduler里,经过一个重载方法,最终走到这里:
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
看到这里,猜测Worker w里有线程来跑这个wrap过的decoratedRun。
createWorker()是IoScheduler类自身的方法:
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
EventLoopWorker类的主要方法如下:
//构造函数
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
//上面scheduleDirect方法里Worker要调用这个方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit,
tasks);
}
可见把跑Runnable的任务交给了threadWorker,看下threadWorker到底是什么:
static final class ThreadWorker extends NewThreadWorker{
.............
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//删除了一些判断代码,主干代码如下
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun,
parent);
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
return sr;
}
}
可见Runnable经过wrap后,最终交给了ScheduledExecutorService去执行。
到这里可以总结下,所谓线程切换只是将要跑的任务封装成Runnable,然后把Runnable交给ExecutorService去执行,源码看起来复杂的地方就在于一层层的往下传这个Runnable给真正执行的对象。
题外的分析
上面提到Runnable给ThreadWorker去跑,ThreadWorker从哪里来?看上面EventLoopWorker的构造函数源码,从里面看到threadWorker从CachedWorkerPool里得到:
this.threadWorker = pool.get();
CachedWorkerPool的主要代码如下:
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker>
expiringWorkerQueue;
final CompositeDisposable allWorkers;
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();//出队
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);//新构建
allWorkers.add(w);
return w;
}
}
可以看到ThreadWorker先从expiringWorkerQueue中取,如果队列是空的,就创建个新的ThreadWorker。
那啥时候把ThreadWorker放入expiringWorkerQueue队列呢?看代码:
void release(ThreadWorker threadWorker) {
// Refresh expire time before putting worker back in pool
threadWorker.setExpirationTime(now() + keepAliveTime);
expiringWorkerQueue.offer(threadWorker);//加入队列
}
谁调用release呢?就是上面的EventLoopWorker的dispose方法:
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
// should prevent pool reuse in case there is a blocking
// action not responding to cancellation
// threadWorker.scheduleDirect(() -> {
// pool.release(threadWorker);
// }, 0, TimeUnit.MILLISECONDS);
pool.release(threadWorker);
}
}
二、observeOn分析
observeOn的流程和subscribeOn差不多,也是封装成了Runnable交给Scheduler,Scheduler再找对象去执行,例子里是Scheduler找Handler来实现的,下面来分析。来看AndroidSchedulers.mainThread(),MAIN_THREAD其实被封装过的DEFAULT这个Scheduler:
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
可以看到HandlerScheduler传入的参数是个MainLooper的Handler,这里就可以猜到RxJava2是用这个Handler将线程切换到主线程上。来看看HandlerScheduler源码:
final class HandlerScheduler extends Scheduler {
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
private static final class HandlerWorker extends Worker {
//删除一些判断代码,只看主干代码
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit)
{
ScheduledRunnable scheduled = new
ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
return scheduled;
}
}
................
}
可以看到HandlerWorker结构非常类似在分析subscribeOn时最后碰到的NewThreadWorker,任务Runnable经过封装后最后都是交给这些Worker的schedule方法来执行。这里是通过Handler机制来运行Runnable的,上面代码很多,其实只用看这句:
Message message = Message.obtain(handler, scheduled);//获得Message,scheduled作为这个Message的callback变量
Message被handler发送出去,scheduled这个callback Runnable就在MainLooper所在线程里执行了。
题外的分析-Handler机制
下面简要分析message被handler发送后,scheduled这个callback Runnable怎么执行的。
Handler里有个Looper对象,Looper里有个MessageQueue(消息队列),Looper里有个loop()方法不断从MessageQueue取Message,源码精简后的流程如下:
public final class Looper {
//通过ThreadLocal来存取Looper,每个线程只有一个Looper
public static @Nullable Looper myLooper() {
return sThreadLocal.get();
}
public static void loop() {
final Looper me = myLooper();//获得Looper
final MessageQueue queue = me.mQueue;//获得Looper里的
MessageQueue
for (;;) {
Message msg = queue.next();//获取消息队列的每个消息
msg.target.dispatchMessage(msg); //重点来了,target就是handler
}
}
....................
}
public class Handler {
public void dispatchMessage(Message msg) {
if (msg.callback != null) {//判断msg有callback先执行
handleCallback(msg);
} else {
if (mCallback != null) {
if (mCallback.handleMessage(msg)) {
return;
}
}
handleMessage(msg);
}
private static void handleCallback(Message message) {
message.callback.run();//Runnable执行了
}
......................
}
}
再次提醒,callback就是前面要执行的ScheduledRunnable,这样ScheduledRunnable就在Handler的Looper所在线程中执行了。
网友评论