Rxjava2源码浅析(一)

作者: Sp_WannaSing | 来源:发表于2017-03-17 10:00 被阅读0次

    面试的时候被问道各种框架的原理架构,也是很尴尬,自以为写的代码不少,用过的框架也不少,深入的去研究源码的还真是不多,也是给自己敲了一个警钟,今天就来尝试剖析一下Rxjava2的源码,水平有限,就先看一下基础的用法相关,一些难度更高的操作符就慢慢来分析吧。
    就按照平时使用的顺序来分析:

    一、初始化Observerble

    基本使用实例:

    Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("aaa");
                }
            });
    

    先看一下内部的参数 ObservableOnSubscribe<>() 。

    public interface ObservableOnSubscribe<T> {
        void subscribe(ObservableEmitter<T> e) throws Exception;
    }
    

    就是一个接口,这里用的就是它的一个匿名实现类。而接口内部的方法中我们看到ObservableEmitter<> 是一个Rxjava2新推出的类,俗称发射器。

    public interface ObservableEmitter<T> extends Emitter<T> {
    
        /**
         * Sets a Disposable on this emitter; any previous Disposable
         * or Cancellation will be unsubscribed/cancelled.
         * @param d the disposable, null is allowed
         */
        void setDisposable(Disposable d);
    
        /**
         * Sets a Cancellable on this emitter; any previous Disposable
         * or Cancellation will be unsubscribed/cancelled.
         * @param c the cancellable resource, null is allowed
         */
        void setCancellable(Cancellable c);
    
        /**
         * Returns true if the downstream disposed the sequence.
         * @return true if the downstream disposed the sequence
         */
        boolean isDisposed();
    
        /**
         * Ensures that calls to onNext, onError and onComplete are properly serialized.
         * @return the serialized ObservableEmitter
         */
        ObservableEmitter<T> serialize();
    }
    
    

    这里面几个回调方法的作用注释也说的很清楚了就不多说了。
    它继承自Emitter

    public interface Emitter<T> {
    
        /**
         * Signal a normal value.
         * @param value the value to signal, not null
         */
        void onNext(@NonNull T value);
    
        /**
         * Signal a Throwable exception.
         * @param error the Throwable to signal, not null
         */
        void onError(@NonNull Throwable error);
    
        /**
         * Signal a completion.
         */
        void onComplete();
    }
    
    

    可以看到,这里面就是我们比较熟悉的next、complete、error三个回调方法了。其实这个create方法内部的参数就是两个接口的回调,理解就行了,然后看一下create方法。

     public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    

    requireNonNull是很好理解的,看名字也能猜测出是测试传进来的ObservableOnSubscribe是否为空

      public static <T> T requireNonNull(T object, String message) {
            if (object == null) {
                throw new NullPointerException(message);
            }
            return object;
        }
    

    而源码也验证了我们的想法。关键是后面一句,先看一下具体的方法实现。

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    一句一句来分析:

    public interface Function<T, R> {
        /**
         * Apply some calculation to the input value and return some other value.
         * @param t the input value
         * @return the output value
         * @throws Exception on error
         */
        @NonNull
        R apply(@NonNull T t) throws Exception;
    }
    

    这里的Function也是一个接口,作用也很明显,将T类型的数据转化成R类型数据。那是我们在使用到

            observable.map(new Function<String, Object>() {
                @Override
                public Object apply(@NonNull String s) throws Exception {
                    return null;
                }
            })
    

    类似这种类型转换的语句时候才会用到,这里我们先不管它,一开始是默认为null的,所以这个方法最后就会return source;就是将括号中的new ObservableCreate<T>(source)原样返回。这个ObservableCreate又是什么呢?

    public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
    

    源码比较长我们就之看一下它的构造函数就可以了,目前只需要知道这是一个Observerble的子类就可以了,至于Observerble这个类,等到大概摸清楚了事件流程再回头来分析。所以到现在我们的第一步初始化就算是分析完了流程。

    二、初始化一个Observer

    用法示例:

    Observer<String> observer=new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
                    subscription=d;
                }
    
                @Override
                public void onNext(String value) {
                    LogUtil.log(TAG,"  "+value);
                }
    
                @Override
                public void onError(Throwable e) {
                }
    
                @Override
                public void onComplete() {
                    LogUtil.log(TAG,"complete");
                }
            };
    

    这个分析就要简单很多了,Observer只是一个简单的接口,这里也只是具体实现了一下接口回调。

    public interface Observer<T> {
    
        /**
         * Provides the Observer with the means of cancelling (disposing) the
         * connection (channel) with the Observable in both
         * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
         * @param d the Disposable instance whose {@link Disposable#dispose()} can
         * be called anytime to cancel the connection
         * @since 2.0
         */
        void onSubscribe(Disposable d);
    
        /**
         * Provides the Observer with a new item to observe.
         * <p>
         * The {@link Observable} may call this method 0 or more times.
         * <p>
         * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
         * {@link #onError}.
         *
         * @param t
         *          the item emitted by the Observable
         */
        void onNext(T t);
    
        /**
         * Notifies the Observer that the {@link Observable} has experienced an error condition.
         * <p>
         * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
         * {@link #onComplete}.
         *
         * @param e
         *          the exception encountered by the Observable
         */
        void onError(Throwable e);
    
        /**
         * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
         * <p>
         * The {@link Observable} will not call this method if it calls {@link #onError}.
         */
        void onComplete();
    
    }
    

    不过这里和Rxjava1也是有些区别的,多了一个onSubscribe 注释也说的很清楚,用于随时取消订阅。
    第二步很轻松,下面看一下第三步

    三、建立订阅关系

    用法示例:

    observable.subscribe(observer);
    

    这里我们就只分析最简单的一种,看一下源码:

    @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    第一句还是一样,判断是否为空,平时自己写代码也要像这样注意代码的健壮性。
    重点就是这三句了。

     observer = RxJavaPlugins.onSubscribe(this, observer);
    
     ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
     subscribeActual(observer);
    

    一句一句来看:

     public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
            BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
            if (f != null) {
                return apply(f, source, observer);
            }
            return observer;
        }
    

    在这个onsubscribe中是不是觉得有些眼熟?就跟刚刚的onAssenmbly几乎一样,由于我们没有其它的功能,所以这里onObservableSubscribe也是null,也是返回原值,下面的requireNonNull我们也见过了,又验证一遍是否为空,因为如果我们加入了Function函数,上面就不会返回原来的observer了,所以还要再验证一遍。
    于是就到了最后一句

    protected abstract void subscribeActual(Observer<? super T> observer);
    

    ???
    怎么是个abstract方法?那么它是在哪实现的呢?
    回想看我们的observable初始化过程。哪里出的问题呢?就是我们一开始没有分析的ObservableCreate,我们在初始化的时候就将一个ObservableCreate类向上转型赋值给了Observerble,所以方法的具体实现也就在ObservableCreate里了。
    继续跟进。果不其然:

    @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    还是一句一句来看

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    

    又是一个新的类

        static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable {
    
    
            private static final long serialVersionUID = -3434801548987643227L;
    
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (t == null) {
                    t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
                }
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
    
            @Override
            public void setDisposable(Disposable d) {
                DisposableHelper.set(this, d);
            }
    
            @Override
            public void setCancellable(Cancellable c) {
                setDisposable(new CancellableDisposable(c));
            }
    
            @Override
            public ObservableEmitter<T> serialize() {
                return new SerializedEmitter<T>(this);
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
        }
    

    所以我们的前两句

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    

    所以我们的前两局就是回调了onSubscribe接口,从而将这个CreateEmitter类型转型成Disposable输出了。而CreateEmitter的初始化参数又是observer本身,所以大体上可以看成回调了另一个格式的自己。。。然后一般可用于自杀(取消订阅)。。。
    然后就来到的最后一句

    source.subscribe(parent);
    

    这里的source就是

    new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    e.onNext("test");
                }
            }
    

    刚刚我们初始化observable传入的。这个parent->这里的参数e。于是就这样完成了Observerble和Observer的绑定,也就能实现接口回调了。

    没有任何其它功能,只是走了一边最基本流程的Rxjava源码,后面还会继续更新的。

    相关文章

      网友评论

        本文标题:Rxjava2源码浅析(一)

        本文链接:https://www.haomeiwen.com/subject/srhemttx.html