美文网首页Android开发Android技术知识Android开发经验谈
RxJava 2.x 源码分析(三)之一步一步了解切换线程原理

RxJava 2.x 源码分析(三)之一步一步了解切换线程原理

作者: zYoung_Tang | 来源:发表于2018-06-05 11:25 被阅读26次

    前文

    Android 开发离不开异步操作,比如网络请求必须在子线程中进行,获取数据后可能需要更改UI,但是UI只能在主线程中被修改,所以我们经常在子线程和主线程中切换,代码就会比较繁琐不方便阅读。RxJava 的诞生是为了更好的处理异步操作,前面两篇文章我们了解到 RxJava 使用了观察者模式,现在继续了解 RxJava 利用观察者模式同时实现了线程切换。

    线程调度

    RxJava 中线程调度的核心方法是subscribeOn()observeOn()

    • subscribeOn 控制被观察者发送事件所在的线程。
    • observeOn 观察者接收事件时的线程。

    在查看源码之前可以先从下面几个例子中看效果

    下面的例子都使用的是相同的被观察者观察者:

    • 不切线程
    • 切换线程,被观察者发射事件操作切换到IO线程中执行,观察者接收事件切换到应用主线程中执行
    • 切换线程,与例2相同,增加调用 FlatMap 操作符

    创建被观察者和观察者

    // 创建被观察者
    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            Log.d("RXJAVA_LOG", "发射事件1 -- " + Thread.currentThread().getName());
            e.onNext("item 1");
            Log.d("RXJAVA_LOG", "发射事件2 -- " + Thread.currentThread().getName());
            e.onNext("item 2");
            Log.d("RXJAVA_LOG", "发射事件3 -- " + Thread.currentThread().getName());
            e.onNext("item 3");
            Log.d("RXJAVA_LOG", "发射事件4 -- " + Thread.currentThread().getName());
            e.onNext("item 4");
        }
    });
    
    // 创建观察者
    Observer<String> observer = new Observer<String>() {
    
        @Override
        public void onSubscribe(Disposable d) {
        }
    
        @Override
        public void onNext(String s) {
            Log.d("RXJAVA_LOG", "接受事件 : " + s + " -- " + Thread.currentThread().getName());
        }
    
        @Override
        public void onError(Throwable e) {
        }
    
        @Override
        public void onComplete() {
        }
    };
    

    例子1:不切线程

    observable.subscribe(observer);
    

    输出:

    发射 item1     -- main
    接受 item1 -- main
    发射 item2     -- main
    接受 item2 -- main
    发射 item3     -- main
    接受 item3 -- main
    发射 item4     -- main
    接受 item4 -- main
    

    例子2:切换线程

     observable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
    

    输出:

    发射 item1     -- RxCachedThreadScheduler-1
    发射 item2     -- RxCachedThreadScheduler-1
    发射 item3     -- RxCachedThreadScheduler-1
    发射 item4     -- RxCachedThreadScheduler-1
    接受 item1 -- main
    接受 item2 -- main
    接受 item3 -- main
    接受 item4 -- main
    

    例子3:使用FlatMap并切换线程

    observable
        .flatMap(new Function<String, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(String s) throws Exception {
                final String str = s;
                return Observable.create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.d("RXJAVA_LOG", "发射 " + str + "-sub -- " + Thread.currentThread().getName());
                        e.onNext(str + "-sub");
                    }
                });
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
    

    输出:

    发射 item1     -- RxCachedThreadScheduler-1
    发射 item1-sub -- RxCachedThreadScheduler-1
    发射 item2     -- RxCachedThreadScheduler-1
    发射 item2-sub -- RxCachedThreadScheduler-1
    发射 item3     -- RxCachedThreadScheduler-1
    发射 item3-sub -- RxCachedThreadScheduler-1
    接受 item1-sub -- main
    发射 item4     -- RxCachedThreadScheduler-1
    发射 item4-sub -- RxCachedThreadScheduler-1
    接受 item2-sub -- main
    接受 item3-sub -- main
    接受 item4-sub -- main
    

    从上面的例子可以看出:

    • subscribeOn()可以切换被观察者和使用FlatMap或者其他操作符的子被观察者发射的事件的线程。
    • observeOn()可以控制观察者接收事件所在的线程。

    那么我们就可以把网络请求操作作为被观察者的事件,并且把发送事件线程切换到IO线程,然后把更新UI操作放到观察者接收事件的回调中,并把接收事件所在线程切换回主线程,这样就可以利用 RxJava 完成我们网络请求的异步操作。


    以例子2为阅读源码的例子,根据subscribeOn()、observeOn()、subscribe()的调用顺序分为三步查看

    1. subscribeOn()

    下面我们开始阅读源码,先从subscribeOn(Schedulers.io())开始看,首先看看io()方法

    // Schedulers.java
    /**
     * Returns a default, shared {@link Scheduler} instance intended for IO-bound work.
        ...
     */
    @NonNull
    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    

    这里出现了一个变量IO,看看是什么东西

    // Schedulers.java
    public final class Schedulers {
        @NonNull
        static final Scheduler IO;
        
        static {    
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
            ...
        }
        ...
    }
    
    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }
    

    原来变量IO是一个Scheduler对象且是Schedulers的静态成员变量且在类加载的时候已经初始化了,PS:Schedulers可以看成是一个用来创建Scheduler的工厂,提供多个静态方法如io()newThread()computation等来获取适用于不同场景的Scheduler

    接着看初始化过程,首先创建了一个IOTask实例,然后传给RxJavaPlugins的初始化方法initIoScheduler()

    // RxJavaPlugins.java
    @NonNull
    public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
        // 检查 IOTask 是否为空
        ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
        Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
        if (f == null) { // onInitIoHandler 没有被设置,所以为 null
            return callRequireNonNull(defaultScheduler);
        }
        return applyRequireNonNull(f, defaultScheduler);
    }
    
    @NonNull
    static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
        try {
            // s : IOTask
            return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
        } catch (Throwable ex) {
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }
    

    首先检查IOTask对象是否为空,然后调用callRequireNonNull方法调用IOTask.call(),并返回call()的返回值,即返回IoHolder.DEFAULT,IoHolder又是什么呢,原来是Schedulers的内部类,变量IO的本质原来是IoScheduler对象

    // Schedulers.java
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    

    变量IO的身份我们已经知道了,回到一开始的io()看看RxJavaPlugins.onIoScheduler(IO)做了什么

    // RxJavaPlugins.java
    @NonNull
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) { // f = null
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }
    

    由于onIoHandler是静态变量且没有初始化,所以为null,直接返回我们的变量IO

    所以io()方法最后返回的是IoScheduler对象。

    接着看subscribeOn()

    // Observable.java
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        // 检查是否为 null
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    返回的是ObservableSubscribeOn对象,且保存了上游的被观察者Scheduler对象

    // ObservableSubscribeOn.java
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source); // source: 上游的被观察者
            this.scheduler = scheduler; // 本例子是 IoScheduler
        }
        ...
    }
    

    到此,subscribeOn()的方法已经看完了,最终返回一个ObservableSubscribeOn对象。


    2. observeOn()

    首先看看AndroidSchedulers.mainThread()怎么获取主线程操作的 scheduler

    // AndroidSchedulers.java
    public final class AndroidSchedulers {
        /** A {@link Scheduler} which executes actions on the Android main thread. */
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
            
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
        });
        ...
    }
    

    这里出现了一个静态变量MAIN_THREAD,且由RxAndroidPlugins初始化,代码如下

    // RxAndroidPlugins.java
    public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
        if (scheduler == null) { // false
            throw new NullPointerException("scheduler == null");
        }
        Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
        if (f == null) { // onInitMainThreadHandler 为 null
            return callRequireNonNull(scheduler);
        }
        return applyRequireNonNull(f, scheduler);
    }
    
    static Scheduler callRequireNonNull(Callable<Scheduler> s) {
        try {
            // s : 传入 initMainThreadScheduler 的匿名对象
            Scheduler scheduler = s.call();
            if (scheduler == null) {
                throw new NullPointerException("Scheduler Callable returned null");
            }
            return scheduler;
        } catch (Throwable ex) {
            throw Exceptions.propagate(ex);
        }
    }
    
    • initMainThreadScheduler()中最终执行callRequireNonNull(scheduler)
    • 把之前传入initMainThreadScheduler()的匿名对象作为 scheduler 传进callRequireNonNull
    • 然后返回scheduler.call()的返回值
    • 根据上面代码返回的是MainHolder.DEFAULTMainHolderAndroidSchedulers的静态类,DEFAULT创建代码如下
    // AndroidSchedulers.java
    private static final class MainHolder {
        // 通过 Looper.getMainLooper() 获取主线程的 Looper
        // 创建在主线程 Looper 上的 Handler
        // 用这个 Handler 创建 HandlerScheduler 对象
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
    

    DEFAULT其实就是一个拥有主线程Handler的HandlerScheduler对象,所以AndroidSchedulers.mainThread()最终返回的是HandlerScheduler对象。

    下面回归到observeOn()

    // Observable.java
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        // this : ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    从上面代码可以看见observeOn最终返回的是ObservableObserveOn对象,且在我们的例子中这个对象拥有的 Scheduler 是HandlerScheduler


    3. subscribe()

    到这里我们知道本例子中被观察者ObservableObserveOn

    这个方法从之前两篇文章中知道它是一个启动方法,一旦调用,被观察者就开始发送事件给观察者,而且实际执行的方法是被观察者subscribeActual(),还记得在我们的例子中最外层的被观察者ObservableObserveOn所以我们从它的subscribeActual()中开始看起

    // ObservableObserveOn.java
    @Override
        protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) { // false
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            // source : ObservableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    在第二步我们知道ObservableObserveOn中的 scheduler 是HandlerScheduler,所以走 else ,首先执行了Scheduler.Worker w = scheduler.createWorker();,创建了HandlerScheduler.HandlerWorker对象

    // HandlerScheduler.java
    @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
    
    private static final class HandlerWorker extends Worker {
        private final Handler handler;
        
        private volatile boolean disposed;
        
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
        ...
    }
    

    在这里先不深入了解HandlerWorker,但是我们可以猜到worker是切换线程的关键。
    接着看

    source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    

    这里的source是什么?还记得第一步调用subscribeOn()返回的ObservableSubscribeOn对象吗,第二步相当于调用了ObservableSubscribeOn.observeOn()到最后创建ObservableObserveOn对象的时候ObservableSubscribeOn赋值给了source

    由于ObservableSubscribeOn没有覆写subscribe()方法,所以source.subscribe()调用的是Observable.subscribe()

    // Observable.java
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            // observer : ObserveOnObserver
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            ...
        }
    }
    

    最终会调用ObservableSubscribeOnsubscribeActual()

    // ObservableSubscribeOn.java
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // s : ObserveOnObserver
        // 创建 SubscribeOnObserver 包装了 s
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // 调用我们的回调方法 onSubscribe
        s.onSubscribe(parent);
        // scheduler : IoScheduler
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;
    
        final AtomicReference<Disposable> s;
    
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
    
        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
        
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
            
        // 省略以下方法
        // onNext onError onComplete dispose isDisposed
        ...
    }
    
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            // source : 我们创建的被观察者
            // parent : SubscribeOnObserver
            source.subscribe(parent);
        }
    }
    

    subscribeActual()只有三行代码,首先创建SubscribeOnObserver包装我们创建的观察者,然后调用我们的观察者回调方法onSubscribe(),而最后一行代码

    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    是最重要的,这行代码的执行引起了一连串方法调用,激活了被观察者开始发射事件以及后面的一连串的操作,之后我们会在多个类里不停的跳转,见证着一系列动作。

    这行代码分三步,但只需要关心前两步,第三步parent.setDisposable是设置Disposable,提供可断开被观察者观察者连接的操作:

    1. new SubscribeTask()

    创建SubscribeTask,SubscribeTask 继承了 Runnable ,重点看下 run 方法里面是

    source.subscribe(parent);
    

    根据前面两篇文章我们知道这行代码会调用Observable.subscribeActual(observer),然后调用ObservableCreate. subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    最后调用我们创建被观察者时覆写的 subscribe 方法,从而激活了被观察者发射事件

    Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            ...
    });
    

    所以我们可以预知随着代码的深入,SubscribeTask.run()将会在子线程中被调用,从而实现了被观察者在子线程中发射事件。

    2. IoScheduler.scheduleDirect()

    实际上调用的是IoScheduler父类SchedulerscheduleDirect()

    // Scheduler.java
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // w : EventLoopWorker
        final Worker w = createWorker();
        // run : SubscribeTask
        // decoratedRun : SubscribeTask
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            
        DisposeTask task = new DisposeTask(decoratedRun, w);
            
        w.schedule(task, delay, unit);
            
        return task;
    }
    
    // IoScheduler.java
    @NonNull
    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    • 先创建 worker
    • 把 SubscribeTask 包装后与 worker 组成 DisposeTask
    • 然后执行
    w.schedule(task, delay, unit);
    

    时执行的是 EventLoopWorker.schedule(),看看EventLoopWorker

    // IoScheduler.java
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
    
        final AtomicBoolean once = new AtomicBoolean();
    
        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }
    
        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();
    
                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }
    
        @Override
        public boolean isDisposed() {
            return once.get();
        }
    
        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) { // false
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
    
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;
    
        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
    
        public long getExpirationTime() {
            return expirationTime;
        }
    
        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
    

    注意到构造方法里成员threadWorker是从CachedWorkerPool缓存池中获取的,EventLoopWorker.schedule()最终会调用NewThreadWorker.scheduleActual

    // NewThreadWorker.java
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
        if (parent != null) {
            if (!parent.add(sr)) {  
                return sr;
            }
        }
    
        Future<?> f;
        try {
            if (delayTime <= 0) {  // delayTime = 0
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
    
        return sr;
    }
    

    scheduleActual()内不做深入了解,只需要知道会执行下面这行代码

    f = executor.submit((Callable<Object>)sr);
    

    executor.submit()内部也不做深入介绍,只需要知道最终会在子线程调用SubscribeTask.run()证实了上文的猜测即可。

    到了这里已经到达了我们的被观察者中发射事件的操作了,假设从网络请求中返回了数据可以通过onNext方法发射给观察者,那么接下来就看看在子线程中发射事件之后观察者是如何在主线程中接收事件并作出处理操作的。

    // ObservableCreate.java
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    

    到这里我们会有疑问这个观察者是哪个?刚刚证实了被观察者发射事件的过程是放在SubscribeTask,现在回顾一下创建SubscribeTask时的代码

    // ObservableSubscribeOn.java
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // s : ObserveOnObserver
        // 创建 SubscribeOnObserver 包装了s
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // 调用我们的回调方法 onSubscribe
        s.onSubscribe(parent);
        // scheduler : IoScheduler
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    看完之后我们明白了原来顶层的观察者SubscribeOnObserver,先我们看看它的构造方法,用变量actual保存了ObserveOnObserver对象

    // ObservableSubscribeOn.SubscribeOnObserver
    SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }
    

    看看SubscribeOnObserveronNext()

    @Override
    public void onNext(T t) {
        actual.onNext(t);
    }
    

    里面调用了ObserveOnObserver.onNext()

    // ObservableObserveOn
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }
    
    void schedule() {
        if (getAndIncrement() == 0) {
            // worker : HandlerWorker
            // this : ObserveOnObserver
            worker.schedule(this);
        }
    }
    

    ObserveOnObserver内有一个队列 queue 用来保存收到的数据,然后调用schedule(),根据之前所说的观察者的worker是HandlerWorker,前面贴的代码并没有放schedule()这个方法,下面重新贴出来:

    先调用父类Scheduler.workerschedule()

    // Scheduler.worker
    @NonNull
    public Disposable schedule(@NonNull Runnable run) {
        return schedule(run, 0L, TimeUnit.NANOSECONDS);
    }
    

    再调用HandlerWorkerschedule()

    // HandlerScheduler.HandlerWorker
    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        if (run == null) throw new NullPointerException("run == null");
        if (unit == null) throw new NullPointerException("unit == null");
        
        if (disposed) {
            return Disposables.disposed();
        }
        
        run = RxJavaPlugins.onSchedule(run);
        
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
        
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
        
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
        
        // Re-check disposed state for removing in case we were racing a call to dispose().
        if (disposed) {
            handler.removeCallbacks(scheduled);
            return Disposables.disposed();
        }
        
        return scheduled;
    }
    

    其主要操作就是把ObserveOnObserver包装成ScheduledRunnable放进Message,然后通过HandlerWorker绑定的Handler(即主线程Handler)发送到消息队列等待处理。

    这里有个疑问就是schedule()接收的是 Runnable 类型的参数,为什么我们可以把ObserveOnObserver传进去呢?之前没有贴完整的ObserveOnObserver代码,现在看看到底是什么东西

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        ...
        @Override
        public void run() {
            if (outputFused) { // false
                drainFused();
            } else {
                drainNormal();
            }
        }
            
        void drainNormal() {
            int missed = 1;
        
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
        
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
        
                for (;;) {
                    boolean d = done;
                    T v;
        
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
        
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
        
                    if (empty) {
                        break;
                    }
        
                    a.onNext(v);
                }
        
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
        ...
    }
    

    原来ObserveOnObserver也实现了Runnable,且run()里会调用drainNormal,就是把上面用来保存事件的队列 queue 都发送到观察者


    到这里我们已经把例子2切换线程的过程看了一遍,之前只是知道简单的调用两个方法就能随意切换线程,现在终于通过例子了解到里面的原理了。

    相关文章

      网友评论

        本文标题:RxJava 2.x 源码分析(三)之一步一步了解切换线程原理

        本文链接:https://www.haomeiwen.com/subject/vnnysftx.html