通过实例分析RxJava2源码

作者: 一叶知秋yi | 来源:发表于2017-04-21 14:19 被阅读102次

    正文

    本文将通过一段实例代码的实际执行顺序来分析RxJava2的源码.这样梳理一遍之后,思路会清晰很多.RxJava版本为2.0.8.

    //代码片段一
    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        })
                .subscribeOn(Schedulers.io())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(@NonNull String s) throws Exception {
                        return Integer.parseInt(s);
                    }
                })
                .observeOn(Schedulers.computation())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                        print("onSubscribe called");
                    }
    
                    @Override
                    public void onNext(@NonNull Integer integer) {
                        print("onNext called " + integer);
                    }
    
                    @Override
                    public void onError(@NonNull Throwable e) {
                        print("onError called");
                    }
    
                    @Override
                    public void onComplete() {
                        print("onComplete called");
                    }
                });
    

    上面就是实例代码,我们接下来以上面的代码为引线,分析RxJava2源码.首先从create()开始.

    //代码片段二
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //判空,后面很多类似的代码,后面就不在赘述
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    RxJavaPlugins这个类都是hook相关的,本例中不涉及hook,所以可以暂时忽略相关代码,认为onAssembly()传入什么就返回什么就可以了.这里传入的是ObservableCreate,暂且先记着这个类吧,后面回调的时候我们又会回来的

    //代码片段三
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    

    ObservableCreate将传入的ObservableOnSubscribe source参数先保存起来.source通过字面意思我们可以把它理解为事件源,也就是事件流中的被观察者.先不管这个回调,我们直接往下看subScribeOn()这个方法

    //代码片段四
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    这个方法跟上面的create()方法结构相似,不过这里保存的是ObservableSubscribeOn

    //代码片段五
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    

    这里也是把事件源先保存起来,同时把传进来的线程调度也保存起来.接下来看map()

    //代码片段六
    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    

    还是跟上面类似的结构,我们直接看ObservableMap

    //代码片段七
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }
    

    这里保存事件源,并且把转换事件流的方法先保存起来.接下来看observeOn()

    //代码片段八
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    这里暂时不管delayErrorbufferSize,先看ObservableObserveOn

    //代码片段九
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    

    保存了事件源即其他的相应参数.总结以下,到目前为止,代码只是创建了很多被观察者,并保存起来.初看源码的时候会被这些类的命名搞的晕头转向,但是等你真正理通了整个流程,会发现这些命名还是很有规律的.接下来就是比较关键的subscribe()方法

    //代码片段十
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
    
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    subscribe()里面做了一些判空操作之后,调用了最关键的subscribeActual(),这里面就是观察者和被观察者真正建立订阅关系的地方.由于subscribe()是基类Observable里面的抽象方法,所以类似一个模板方法,如果子类没有@Override这个方法,那么subscribe()都将执行这段代码,所以后文中我们会从subscribe()方法直接跳到subscribeActual().这里调用subscribeActual()的是observeOn()返回的ObservableObserveOn对象

    //代码片段十一
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    这里我们用的不是TrampolineScheduler,则进入else的流程.里面创建了一个Worker对象, 然后调用了subscribe()方法.这里的source就是之前map()保存的事件源ObservableMap(见代码片段六).我们去看看它的subscribeActual()

    //代码片段十二
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    

    这里的source是之前subscribeOn()里面保存的事件源ObservableSubscribeOn(见代码片段四),我们去看看它的subscribeActual()

    //代码片段十三
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    这里我们重点关注最后一句,这里的scheduler就是subscribeOn()(见代码片段五)传进来的线程参数,也就是我们指定的被观察者需要运行的线程.我们看一下`SubscribeTask的定义

    //代码片段十四
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    这里在run()方法里面调用了subscribe()方法,也就是说,后面的操作都是在我们指定的线程中运行的,直到下一次切换线程.这里的source就是create()里面我们创建的ObservableOnSubscribe(见代码片段一),而它的subscribe()已经在create()里面定义的匿名内部类@Override了,无需调用subscribeActual()了,直接调用定义好的subscribe().直到这里,终于轮到我们的被观察者,也就是事件源运行了.这里又总结以下,运行到这里,代码通过subscribe()方法一步一步往上游回溯,并将上下游之间建立了订阅关系.并且已经完成了subsribeOn()的线程切换.
    看到这里忍不住说一下,之前看@扔物线大神的文章的时候,里面说subscribeOn()可以调用多次,但只有最上游的,也就是离事件源最近的subscribeOn()才有效.之前对于这一点不是特别理解,看了这里的源码之后就明白了.这时候的事件源还没有开始发送,所以不管你在后面调用多少次subscribeOn()只是指定了事件应该在哪个线程运行,却不是真正的运行.也就是说,只有最后一次的subscribeOn()才有最终的决定权.所以才有了上面的结论.
    被观察者操作也很简单,就在onNext()发送了一个"1"字符串

    //代码片段十五
    Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onComplete();
            }
        })
    

    这里的e就是前面SubscribeTask(见代码片段十四)中传入的parent,parent就是之前创建的SubscribeOnObserver,我们看下它的onNext()

    //代码片段十六
    public void onNext(T t) {
        actual.onNext(t);
    }
    

    onNext()里面调用了actual.onNext(),actual一直往上回溯,其实就是subscribeOn()的观察者,也就是MapObserver(见代码片段十二).这里的t就是一开始传进来的"1".

    //代码片段十七
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }
    
        U v;
    
        try {
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        actual.onNext(v);
    }
    

    MapObserver中的onNext()主要做了一件事件,就是把之前存下来的转换事件流方法mapper执行了,将字符串"1"转成了Integer型的1,并调用了其观察者的onNext(),也就是ObserveOnObserver(见代码片段十一),这里的t已经是经过map转换后的Ingeter型的1了.

    //代码片段十八
    public void onNext(T t) {
        if (done) {
            return;
        }
    
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        schedule();
    }
    

    那么observeOn()如何做到切换线程的呢?我们继续往下看,首先看schedule()方法

    //代码片段十九
    void schedule() {
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    worker.schedule()参数是Runnable,而ObserveOnObserver实现了Runnable接口,直接直接去看run()方法

    //代码片段二十
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    我们先关心正常流程,看看drainNormal()

    //代码片段二十一
    void drainNormal() {
        int missed = 1;
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
            for (;;) {
                boolean d = done;
                T v;
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    s.dispose();
                    q.clear();
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
                if (checkTerminated(d, empty, a)) {
                    return;
                }
                if (empty) {
                    break;
                }
                a.onNext(v);
            }
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
    

    现在这里已经是在onserveOn()里面指定的线程运行了.首先定义了一个队列用来存放上游发来的事件流,然后一个一个取出来,经过一系列的判空处理,调用了a.onNext(v).这里的a由上面的actual赋值而来,而actual就是我们代码中在subscribe()中定义的最终的观察者.看到这里我们就能看出observeOn()subscribeOn()的区别了.observeOn()是在事件发送之后切换线程的,也就是会影响下游的运行线程,并且每次切换都能影响下一个观察者所运行的线程.下图是一个大体的流程图,它表明了整个事件执行的过程.

    流程图

    总结

    之前只是知道怎么用RxJava,觉得用起来很爽,但是一直处于一种知其然不知其所以然的状态.这次梳理了一下源码之后思路清晰了很多.果然啃源码才是学习一个框架最直接的方式

    相关文章

      网友评论

      本文标题:通过实例分析RxJava2源码

      本文链接:https://www.haomeiwen.com/subject/jthdzttx.html