RxJava2 源码解析——线程调度 Scheduler

作者: Robin_Lrange | 来源:发表于2017-04-24 22:21 被阅读1054次

    RxJava源码解析第二篇。
    我们知道,在使用RxJava的时候,线程的调度是其内部帮我们实现的,这让我们可以便捷的实现函数式编程。
    本文主要从源码的角度来分析RxJava的线程调度机制
    = =最近被项目搞疯都没什么时间写笔记了。


    引入

    我们知道,线程调度主要通过observeOnsubscribeOn这两个方法,以及Schedular来指定使用的线程。
    还是以上一次的代码为例:

    Observable.create(new ObservableOnSubscribe<LoginApiResult>() {
            @Override
            public void subscribe(ObservableEmitter<LoginApiResult> e) throws Exception {
                e.onNext(login());
            }
        }) //调用登录接口
        .map(new Function<LoginApiBean, UserInfoBean>() {
            @Override
            protected UserInfoBean decode(LoginApiBean loginApiBean) {
                //处理登录结果,返回UserInfo
                if (loginApiBean.isSuccess()) {
                    return loginApiBean.getUserInfoBean();
                } else {
                    throw new RequestFailException("获取网络请求失败");
                }
            }
        })
        .doOnNext(new Consumer<UserInfoBean>() {    //保存登录结果UserInfo
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {
                saveUserInfo(bean);
            }
        })
        .subscribeOn(Schedulers.io())   //调度线程
        .observeOn(AndroidSchedulers.mainThread())  //调度线程
        .subscribe(new Consumer<UserInfoBean>() {
            @Override
            public void accept(@NonNull UserInfoBean bean) throws Exception {LoginApiBean
                //整个请求成功,根据获取的UserInfo更新对应的View
                showSuccessView(bean);
            }
        }, new Consumer<Throwable>() {
            @Override
            public void accept(@NonNull Throwable throwable) throws Exception {
                //请求失败,显示对应的View
                showFailView();
            }
        });
    

    我们知道,通过:

    .subscribeOn(Schedulers.io())   //调度线程
    .observeOn(AndroidSchedulers.mainThread())  //调度线程
    

    这两句代码,就使我们上半部分的请求和保存数据都执行在io线程中,而下半部的ui更新则执行在主线程。

    通过这段代码,我们引入几个问题:

    1. observeOn和subscribeOn是如何实现线程调度的?
    2. observeOn和subscribeOn之间是否存在冲突?

    observeOn源码

    首先解决第一个问题,我们先了解一下ObserveOn的实现原理:

    首先看一下调用:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        return new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    }
    

    我们可以看到,ObserveOn最终是返回了一个ObservableObserveOn对象,并将scheduler传入。

    根据上一篇文的思路:


    任务链.png

    ObservableObserveOn会被我们最后subscribe的时候传入的Observer订阅。

    让我们跟进看一下ObservableObserveOn被订阅时会执行什么逻辑:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //TrampolineScheduler 表示当前线程
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //根据scheduler创建worker
            Scheduler.Worker w = scheduler.createWorker();
            //通过ObservableObserveOnObserver代理
            source.subscribe(new ObservableObserveOn.ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    这里的逻辑并不难理解,(如果看了上一篇文章),

    首先是判断了scheduler是不是表示当前线程的TrampolineScheduler,如果是就直接让observer订阅上一级的Observable,也就是跳过当前这一层,即图中的Observer直接订阅ObservableSubscribeOn

    然后根据schedular生成对应的worker,交由ObservableObserveOnObserver代理,订阅上一级的Observable

    根据我们引入的案例,我们以observeOn(AndroidSchedulers.mainThread()) 为例,当完成逆向订阅,执行任务链到ObservableObserveOnObserver时:

    @Override
    public void onNext(T t) {
        // 上一级的模式如果不是异步的,加入队列
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        //进行线程调度
        schedule();
    }
    
    void schedule() {
        // 判断当前正在执行的任务数目
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    这里首先是判断了sourceMode,这里先不跟踪这个变量,只需要知道大多数情况下,这个判断是成立,所以会把数据加入队列。

    然后转而让worker执行接下去的步骤。

    我们跟踪看看,可以发现,这是个抽象方法,可以找到他在不同类中有不同实现,分别对应了几种不同的线程调度机制,我们挑选案例中的AndroidSchedulers.mainThread()来跟踪。

    首先我们跟踪mainThread方法,可以发现内部转到了这里:

    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    

    我们再跟进HandlerScheduler,我们知道worker是通过createWorker方法产生的:

    public Worker createWorker() {
       return new HandlerWorker(handler);
    }
    

    可以看到直接生成了HandlerWorker,并传入了一开始创建的绑定了MainLooperHandler。看到这里也能大致猜出,后续会把任务传给这个handler执行:

    @Override
    public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        //省略部分代码
        ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
        Message message = Message.obtain(handler, scheduled);
        message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
        handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
        return scheduled;
    }
    

    可以看到,这里将传进来的runnable包装成ScheduledRunnable,然后提交给绑定的handler

    我们知道,后续Handler会调用ScheduledRunnable的run方法:

    ScheduledRunnable(Handler handler, Runnable delegate) {
        this.handler = handler;
        this.delegate = delegate;
    }
    
    @Override
    public void run() {
        try {
            delegate.run();
        } catch (Throwable t) {
            //……
        }
    }
    

    可以看到,只是简单的调用了我们传入的runnablerun方法,也就是刚才我们在ObservableObserveOnObserver中通过schedule方法传入的runnable,我们回去看看:

    void schedule() {
        // 判断当前正在执行的任务数目
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    可以看到其实本身就是个runnable

    @Override
    public void run() {
        //输出结果是否融合
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    可以看到,根据outputFused来跳转方法,这里先不跟踪这个变量,后面会再提到。
    现在只需要知道当连续两个observable都需要线程调度时(比如从observeOnobserveOn),这个outputFused才会发生变化,默认为false。

    那么这里,我们先进入drainNormal方法:

    void drainNormal() {
        int missed = 1;
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
        //第一层循环
        for (;;) {
            // 检查异常处理
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
            //第二层循环
            for (;;) {
                boolean d = done;
                T v;
                //从队列中获取数据
                v = q.poll();
                boolean empty = v == null;
                // 检查异常
                if (checkTerminated(d, empty, a)) {
                    return;
                }
                //如果没有数据了,跳出
                if (empty) {
                    break;
                }
                //执行下一次操作。
                a.onNext(v);
            }
            //减掉执行的次数,并获取剩于任务数量,然后再次循环
            //直到获取剩余任务量为0,跳出循环
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    

    这里的逻辑其实也不难,具体可以看注释。

    到这里其实已经切换了线程,然后就是分发数据,逐个调用onNext操作了。直到没有数据就跳出循环。(总觉得这里missed的设计很奇怪- -为什么是初始化1而不是missed=get()呢。望有大神解答~)

    看到这里也就大致明白了ObserveOn的流程呢。

    总结一下:
    ObserveOn会用一个queue保存上一级传下来的数据,然后通过scheduler创建一个worker,提交数据,并将任务执行在worker设置的线程中。

    subscribeOn源码

    看完ObserveOn,我们看一下subscribeOn,
    首先看一下当他被订阅时会执行什么操作:

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //创建对应的Observer
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        //执行线程调度,内部会订阅上一级的Observable
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    可以看到,这里直接进行了线程调度,创建了SubscribeTask任务,然后交由Scheduler执行。

    我们先看看scheduleDirect会执行什么操作:

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Scheduler.Worker w = createWorker();
    
        Scheduler.DisposeTask task = new Scheduler.DisposeTask(run, w);
    
        w.schedule(task, delay, unit);
    
        return task;
    }
    

    可以看到,这里和我们刚才追踪ObserveOn时的逻辑一样。都是将任务交给了Worker处理。我们刚才已经分析了,Worker会将任务提交给对应的线程执行。

    所以我们回过头看一下我们提交了什么任务:

    @Override
    public void run() {
        source.subscribe(parent);
    }
    

    可以看出,这里将订阅的操作提交给了Worker执行。

    总结一下:
    subscribeOn会将订阅上一级的操作调交给worker中对应的线程执行。

    ObserveOn和subscribeOn

    我们还是以上述引入的例子为例,可以看出,整个过程进行了两次线程调度,首先是subscribeOn,然后是ObserveOn,这个过程比较简单,先解析这个过程。

    根据上一篇文章的分析,RxJava的整个流程分为三个步骤:

    1. 创建任务链,这里没有涉及线程调度。默认执行在当前线程,在这里也就是主线程

    2. 逆向订阅,这里当遇到ObserveOn的时候,ObserveOn直接进行了订阅操作,所以没有影响。
      但是但我们订阅ObservableSubscribeOn的时候,其便将订阅操作提交到了对应线程,所以后续的订阅操作都执行在对应线程,在这里便是IO线程

    3. 执行任务链,受到ObservableSubscribeOn的影响,这里也会继续执行在IO线程
      但是当我们执行到ObserveOnObserver的时候,onNext操作会执行在对应的线程中,在这里也就是切换到主线程

    线程调度.png

    图中,紫色的箭头表示执行在默认线程(主线程),红色的箭头表示执行在IO线程,绳蓝色的线表示执行在切换后的主线程

    observeOn和subscribeOn之间是否存在冲突

    其实从上述的例子我们可以看出并不存在冲突的问题,一个影响的subscribe之后的操作,一个影响的是doNext之后的操作。

    从图中可以看出,不管subscribeObserveOn怎么变化,都不会发生冲突的情况。

    相关文章

      网友评论

      本文标题:RxJava2 源码解析——线程调度 Scheduler

      本文链接:https://www.haomeiwen.com/subject/datkottx.html