美文网首页
RxJava2源码浅析(一)

RxJava2源码浅析(一)

作者: 最美的谣言 | 来源:发表于2017-06-16 14:21 被阅读0次

    前言

    我们经常看RxJava的文章,很多都是API性的介绍.今天我们就用一段来理解它吧,了解它的内幕

    本文编译需要:
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

    public static void debug() {
       1.     Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
       2.        @Override
       3.         public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
       4.            emitter.onNext(1);
       5.        }
       6.     });
       7.     Consumer<Integer> consumer = new Consumer<Integer>() {
       8.         @Override
       9.        public void accept(Integer integer) throws Exception {
     10.               Log.d(TAG, "accept: " + integer);
     11.          }
     12.     };
     13.    observable.subscribe(consumer);
    }
    

    这篇文章的目的就是分析这段代码,去挖掘RxJava到底做了什么

    让我们开始吧

    我们都知道RxJava最重要的东西就是Obervable(被观察者)和Observer(观察者), 或者针对事件来说就是上游和下游, 上游发送事件下游处理事件. 为了便于分析,我把代码标了行号,便于引用.

    粗略的分析上面这段代码就是一个Observable订阅了一个Consumer

    弄清Obervable的来源

    来看Observable.create() 方法的原型:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    它的参数是ObservableOnSubscribe类型,它只有一个方法待实现:

    void subscribe(ObservableEmitter<T> e) throws Exception;
    

    为什么需要ObservableEmitter形参呢.我们不用管,因为被new进ObservableCreate构造函数里去了,肯定是回调作用,这在我们平时写代码时见多了.实际上我们点进ObservableCreate里看到ObservableEmitter是一个内部类, 那问题是RxJavaPlugins.onAssembly()又是干嘛的,

    public static <T> Observable<T> onAssembly(Observable<T> source) {
            Function<Observable, Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    可以看到把onObservableAssembly赋给f, onObservableAssembly是什么 , 其实就是一个属性而已, 含有getter和setter.而我们也没看到调用set函数,所以该函数就直接返回了source,所以我们得到最终的Observable就是ObservableCreate对象.为什么用这个代替Observabel呢,看下类声明

    public final class ObservableCreate<T> extends Observable<T> {
    ...
    }
    

    原来就是Observable的子类.
    到了这里,终于清楚了Observable的来龙去脉.

    弄清Observer的来源

    我们看到这片代码用的是Consumer,它跟Observer有什么关系呢. 看下它的声明

    public interface Consumer<T> {
        /**
         * Consume the given value.
         * @param t the value
         * @throws Exception on error
         */
        void accept(T t) throws Exception;
    }
    

    这不就是一个普普通通的接口嘛,实现accept的方法就ok了,是的,consumer就是这么简单

    是时候让Observable和Consumer发生关联了

    只剩下observable.subscribe(consumer); 这句了, 顾名思义,被观察者订阅了消费者(观察者),我们跟进去看看代码怎么写的

    @SchedulerSupport(SchedulerSupport.NONE)
        public final Disposable subscribe(Consumer<? super T> onNext) {
            return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
        }
    

    它调用了subscribe方法,用了我们传进去的consumer对象即onNext,而后面三个参数是系统提供默认的实现,这也说明,肯定还能传进我们自己想要的参数,先不管.继续跟:

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    
            LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    
            subscribe(ls);
    
            return ls;
        }
    

    这片代码先是进行了一些非空判断,然后把我们的onNext转换成LambdaObserver对象了,LambdaObserver是什么呢:

    public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    ...
    }
    

    它实现了Observer, 到了这里,终于明白了,原来我们传进来的Consumer被处理成LambdaObserver对象了,继续跟进subscribe(ls);

    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    记住,我们手里现在握着的Observable对象是什么,Observer是什么, 当然是ObservableCreate和LambdaObserver的对象.

    我们又看到了RxJavaPlugins的调用,跟我们上一步讲的onAssembly是类似的,直接返回了observer.往下就是一个非空的判断了.接着是subscribeActual方法的调用.
    看下改方法的声明:

    protected abstract void subscribeActual(Observer<? super T> observer);
    

    它是Observable类的一个抽象方法,问题是谁来实现它,就是ObservableCreate了.因为是他实现了Observable.

    public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    这是ObservableCreate的代码片段.看到它的构造函数没,参数source不就是我们开始传进来的嘛.
    继续看subscribeActual方法, 第一行用我们的LambdaObserver对象为参,构造了CreateEmitter对象,继续跟进. 因为observer被LambdaObserver实现了,所以这里当然跳进它的onSubscribe方法

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete,
                Consumer<? super Disposable> onSubscribe) {
            super();
            this.onNext = onNext;
            this.onError = onError;
            this.onComplete = onComplete;
            this.onSubscribe = onSubscribe;
        }
    @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.setOnce(this, s)) {
                try {
                    onSubscribe.accept(this);
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    onError(ex);
                }
            }
        }
    

    这里先是用一个帮助类判断了一下,下一步的onSubscribe是什么呢,我们看到它的构造函数,就是我们之前讲的系统给我们提供的默认实现,默认实现是空实现.所以这个方法我们认为什么都没做.但是,如果onSubscribe不是系统提供的呢,那么它的执行时间点是比onNext早的.
    我们继续跟进subscribeActual方法.现在要处理try块了

    source.subscribe(parent);
    

    注意这个source是ObservableOnSubscribe对象.这个对象又是啥呢?这就不是我们一开始new出来的嘛

          ...    new ObservableOnSubscribe<Integer>() {
       2.        @Override
       3.         public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
       4.            emitter.onNext(1);
       5.        }
       6.     }
    

    参数刚好也是ObservableEmitter, 第4行的emitter参数就是

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    

    传进去的. 紧跟着emitter.onNext(1);就是parent对象的事了

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    ...
    

    还记得吧,这里的observer就是LambdaObserver对象也就是一开始的Consumer了.
    isDisposed()是什么,表示是否取消被观察者和观察者的关联,我们一路跟进来,没有关于它的操作,肯定返回true
    然后就是observer.onNext(t);即回调了Consumer的onNext方法.

    结束

    到了这里一个简单的流程终于完成了,谢谢大家!

    相关文章

      网友评论

          本文标题:RxJava2源码浅析(一)

          本文链接:https://www.haomeiwen.com/subject/myapqxtx.html