通过实例分析RxJava2源码

作者: 一叶知秋yi | 来源:发表于2017-04-21 14:19 被阅读102次

正文

本文将通过一段实例代码的实际执行顺序来分析RxJava2的源码.这样梳理一遍之后,思路会清晰很多.RxJava版本为2.0.8.

//代码片段一
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            e.onNext("1");
            e.onComplete();
        }
    })
            .subscribeOn(Schedulers.io())
            .map(new Function<String, Integer>() {
                @Override
                public Integer apply(@NonNull String s) throws Exception {
                    return Integer.parseInt(s);
                }
            })
            .observeOn(Schedulers.computation())
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                    print("onSubscribe called");
                }

                @Override
                public void onNext(@NonNull Integer integer) {
                    print("onNext called " + integer);
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    print("onError called");
                }

                @Override
                public void onComplete() {
                    print("onComplete called");
                }
            });

上面就是实例代码,我们接下来以上面的代码为引线,分析RxJava2源码.首先从create()开始.

//代码片段二
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    //判空,后面很多类似的代码,后面就不在赘述
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

RxJavaPlugins这个类都是hook相关的,本例中不涉及hook,所以可以暂时忽略相关代码,认为onAssembly()传入什么就返回什么就可以了.这里传入的是ObservableCreate,暂且先记着这个类吧,后面回调的时候我们又会回来的

//代码片段三
public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

ObservableCreate将传入的ObservableOnSubscribe source参数先保存起来.source通过字面意思我们可以把它理解为事件源,也就是事件流中的被观察者.先不管这个回调,我们直接往下看subScribeOn()这个方法

//代码片段四
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

这个方法跟上面的create()方法结构相似,不过这里保存的是ObservableSubscribeOn

//代码片段五
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

这里也是把事件源先保存起来,同时把传进来的线程调度也保存起来.接下来看map()

//代码片段六
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

还是跟上面类似的结构,我们直接看ObservableMap

//代码片段七
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}

这里保存事件源,并且把转换事件流的方法先保存起来.接下来看observeOn()

//代码片段八
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这里暂时不管delayErrorbufferSize,先看ObservableObserveOn

//代码片段九
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

保存了事件源即其他的相应参数.总结以下,到目前为止,代码只是创建了很多被观察者,并保存起来.初看源码的时候会被这些类的命名搞的晕头转向,但是等你真正理通了整个流程,会发现这些命名还是很有规律的.接下来就是比较关键的subscribe()方法

//代码片段十
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);

        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

subscribe()里面做了一些判空操作之后,调用了最关键的subscribeActual(),这里面就是观察者和被观察者真正建立订阅关系的地方.由于subscribe()是基类Observable里面的抽象方法,所以类似一个模板方法,如果子类没有@Override这个方法,那么subscribe()都将执行这段代码,所以后文中我们会从subscribe()方法直接跳到subscribeActual().这里调用subscribeActual()的是observeOn()返回的ObservableObserveOn对象

//代码片段十一
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

这里我们用的不是TrampolineScheduler,则进入else的流程.里面创建了一个Worker对象, 然后调用了subscribe()方法.这里的source就是之前map()保存的事件源ObservableMap(见代码片段六).我们去看看它的subscribeActual()

//代码片段十二
public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

这里的source是之前subscribeOn()里面保存的事件源ObservableSubscribeOn(见代码片段四),我们去看看它的subscribeActual()

//代码片段十三
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

这里我们重点关注最后一句,这里的scheduler就是subscribeOn()(见代码片段五)传进来的线程参数,也就是我们指定的被观察者需要运行的线程.我们看一下`SubscribeTask的定义

//代码片段十四
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

这里在run()方法里面调用了subscribe()方法,也就是说,后面的操作都是在我们指定的线程中运行的,直到下一次切换线程.这里的source就是create()里面我们创建的ObservableOnSubscribe(见代码片段一),而它的subscribe()已经在create()里面定义的匿名内部类@Override了,无需调用subscribeActual()了,直接调用定义好的subscribe().直到这里,终于轮到我们的被观察者,也就是事件源运行了.这里又总结以下,运行到这里,代码通过subscribe()方法一步一步往上游回溯,并将上下游之间建立了订阅关系.并且已经完成了subsribeOn()的线程切换.
看到这里忍不住说一下,之前看@扔物线大神的文章的时候,里面说subscribeOn()可以调用多次,但只有最上游的,也就是离事件源最近的subscribeOn()才有效.之前对于这一点不是特别理解,看了这里的源码之后就明白了.这时候的事件源还没有开始发送,所以不管你在后面调用多少次subscribeOn()只是指定了事件应该在哪个线程运行,却不是真正的运行.也就是说,只有最后一次的subscribeOn()才有最终的决定权.所以才有了上面的结论.
被观察者操作也很简单,就在onNext()发送了一个"1"字符串

//代码片段十五
Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
            e.onNext("1");
            e.onComplete();
        }
    })

这里的e就是前面SubscribeTask(见代码片段十四)中传入的parent,parent就是之前创建的SubscribeOnObserver,我们看下它的onNext()

//代码片段十六
public void onNext(T t) {
    actual.onNext(t);
}

onNext()里面调用了actual.onNext(),actual一直往上回溯,其实就是subscribeOn()的观察者,也就是MapObserver(见代码片段十二).这里的t就是一开始传进来的"1".

//代码片段十七
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != NONE) {
        actual.onNext(null);
        return;
    }

    U v;

    try {
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    actual.onNext(v);
}

MapObserver中的onNext()主要做了一件事件,就是把之前存下来的转换事件流方法mapper执行了,将字符串"1"转成了Integer型的1,并调用了其观察者的onNext(),也就是ObserveOnObserver(见代码片段十一),这里的t已经是经过map转换后的Ingeter型的1了.

//代码片段十八
public void onNext(T t) {
    if (done) {
        return;
    }

    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

那么observeOn()如何做到切换线程的呢?我们继续往下看,首先看schedule()方法

//代码片段十九
void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

worker.schedule()参数是Runnable,而ObserveOnObserver实现了Runnable接口,直接直接去看run()方法

//代码片段二十
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

我们先关心正常流程,看看drainNormal()

//代码片段二十一
void drainNormal() {
    int missed = 1;
    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = actual;
    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }
        for (;;) {
            boolean d = done;
            T v;
            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;
            if (checkTerminated(d, empty, a)) {
                return;
            }
            if (empty) {
                break;
            }
            a.onNext(v);
        }
        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

现在这里已经是在onserveOn()里面指定的线程运行了.首先定义了一个队列用来存放上游发来的事件流,然后一个一个取出来,经过一系列的判空处理,调用了a.onNext(v).这里的a由上面的actual赋值而来,而actual就是我们代码中在subscribe()中定义的最终的观察者.看到这里我们就能看出observeOn()subscribeOn()的区别了.observeOn()是在事件发送之后切换线程的,也就是会影响下游的运行线程,并且每次切换都能影响下一个观察者所运行的线程.下图是一个大体的流程图,它表明了整个事件执行的过程.

流程图

总结

之前只是知道怎么用RxJava,觉得用起来很爽,但是一直处于一种知其然不知其所以然的状态.这次梳理了一下源码之后思路清晰了很多.果然啃源码才是学习一个框架最直接的方式

相关文章

网友评论

本文标题:通过实例分析RxJava2源码

本文链接:https://www.haomeiwen.com/subject/jthdzttx.html