美文网首页
RxJava2 源码三:线程调度

RxJava2 源码三:线程调度

作者: MxsQ | 来源:发表于2018-09-10 17:14 被阅读0次

    前言

    前两篇文章中,分别对RxJava2的基本路程与链式调用分别做了阐述,如有遗忘,此为传送门

    其实,从Rx调用链上,线程调度仅是遵守运转机制的一环,但因其便捷、高频的特点,并在项目中很可能需要切换到自己的线程里,故将其选出,理解如何实现。

    正文

    案例

    ob.observeOn(Schedulers.newThread())
    

    由于流程、链式均做了阐述,因此案例直接定位到线程调度线程。上面的案例中,下游被切换到了新的线程里进行响应。

    调度器与工作线程

    Rx中,由调度器负责提供工作线程,而工作线程则负责具体的运转。以下为两者的简要信息。

    public abstract class Scheduler {
        ......
        public abstract Worker createWorker();
    }
    
    public abstract static class Worker implements Disposable {
        ......
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
    }
    
    • Scheduler.createWorker(): 获取用来执行任务的线程
    • Worker.schedule() : 具体的调度逻辑

    当前案例中,ob.observeOn()拿到的Observable为ObservableObserveOn。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            // 一般会执行到这里
            // 通过Scheduler拿到执行线程任务的Worker
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    在Rx构建调用链构建时,将会来到subscribeActual(),且对于当前案例来说,事件推送到对应的下游节点才会进行线程调度,因此,调度信息由ObserveOnObserver保存,具体信息如下:

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
            implements Observer<T>, Runnable {
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    }
    

    当事件推送到当前节点,即ObserveOnObserver将进行处理,取onNext()来看

    @Override
    public void onNext(T t) {
        ......
        schedule();
    }
    
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    
    @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    ObserveOnObserver将作为Runnable的交给工作线程Worker处理,当ObserveOnObserver作为线程任务获得运转时机后,drainFused()或drainNormal()根据具体情况向调用链上的下一节点推送相应事件,代码不贴。而线程调度也就完成了。

    为何这么短

    当前节点下,关注的事情是Rx如何进行线程调度,而核心为,将当前节点作为Runnable运行在相应的线程里,步骤为:

    • 由Scheduler提供工作线程Worker
    • 在当前节点的响应时机,将此节点作为Runnable交给Worker处理,在run()时机将事件推送知下一节点

    至于Worker是如何处理Runnable,何时获得执行时机,这就是Rx机制之外的问题了,因为已能保证接下来的响应,运行在Worker线程里,即完成了线程调度,如下图:


    线程调度.jpg

    下一篇:响应式拉取(未发布,待续)

    相关文章

      网友评论

          本文标题:RxJava2 源码三:线程调度

          本文链接:https://www.haomeiwen.com/subject/rcasgftx.html