美文网首页
RxJava 源码浅析

RxJava 源码浅析

作者: Parallel_Lines | 来源:发表于2019-08-20 13:17 被阅读0次

    本文只分析 RxJava 的基本原理与流程,不深入探讨具体操作符的实现细节。

    背景

    为什么使用 RxJava?

    解决异步回调多重嵌套

    比如类似的代码:

    new Thread(new Runnable() {
        @Override
        public void run() {
            //do something
            handler.post(new Runnable() {
                @Override
                public void run() {
                    //getResult do something
                }
            });
        }
    }).start();
    

    RxJava 的实现

    Observable.create((ObservableOnSubscribe<String>) e -> {
        //do something
    })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            //getResult do something
        });
    

    链式调用,解决代码多层嵌套问题,在逻辑越发复杂时优势也将越发明显。

    解决回调地狱

    当有俩个及以上要求顺序访问的网络请求,往往会出现下面的代码:

    Net.mockNet(TOKEN_URL, new Net.Callback() {
        @Override
        public void onResponse(String result) {
            // getResult do something
            Net.mockNet(CART_URL, new Net.Callback() {
                @Override
                public void onResponse(String result) {
                    // getResult do something
                }
            });
        }
    });
    

    这种不优雅的实现,可以用 RxJava 解决:

    Observable<String> o1 = Observable.create((ObservableOnSubscribe<String>) e -> e.onNext(Net.mockNet(TOKEN_URL, 2000)));
    
    o1.concatMap((Function<String, ObservableSource<?>>) s -> {
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                //getResult do something
                e.onNext(Net.mockNet(CART_URL, 2000));
            }
        });
    })
            .cast(String.class)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s -> {
            //getResult do something
            });
    

    看似并不简洁,但是如果我们把创建 Observable 对象的过程(o1、o2)封装为方法(暂不考虑封装的实现),代码就会变为如下效果:

    askToken().concatMap(s -> askCart((String) s)) // getResult do something
            //.concatMap(s -> askAnother((String) s)) // 你还可以接着续接
            .cast(String.class)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(s ->
                    // getResult do something
            );
    

    实际上使用 Retrofit 框架后,代码就会变得像上述一样简洁,而且就逻辑而言也更加清晰易懂。

    其它

    配合其它框架实现更多优势功能。

    使用

    RxJava 的使用教程网上有很多,这里不作具体说明。

    源码分析

    基本原理分析 - 1

    Rxjava 2.0 和 1.0 的核心原理区别不大,都是基于观察者模式。

    关于观察者模式的核心原理,可以参考下面的演示代码(初级 demo,仅用作启发):

    
    /**
     * ⭐️ 观察者将实例交给被观察者,并且当被观察者数据变化时,被观察者'主动'去调用观察者实例的方法。
     */
    
    //被观察者
    public class MyObservable {
    
        private MyObserver myObserver;
    
        public void setObserver(MyObserver observer) {
            this.myObserver = observer;
        }
    
        public void deleteObserver() {
            this.myObserver = null;
        }
    
        protected void notifyData(Object obj) {
            if (myObserver != null) {
                myObserver.updateData(obj);
            }
        }
    }
    
    
    //观察者
    public interface MyObserver {
    
        void updateData(Object obj);
    }
    
    

    首先我们从 RxJava 最基础的实现开始,研究其原理:

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            // todo something
            e.onNext("test");
        }
    })
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String value) {
                    //回调
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    Observable 是被观察者,Observer 是观察者,根据观察者模式原理,消息是从被观察者发送给观察者的,所以我们从 Observable 的代码开始分析,先看 create 方法:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    阅读源码注意不能被细节牵绊,要充分利用命名的意义。这里我们可以大致推断出来,我们得到的 Observable 就是 new ObservableCreate<T>(source),阅读 RxJavaPlugins.onAssembly 源码也能印证这一点,这里就不深入讨论,直接看 ObservableCreate 源码。

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    
        static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
            ...
        }
        ...
    

    这里截取了类的部分代码,先分析它的关键方法 subscribeActual()

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    subscribeActual() 最终调用了 source.subscribe(parent);,从上下文可知 source 即我们重写并传入的 ObservableOnSubscribe 的参数,所以 subscribeActual() 最终就是执行我们 Observable.create 实现的 subscribe() 方法

    而 parent,就是在 Observable.create 中调用 emitter.onNext()ObservableEmitter 的具体实现。它的代码也在上述源码中:

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }
    

    也就是说 Emitter 作为发射器,最终还是将 onNext 还是交给了观察者 observer.onNext()

    至此下游链路通了,代码的执行顺序为 ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() → CreateEmitter.onNext() → Observer.onNext()

    那么是什么时候调用的 subscribeActual() 的呢?

    我们知道被观察者通知观察者需要持有观察者的实例,所以在这个问题之前,我们先分析观察者 observer 的实例是多会交给被观察者 Observable 的。

    在最初的示例代码中,Observable 在 create 之后,紧接着调用 subscribe(new Observer()) 将观察者传给被观察者:

    obervable.subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String value) {
                            //回调
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    阅读 Observable.subscribe 源码:

        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer); //⭐️
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    就是说 Observable.subscribe() 直接调用了 Observable.subscribeActual()

    所以对于这个简单模型,代码执行的完整顺序为:

    基础模型代码调用顺序

    基本原理分析 - 2

    上面理清了 RxJava 最基础代码的实现原理 -- 观察者模式。下面稍加扩展,通过分析 subscribeOnconcatMap 等操作符的基本实现,研究整个 Rx 链路的串行原理。这里不会分析 subscribeOn 如何跨线程、concatMap 如何分发,有兴趣可以自行研究。

    subscribeOn 是指定被观察者所在线程,看下源码:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    这里,最终返回了 new ObservableSubscribeOn<T>(this, scheduler);

    先来看下 ObservableSubscribeOn 以及它的参数究竟是谁。

    1.类 ObservableSubscribeOn
    
    【ObservableSubscribeOn】 extends 【AbstractObservableWithUpstream】 extends 【Observable】 implements 【ObservableSource】
    
    2.参数 this
    
    ObservableSource
    
    3.参数 scheduler
    
    指定线程类型,不具体分析。
    

    subscribeOn() 的返回值 ObservableSubscribeOn 对象也是 Observable 类型,且参数是 ObservableSource 类型,而 Observable 实现了 ObservableSource 接口。

    这种实现是不是很熟悉,类似于代理模式,通过嵌套同一接口,实现功能的链式叠加。也就是说 subscribeOn() 方法返回了上层 Observable 的代理对象,我们看下 ObservableSubscribeOn 的源码:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //2
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent); //1.
                }
            }));
        }
    
        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            private static final long serialVersionUID = 8094547886072529208L;
            final Observer<? super T> actual;
    
            final AtomicReference<Disposable> s;
    
            SubscribeOnObserver(Observer<? super T> actual) {
                this.actual = actual;
                this.s = new AtomicReference<Disposable>();
            }
    
            @Override
            public void onSubscribe(Disposable s) {
                DisposableHelper.setOnce(this.s, s);
            }
    
            @Override
            public void onNext(T t) {
                actual.onNext(t);
            }
    
            ...
        }
    }
    

    注释 1 处,在线程调度之后,ObservableSubscribeOn 最终调用了上层的 ObservableSource(即原 Observable)。

    同样的,注释 2 处,Observer 也使用了类似的嵌套。

    所以对于基础模型上加 subscribeOn(),代码执行顺序为:

    ObservableSubscribeOn.subscribe() → ObservableSubscribeOn.subscribeActual() → ObservableCreate.subscribe() → ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() 你的被观察者实现 → CreateEmitter.onNext() → SubscribeOnObserver.onNext() → Observer.onNext() 你的观察者实现

    从上面分析可以推测,正是这种层层代理实现了 Rxjava 的链式调用,实际代码调用顺序可以对比参考如下模型,在把握宏观流程的情况下,有助于阅读代码细节,如具体操作符的实现等。

    Rx 链式调用模型

    简单看下 concatMap 的源码:

    public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
        final int bufferSize;
    
        final ErrorMode delayErrors;
    
        public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
                int bufferSize, ErrorMode delayErrors) {
            super(source);
            this.mapper = mapper;
            this.delayErrors = delayErrors;
            this.bufferSize = Math.max(8, bufferSize);
        }
        @Override
        public void subscribeActual(Observer<? super U> s) {
    
            if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
                return;
            }
    
            if (delayErrors == ErrorMode.IMMEDIATE) {
                SerializedObserver<U> serial = new SerializedObserver<U>(s);
                source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
            } else {
                source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
            }
        }
        ...
    

    可以看出大同小异遵循上述模式。其余细节有兴趣可以自行研究。

    总结

    阅读源码旨在学习框架结构,从中获得启发,细节实现并非研究重点。

    源码阅读遵循从宏观到微观、从简单到复杂逐步拆解分析的原则,千万不要被细节所牵绊。

    后面接着分析 Retrofit。

    相关文章

      网友评论

          本文标题:RxJava 源码浅析

          本文链接:https://www.haomeiwen.com/subject/dqxpsctx.html