美文网首页
RxJava2 线程切换分析

RxJava2 线程切换分析

作者: andev009 | 来源:发表于2018-01-04 17:27 被阅读138次

看了很多学习Rxjava2的Demo,觉得最好的是这个。
https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
这里分析下RxJava2 线程是怎么切换的。分析RxJava2最麻烦的地方在于它的封装类太多,我们只抓主线就行了。首先看RxJava2 线程切换的例子:

getObservable()
.subscribeOn(Schedulers.io())//切换到io线程
.observeOn(AndroidSchedulers.mainThread(), true)//切换到主线程
.subscribe(getObserver());

一、subscribeOn分析
我们从subscribeOn开始分析,传入的参数是Schedulers.io(),源码是:

@NonNull
public static Scheduler io() {
   return RxJavaPlugins.onIoScheduler(IO);
}

RxJava2源码中有大量和RxJavaPlugins相关的代码,我们先不管RxJavaPlugins是干什么的,可以简单认为RxJavaPlugins只是wrap了一层,比如简单认为RxJavaPlugins.onIoScheduler(IO)返回就是IO,这样对主流程没影响。
IO是什么,追踪源码看到:

IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        });

static final class IoHolder {
     static final Scheduler DEFAULT = new IoScheduler();
}

public final class IoScheduler extends Scheduler{
        ........
}

原来IO就是 IoScheduler,先看到这里,再回到subscribeOn方法本身。

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>
                 (this, scheduler));//返回一个ObservableSubscribeOn对象
}

进入ObservableSubscribeOn类,

public final class ObservableSubscribeOn<T> extends 
       AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, 
       Scheduler scheduler) {
        super(source);//source就是最初的Observable
        this.scheduler = scheduler;//scheduler就是最初传进来的IoScheduler
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new 
        SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }
....................
}

可见ObservableSubscribeOn只是把最初的Observable和IO封装了下。
这里说下本文开头例子的执行流程,当getObservable()最后执行subscribe(getObserver());时,会走到上面的subscribeActual方法,为什么会走到这里可以查源码相关部分,这里不分析。来看下subscribeActual做了什么。方法里重点在这句

parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
       @Override
       public void run() {
             source.subscribe(parent);//重点在这里,封装成了Runnable,发送数据
       }
}));

上面的IoScheduler执行了scheduleDirect方法,并且把source.subscribe(parent);这个发送数据动作封装成了个Runnable。看到这里,猜到IoScheduler里有个线程来执行这个Runnable,这样就把线程给切换了。
来看看IoScheduler的scheduleDirect方法,IoScheduler类没有这个方法,scheduleDirect方法存在于基类Scheduler里,经过一个重载方法,最终走到这里:

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }

看到这里,猜测Worker w里有线程来跑这个wrap过的decoratedRun。
createWorker()是IoScheduler类自身的方法:

public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

EventLoopWorker类的主要方法如下:

 //构造函数
EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
}
//上面scheduleDirect方法里Worker要调用这个方法
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, 
                   tasks);
}

可见把跑Runnable的任务交给了threadWorker,看下threadWorker到底是什么:

static final class ThreadWorker extends NewThreadWorker{
           .............
}
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
         private final ScheduledExecutorService executor;
         public NewThreadWorker(ThreadFactory threadFactory) {
                executor = SchedulerPoolFactory.create(threadFactory);
         }

        //删除了一些判断代码,主干代码如下
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
              Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
              ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, 
        parent);

              if (delayTime <= 0) {
                     f = executor.submit((Callable<Object>)sr);
               } else {
                     f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
               return sr;
           }
}

可见Runnable经过wrap后,最终交给了ScheduledExecutorService去执行。
到这里可以总结下,所谓线程切换只是将要跑的任务封装成Runnable,然后把Runnable交给ExecutorService去执行,源码看起来复杂的地方就在于一层层的往下传这个Runnable给真正执行的对象。

题外的分析
上面提到Runnable给ThreadWorker去跑,ThreadWorker从哪里来?看上面EventLoopWorker的构造函数源码,从里面看到threadWorker从CachedWorkerPool里得到:

this.threadWorker = pool.get();

CachedWorkerPool的主要代码如下:

    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> 
                                   expiringWorkerQueue;
        final CompositeDisposable allWorkers;
 
        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();//出队
                if (threadWorker != null) {
                    return threadWorker;
                }
            }
            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);//新构建
            allWorkers.add(w);
            return w;
        }
}

可以看到ThreadWorker先从expiringWorkerQueue中取,如果队列是空的,就创建个新的ThreadWorker。
那啥时候把ThreadWorker放入expiringWorkerQueue队列呢?看代码:

 void release(ThreadWorker threadWorker) {
     // Refresh expire time before putting worker back in pool
    threadWorker.setExpirationTime(now() + keepAliveTime);
     expiringWorkerQueue.offer(threadWorker);//加入队列
}

谁调用release呢?就是上面的EventLoopWorker的dispose方法:

    @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

               // releasing the pool should be the last action
                // should prevent pool reuse in case there is a blocking
                // action not responding to cancellation
//                threadWorker.scheduleDirect(() -> {
//                    pool.release(threadWorker);
//                }, 0, TimeUnit.MILLISECONDS);

                pool.release(threadWorker);
            }
        }

二、observeOn分析
observeOn的流程和subscribeOn差不多,也是封装成了Runnable交给Scheduler,Scheduler再找对象去执行,例子里是Scheduler找Handler来实现的,下面来分析。来看AndroidSchedulers.mainThread(),MAIN_THREAD其实被封装过的DEFAULT这个Scheduler:

public static Scheduler mainThread() {
     return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));

可以看到HandlerScheduler传入的参数是个MainLooper的Handler,这里就可以猜到RxJava2是用这个Handler将线程切换到主线程上。来看看HandlerScheduler源码:

final class HandlerScheduler extends Scheduler {
   public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");

        run = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
        return scheduled;
    }

    @Override
   public Worker createWorker() {
        return new HandlerWorker(handler);
   }
   private static final class HandlerWorker extends Worker {
       //删除一些判断代码,只看主干代码
       @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) 
         {
            ScheduledRunnable scheduled = new 
              ScheduledRunnable(handler, run);
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
           handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            return scheduled;
        }
   }
     ................
}

可以看到HandlerWorker结构非常类似在分析subscribeOn时最后碰到的NewThreadWorker,任务Runnable经过封装后最后都是交给这些Worker的schedule方法来执行。这里是通过Handler机制来运行Runnable的,上面代码很多,其实只用看这句:

Message message = Message.obtain(handler, scheduled);//获得Message,scheduled作为这个Message的callback变量

Message被handler发送出去,scheduled这个callback Runnable就在MainLooper所在线程里执行了。
题外的分析-Handler机制
下面简要分析message被handler发送后,scheduled这个callback Runnable怎么执行的。
Handler里有个Looper对象,Looper里有个MessageQueue(消息队列),Looper里有个loop()方法不断从MessageQueue取Message,源码精简后的流程如下:

public final class Looper {
      //通过ThreadLocal来存取Looper,每个线程只有一个Looper
      public static @Nullable Looper myLooper() {
             return sThreadLocal.get();
       }
      public static void loop() {
            final Looper me = myLooper();//获得Looper
            final MessageQueue queue = me.mQueue;//获得Looper里的
            MessageQueue
            for (;;) {
                   Message msg = queue.next();//获取消息队列的每个消息
                   msg.target.dispatchMessage(msg); //重点来了,target就是handler
            }
       }
       ....................
}
public class Handler {
      public void dispatchMessage(Message msg) {
               if (msg.callback != null) {//判断msg有callback先执行
                      handleCallback(msg);
                } else {
                      if (mCallback != null) {
                              if (mCallback.handleMessage(msg)) {
                               return;
                       }
            }
            handleMessage(msg);
      }
      private static void handleCallback(Message message) {
              message.callback.run();//Runnable执行了
       }
        ......................
    }
}

再次提醒,callback就是前面要执行的ScheduledRunnable,这样ScheduledRunnable就在Handler的Looper所在线程中执行了。

相关文章

网友评论

      本文标题:RxJava2 线程切换分析

      本文链接:https://www.haomeiwen.com/subject/izzrnxtx.html