美文网首页
RxJava2 线程切换分析

RxJava2 线程切换分析

作者: andev009 | 来源:发表于2018-01-04 17:27 被阅读138次

    看了很多学习Rxjava2的Demo,觉得最好的是这个。
    https://github.com/amitshekhariitbhu/RxJava2-Android-Samples
    这里分析下RxJava2 线程是怎么切换的。分析RxJava2最麻烦的地方在于它的封装类太多,我们只抓主线就行了。首先看RxJava2 线程切换的例子:

    getObservable()
    .subscribeOn(Schedulers.io())//切换到io线程
    .observeOn(AndroidSchedulers.mainThread(), true)//切换到主线程
    .subscribe(getObserver());
    

    一、subscribeOn分析
    我们从subscribeOn开始分析,传入的参数是Schedulers.io(),源码是:

    @NonNull
    public static Scheduler io() {
       return RxJavaPlugins.onIoScheduler(IO);
    }
    

    RxJava2源码中有大量和RxJavaPlugins相关的代码,我们先不管RxJavaPlugins是干什么的,可以简单认为RxJavaPlugins只是wrap了一层,比如简单认为RxJavaPlugins.onIoScheduler(IO)返回就是IO,这样对主流程没影响。
    IO是什么,追踪源码看到:

    IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
                @Override
                public Scheduler call() throws Exception {
                    return IoHolder.DEFAULT;
                }
            });
    
    static final class IoHolder {
         static final Scheduler DEFAULT = new IoScheduler();
    }
    
    public final class IoScheduler extends Scheduler{
            ........
    }
    

    原来IO就是 IoScheduler,先看到这里,再回到subscribeOn方法本身。

    public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>
                     (this, scheduler));//返回一个ObservableSubscribeOn对象
    }
    

    进入ObservableSubscribeOn类,

    public final class ObservableSubscribeOn<T> extends 
           AbstractObservableWithUpstream<T, T> {
            final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, 
           Scheduler scheduler) {
            super(source);//source就是最初的Observable
            this.scheduler = scheduler;//scheduler就是最初传进来的IoScheduler
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new 
            SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    ....................
    }
    

    可见ObservableSubscribeOn只是把最初的Observable和IO封装了下。
    这里说下本文开头例子的执行流程,当getObservable()最后执行subscribe(getObserver());时,会走到上面的subscribeActual方法,为什么会走到这里可以查源码相关部分,这里不分析。来看下subscribeActual做了什么。方法里重点在这句

    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
           @Override
           public void run() {
                 source.subscribe(parent);//重点在这里,封装成了Runnable,发送数据
           }
    }));
    

    上面的IoScheduler执行了scheduleDirect方法,并且把source.subscribe(parent);这个发送数据动作封装成了个Runnable。看到这里,猜到IoScheduler里有个线程来执行这个Runnable,这样就把线程给切换了。
    来看看IoScheduler的scheduleDirect方法,IoScheduler类没有这个方法,scheduleDirect方法存在于基类Scheduler里,经过一个重载方法,最终走到这里:

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
            return w;
        }
    

    看到这里,猜测Worker w里有线程来跑这个wrap过的decoratedRun。
    createWorker()是IoScheduler类自身的方法:

    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    

    EventLoopWorker类的主要方法如下:

     //构造函数
    EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                this.threadWorker = pool.get();
    }
    //上面scheduleDirect方法里Worker要调用这个方法
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
                return threadWorker.scheduleActual(action, delayTime, unit, 
                       tasks);
    }
    

    可见把跑Runnable的任务交给了threadWorker,看下threadWorker到底是什么:

    static final class ThreadWorker extends NewThreadWorker{
               .............
    }
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
             private final ScheduledExecutorService executor;
             public NewThreadWorker(ThreadFactory threadFactory) {
                    executor = SchedulerPoolFactory.create(threadFactory);
             }
    
            //删除了一些判断代码,主干代码如下
            public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
                  Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
                  ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, 
            parent);
    
                  if (delayTime <= 0) {
                         f = executor.submit((Callable<Object>)sr);
                   } else {
                         f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                    }
                   return sr;
               }
    }
    

    可见Runnable经过wrap后,最终交给了ScheduledExecutorService去执行。
    到这里可以总结下,所谓线程切换只是将要跑的任务封装成Runnable,然后把Runnable交给ExecutorService去执行,源码看起来复杂的地方就在于一层层的往下传这个Runnable给真正执行的对象。

    题外的分析
    上面提到Runnable给ThreadWorker去跑,ThreadWorker从哪里来?看上面EventLoopWorker的构造函数源码,从里面看到threadWorker从CachedWorkerPool里得到:

    this.threadWorker = pool.get();
    

    CachedWorkerPool的主要代码如下:

        static final class CachedWorkerPool implements Runnable {
            private final long keepAliveTime;
            private final ConcurrentLinkedQueue<ThreadWorker> 
                                       expiringWorkerQueue;
            final CompositeDisposable allWorkers;
     
            ThreadWorker get() {
                if (allWorkers.isDisposed()) {
                    return SHUTDOWN_THREAD_WORKER;
                }
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();//出队
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
                // No cached worker found, so create a new one.
                ThreadWorker w = new ThreadWorker(threadFactory);//新构建
                allWorkers.add(w);
                return w;
            }
    }
    

    可以看到ThreadWorker先从expiringWorkerQueue中取,如果队列是空的,就创建个新的ThreadWorker。
    那啥时候把ThreadWorker放入expiringWorkerQueue队列呢?看代码:

     void release(ThreadWorker threadWorker) {
         // Refresh expire time before putting worker back in pool
        threadWorker.setExpirationTime(now() + keepAliveTime);
         expiringWorkerQueue.offer(threadWorker);//加入队列
    }
    

    谁调用release呢?就是上面的EventLoopWorker的dispose方法:

        @Override
            public void dispose() {
                if (once.compareAndSet(false, true)) {
                    tasks.dispose();
    
                   // releasing the pool should be the last action
                    // should prevent pool reuse in case there is a blocking
                    // action not responding to cancellation
    //                threadWorker.scheduleDirect(() -> {
    //                    pool.release(threadWorker);
    //                }, 0, TimeUnit.MILLISECONDS);
    
                    pool.release(threadWorker);
                }
            }
    

    二、observeOn分析
    observeOn的流程和subscribeOn差不多,也是封装成了Runnable交给Scheduler,Scheduler再找对象去执行,例子里是Scheduler找Handler来实现的,下面来分析。来看AndroidSchedulers.mainThread(),MAIN_THREAD其实被封装过的DEFAULT这个Scheduler:

    public static Scheduler mainThread() {
         return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    

    可以看到HandlerScheduler传入的参数是个MainLooper的Handler,这里就可以猜到RxJava2是用这个Handler将线程切换到主线程上。来看看HandlerScheduler源码:

    final class HandlerScheduler extends Scheduler {
       public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
            return scheduled;
        }
    
        @Override
       public Worker createWorker() {
            return new HandlerWorker(handler);
       }
       private static final class HandlerWorker extends Worker {
           //删除一些判断代码,只看主干代码
           @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) 
             {
                ScheduledRunnable scheduled = new 
                  ScheduledRunnable(handler, run);
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
               handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
                return scheduled;
            }
       }
         ................
    }
    

    可以看到HandlerWorker结构非常类似在分析subscribeOn时最后碰到的NewThreadWorker,任务Runnable经过封装后最后都是交给这些Worker的schedule方法来执行。这里是通过Handler机制来运行Runnable的,上面代码很多,其实只用看这句:

    Message message = Message.obtain(handler, scheduled);//获得Message,scheduled作为这个Message的callback变量
    

    Message被handler发送出去,scheduled这个callback Runnable就在MainLooper所在线程里执行了。
    题外的分析-Handler机制
    下面简要分析message被handler发送后,scheduled这个callback Runnable怎么执行的。
    Handler里有个Looper对象,Looper里有个MessageQueue(消息队列),Looper里有个loop()方法不断从MessageQueue取Message,源码精简后的流程如下:

    public final class Looper {
          //通过ThreadLocal来存取Looper,每个线程只有一个Looper
          public static @Nullable Looper myLooper() {
                 return sThreadLocal.get();
           }
          public static void loop() {
                final Looper me = myLooper();//获得Looper
                final MessageQueue queue = me.mQueue;//获得Looper里的
                MessageQueue
                for (;;) {
                       Message msg = queue.next();//获取消息队列的每个消息
                       msg.target.dispatchMessage(msg); //重点来了,target就是handler
                }
           }
           ....................
    }
    public class Handler {
          public void dispatchMessage(Message msg) {
                   if (msg.callback != null) {//判断msg有callback先执行
                          handleCallback(msg);
                    } else {
                          if (mCallback != null) {
                                  if (mCallback.handleMessage(msg)) {
                                   return;
                           }
                }
                handleMessage(msg);
          }
          private static void handleCallback(Message message) {
                  message.callback.run();//Runnable执行了
           }
            ......................
        }
    }
    

    再次提醒,callback就是前面要执行的ScheduledRunnable,这样ScheduledRunnable就在Handler的Looper所在线程中执行了。

    相关文章

      网友评论

          本文标题:RxJava2 线程切换分析

          本文链接:https://www.haomeiwen.com/subject/izzrnxtx.html