Rxjava2源码浅析(三)

作者: Sp_WannaSing | 来源:发表于2017-03-23 19:32 被阅读0次

    首先开始填坑,上篇文章最后的问题还没有解决,subscribeOn是如何切换线程的。

    先回顾一下:
    使用方法:

     observable.subscribeOn(Schedulers.newThread());
    

    我们直接看它重写的abstract方法subscribeActual

    @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    

    看到了一个熟悉的Runnable是不是瞬间热泪盈眶?这里就看到了在一个Runnable中订阅了事件,由于是接口回调,所以observable中的事件是运行在这个线程的,而observer回调接口的时候就要看具体的observeOn是什么参数了。

    跟踪这里的schedulerDirect方法。

     @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
    @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    

    而这个creatework()的具体实现类在HandlerScheduler中。

    @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    
        private static final class HandlerWorker extends Worker {
            private final Handler handler;
    
            private volatile boolean disposed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    

    而这里的handler是ObserveOn的时候new Handler(Looper.getMainLooper())时候创建的,运行在主线程。
    所以这里通过handler发送一个带有Runnable的消息,完成了new Thread和Main Thread的线程切换。

    相关文章

      网友评论

        本文标题:Rxjava2源码浅析(三)

        本文链接:https://www.haomeiwen.com/subject/qimtottx.html