Rxjava2源码浅析(三)

作者: Sp_WannaSing | 来源:发表于2017-03-23 19:32 被阅读0次

首先开始填坑,上篇文章最后的问题还没有解决,subscribeOn是如何切换线程的。

先回顾一下:
使用方法:

 observable.subscribeOn(Schedulers.newThread());

我们直接看它重写的abstract方法subscribeActual

@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

看到了一个熟悉的Runnable是不是瞬间热泪盈眶?这里就看到了在一个Runnable中订阅了事件,由于是接口回调,所以observable中的事件是运行在这个线程的,而observer回调接口的时候就要看具体的observeOn是什么参数了。

跟踪这里的schedulerDirect方法。

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

而这个creatework()的具体实现类在HandlerScheduler中。

@Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }

    private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.

            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

而这里的handler是ObserveOn的时候new Handler(Looper.getMainLooper())时候创建的,运行在主线程。
所以这里通过handler发送一个带有Runnable的消息,完成了new Thread和Main Thread的线程切换。

相关文章

  • RxJava2 源码浅析

    RxJava2 源码浅析 ReactiveX 历史:ReactiveX是Reactive Extensions的缩...

  • Rxjava2源码浅析(三)

    首先开始填坑,上篇文章最后的问题还没有解决,subscribeOn是如何切换线程的。 先回顾一下:使用方法: 我们...

  • RxJava2线程调度源码分析(二)

    在RxJava2源码浅析(一) 里我们分析RxJava2最简单的用法,实际上就是复杂一点的回调.今天一起来看看线程...

  • SRWebSocket源码浅析(上)

    SRWebSocket源码浅析(上) SRWebSocket源码浅析(上)

  • SRWebSocket源码浅析(下)

    SRWebSocket源码浅析(下) SRWebSocket源码浅析(下)

  • 源码浅析 RxSwift 5.0 - Subscription

    源码浅析 RxSwift 5.0 - Subscription源码浅析 RxSwift 5.0 - Subscri...

  • RxJava2源码浅析(一)

    前言 我们经常看RxJava的文章,很多都是API性的介绍.今天我们就用一段来理解它吧,了解它的内幕 本文编译需要...

  • Rxjava2源码浅析(一)

    面试的时候被问道各种框架的原理架构,也是很尴尬,自以为写的代码不少,用过的框架也不少,深入的去研究源码的还真是不多...

  • Rxjava2源码浅析(二)

    上一篇文章:Rxjava2源码浅析(一)分析了最基础的一套流程,今天呢就略加一些常用的操作吧。 使用范例上次我们在...

  • RxJava2 源码分析二

    文章目录 前言 RxJava2 线程调度 RxJava2 怎么进行线程调度 总结 前言 经过RxJava2源码分析...

网友评论

    本文标题:Rxjava2源码浅析(三)

    本文链接:https://www.haomeiwen.com/subject/qimtottx.html