美文网首页
RxJava 源码分析系列(五) -线程调度原理

RxJava 源码分析系列(五) -线程调度原理

作者: 琼珶和予 | 来源:发表于2018-09-09 23:45 被阅读0次

      对于RxJava来说,简洁的线程切换操作是它优秀的地方之一。所以了解它的线程调度原理是完全有必要,这个既能帮助我们理解其中的奥妙,同时如果自己在开发当中需要做类似的需求,可以作为一个参考。同时,RxJava的线程调度原理也是RxJava当中比较难以理解的一部分。

    1.概述

      说线程调度原理比较难以理解,其实我觉得就是涉及到的类比较多,然后他们之间的关系又难以梳理清楚,所以显得这部分的代码高深莫测。在这里,我先对线程调度涉及到的比较重要的类做一个小小的说明。

    类名 含义
    Schedulers 此类里面提供了我们需要使用的调度器对象
    Scheduler 线程调度器,Observer或者Observable通过此类进行线程调度,每一个线程对应一个调度器
    Worker 真正执行任务的类,每个调度器都需要获取一个Worker对象来执行任务
    Runnable 在整个调度的过程中,一个任务会包装成一个Runnable来执行,包括线程池和Handler

      整个线程调度的核心在SchedulerWorker类中。接下来,我们重点分析这两个类,看这两个类怎么担任线程调度的重任的。

    2. Scheduler的原理分析

      Scheduler是一个抽象类,我们来看看它的源码:

    public abstract class Scheduler {
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
    
        public abstract Worker createWorker();
    }
    

      createWorker方法是一个非常重要的方法,后续的Scheduler都必须在这个方法里面返回一个Worker。所以每一个Scheduler都对应着一个Worker
      其次scheduleDirect方法也是非常重要,当一个观察者被subscribe时,会调用到此方法来。当然这时候可能还不清楚是怎么调用到这里来的,我们也不用太害怕,这里我们只需要知道会调用这个方法来的就行,后续我会详细的追踪源码来解释。
      如果此时的Scheduler是一个子线程的Scheduler,例如接下来要讲的NewThreadScheduler,那么此时提交到Worker去执行,肯定在NewThreadScheduler所在线程执行。同理,如果是Android主线程的Scheduler,肯定就在主线程里面执行了。
      我们发现在这个方法里面,先是调用了createWorker创建一个Worker对象,然后后续经过一系列的包装,将任务包装成为一个Runnable,然后调用Workerschedule方法来执行任务。
      接下来,我们看几个Scheduler的子类。

    (1).NewThreadScheduler

    public final class NewThreadScheduler extends Scheduler {
        final ThreadFactory threadFactory;
        public NewThreadScheduler() {
            this(THREAD_FACTORY);
        }
    
        public NewThreadScheduler(ThreadFactory threadFactory) {
            this.threadFactory = threadFactory;
        }
    
        @NonNull
        @Override
        public Worker createWorker() {
            return new NewThreadWorker(threadFactory);
        }
    }
    

      我简略了一下NewThreadScheduler的代码,让它看起来更加的清晰。整个过程,我们发现在createWorker方法里面返回了一个 NewThreadWorker对象,其他感觉也没什么东西,是不是感觉咱们分析错了。当然不是,核心在Worker里面,当然此时不是分析Worker的时机。我们只需要记住整个NewThreadScheduler工作流程。

    (2).HandlerScheduler

      我们再来看看HandlerScheduler的源码:

    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
        private final boolean async;
    
        HandlerScheduler(Handler handler, boolean async) {
            this.handler = handler;
            this.async = async;
        }
    
        @Override
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            run = RxJavaPlugins.onSchedule(run);
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            handler.postDelayed(scheduled, unit.toMillis(delay));
            return scheduled;
        }
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler, async);
        }
    }
    

      在Android当中,如果想要在主线程中工作,Handler是必不可少的。所以,我们在HandlerScheduler中看到Handler也是理所当然的。
      不过,这里我们发现,HandlerSchedulerscheduleDirect方法并没有去调用createWorker方法来获取一个Worker对象,而是直接将交给了handler来执行。这是为什么?
      在这里,我们得区分scheduleDirect方法的调用时机,scheduleDirect不是每个Scheduler里面都会被调用,而是这有上游的Scheduler才会被调用,也就是subscribeOn里面的Scheduler,在observeOn里面的Scheduler走的是另一个流程。
      所以,当HandlerSchedulerscheduleDirect方法被调用时,此时肯定是上游在工作,这样直接提交给Handler来执行了就是了,不必要去绕圈子。

    3. Worker的工作原理

      现在我们来看看Worker的相关源码:

        public abstract static class Worker implements Disposable {
            @NonNull
            public Disposable schedule(@NonNull Runnable run) {
                return schedule(run, 0L, TimeUnit.NANOSECONDS);
            }
            @NonNull
            public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
        }
    

      我将很多繁琐的代码省去了,Worker的代码看上就是这样的,其中,我们发现就有一个 schedule方法。这个方法之前在SchedulerscheduleDirect方法里面我们看到过,在scheduleDirect方法里面传递一个Runnable来执行。
      我们来看一个Worker的子类,看看是怎么执行的。

    (1).NewThreadWorker

      先来看看NewThreadWorker的源码:

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    
        @NonNull
        @Override
        public Disposable schedule(@NonNull final Runnable run) {
            return schedule(run, 0, null);
        }
    
        @NonNull
        @Override
        public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (disposed) {
                return EmptyDisposable.INSTANCE;
            }
            return scheduleActual(action, delayTime, unit, null);
        }
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    }
    

      好像代码有点长😅,但是不急,我们一一的来分析。
      首先在构造方法里面,创建了一个线程池的对象,关于线程池相关的知识,这个属于Java基础知识,这里就不进行讲解,我默认大家都懂哈😂。
     然后就是一系列的schedule调用,最终会调用到scheduleActual方法里面去。scheduleActual方法里面看是做了很多的操作,其实根本目的就是调用ScheduledExecutorServicesubmit方法提交一个任务而已。
      对的,NewThreadWorker就是这么简单,是不是之前将它想的太难了?哈哈哈😂
      整个SchedulerWorker的调用差不多就是这样的,但是是不是觉得,现在还是一脸懵逼,看到这里,我们还是不知道RxJava究竟是怎么进行线程调度的。
      不急,接下来,我们将一个例子入手,由浅入深的带领大家熟悉线程调度的奥妙。我们熟悉了SchedulerWorker,对其他的才会很快了解。

    4. subscribeOn的工作流程

      先来举一个栗子:

        Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
          }
        }).subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {
    
          }
    
          @Override
          public void onNext(Integer integer) {
          }
    
          @Override
          public void onError(Throwable e) {
    
          }
    
          @Override
          public void onComplete() {
    
          }
        });
    

      是不是觉得这个栗子非常的熟悉,像极了前世的恋人🤓。熟悉就好,就怕大家淡忘了自己心中那个人。
      这个使用方法,其他的大佬不知道讲解了多少,在这里我就不重复了,直接开门见山的分析subscribeOn方法。
      subscribeOn方法主要是跟ObservableSubscribeOn类有关。我们来看看ObservableSubscribeOn类:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    }
    

      首先,ObservableSubscribeOn的构造方法里面多了一个Scheduler参数,这个参数到底是谁的对象得取决于在subscribeOn时。上面的栗子,我们传递的是schedulers.newThread(),所以这里的scheduler就是NewThreadScheduler的对象。
      在subscribeActual方法里面,我们发现:

            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

      先来看看SubscribeTask是什么东西:

        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    

      SubscribeTask从根本来说就是一个Runnable,当这个Runnable被执行的时候,也就是run方法被调用时,我们发现此时调用了subscribe方法,此时任务就正式启动。假如,我们在向后台获取数据,此时就已经开始获取了。
      然后,我们发现调用了SchedulerscheduleDirect,还记得这个方法在做什么吗?这个方法我们在前面讲解Scheduler时已经讲解,在这个方法里面显示调用onCreateWorker方法来获取一个Worker对象,然后将我们的任务提交给Worker来执行。
      这就是整个subscribeOn的流程,是不是很简单?

    5. observeOn的工作流程

      observeOn方法主要是表示下游的方法执行所在的线程,跟subscribeOn方法差不多,observeOn主要跟ObservableObserveOn,我们来看看ObservableObserveOn的相关代码:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
    

      还是先来看看构造方法,同样的,scheduler也是一个调度器,这里由于Android的主线程,所以是HandlerSchedulerdelayError表示是否延迟抛出异常;bufferSize表示缓冲区的大小,如果上游的发射速度比较快的话,这里有可能出现背压问题。
      然后看看subscribeActual方法里面,我们发现,在这里,调用了onCreateWorker方法来获取一个Worker对象,然后subscribe了一个ObserveOnObserver对象,传进去了一些相关参数。看来真正的核心在ObserveOnObserver这里面。
      不过在这里,我们发现ObservableObserveOnObservableSubscribeOnsubscribeActual方法还是有很大的区别,在ObservableObserveOnsubscribeActual方法没有调用Scheduler的相关方法。之前,我们说了,scheduleDirect方法只在上游执行,从这里就可以看出来。
      我们来看看ObserveOnObserver相关代码,ObserveOnObserver比较重要的是onNext方法,看看在方法里面是怎么将数据分发到主线程去执行的。

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    

      onNext方法还是比较简单,先将数据放在队列里面去,然后在调用schedule方法。我们再来看看schedule方法:

            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    

      这里getAndIncrement() == 0的意思比较好理解,表示当前队列中只有一个数据,就调用Workerschedule方法。到这里,我们根本没有看到调用Observer的onNext方法,是不是分析错了?肯定不是的,我们来看看Workerschedule方法。这里的WorkerHandlerWorker,所以我们看一下HandlerWorkerschedule方法:

            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                if (async) {
                    message.setAsynchronous(true);
                }
    
                handler.sendMessageDelayed(message, unit.toMillis(delay));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    

      看到写了这么多,其实就一句话,将传递进来的Runnable交给Handler来执行。那个这个Runnable是谁?就是ObservableObserveOn自己,Handler会执行Runnablerun方法。
      哈哈,现在得回来ObservableObserveOnrun方法:

            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    

      这里的两个drain方法着实将我吓着了。其实,认真想一想,跟BufferAsyncEmitter里面的drain方法差不多,这里就不深入的讲解,反正这里就是将数据发送到下游的真正操作,而且是在observeOn方法里面传递的那个线程执行。

    6.总结

      RxJava的线程调度这部分的知识着实是不好理解,所以尽管我在这里说的天花乱坠,还是得需要我们亲自来看看这部分的代码。在这里,我对这部分的知识做一个简单的总结。
      1.每种类型线程对一个Scheduler,而每个Scheduler对应一个Worker,真正操作都是Worker来完成的。
      2.子线程执行的根本就是将我们的subscribe方法调用放在一个Runnable里面去执行,类似于代理机制,又像装饰者模式。

    相关文章

      网友评论

          本文标题:RxJava 源码分析系列(五) -线程调度原理

          本文链接:https://www.haomeiwen.com/subject/irjdgftx.html