RxJava 2.x 源码分析(一)

作者: zYoung_Tang | 来源:发表于2018-05-11 09:59 被阅读42次

    主要介绍 Rxjava 是如何利用观察者模式实现通信

    依赖

    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
    

    核心类和接口

    Observable (被观察者)

    被观察者必须继承的抽象类

    // T: `被观察者`发射的 item 的类型
    public abstract class Observable<T> implements ObservableSource<T>
    
    Observer (观察者)

    观察者必须实现的接口

    public interface Observer<T>
    
    Emitter (发射器)

    用于被观察者发射信息给观察者

    // T: 发射的 item 类型
    public interface ObservableEmitter<T> extends Emitter<T>
    

    例子使用到的的相关类

    ObservableCreate

    被观察者的子类,Observable.create() 方法返回 Observable 就是该类的实例

    public final class ObservableCreate<T> extends Observable<T>
    
    CreateEmitter(发射器)

    ObservableCreate 的内部类,实现了ObservableEmitter

    static final class CreateEmitter<T>
        extends AtomicReference<Disposable>
        implements ObservableEmitter<T>, Disposable
    
    Disposable

    一个提供断开操作的接口,所有需要做断开操作的类都要实现该接口

    /**
     * Represents a disposable resource.
     */
    public interface Disposable {
        /**
         * Dispose the resource, the operation should be idempotent.
         */
        void dispose();
    
        /**
         * Returns true if this resource has been disposed.
         * @return true if this resource has been disposed
         */
        boolean isDisposed();
    }
    
    ObservableOnSubscribe

    一个简单的功能接口,只负责给被观察者提供订阅方法和传一个发射器实例给被观察者

    public interface ObservableOnSubscribe<T> {
    
        /**
         * Called for each Observer that subscribes.
         * @param e the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
    }
    

    从一个简单的例子查看源码:

    private void RxJava() {
        // 第一步:初始化'被观察者' observable
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
    
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                // 传一个事件给观察者观察
                e.onNext(1);
                e.onNext(2);
                // 调用 onComplete 方法后调用接收不到后面的事件
                e.onComplete();
                // 向观察者传一个异常事件,并且观察者接收不到后面的事件
                //e.onError(new NullPointerException("空指针"));
                e.onNext(3);
                e.onNext(4);
            }
        });
    
        // 第二步:初始化'观察者' observer
        Observer<Integer> observer = new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;
    
            /**
             * 该方法是订阅操作中第一个调用的回调方法
             * 提供了一个可主动切断的对象
             */
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                mDisposable = d;
            }
    
            /**
             * `被观察者`调用该方法并传给`观察者`一个 T 类型的 item
             */
            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "Observer 接收到: " + integer);
                i++;
                if (i == 2) {
                    // Disposable 用来切断观察者与被观察者之间联系,被观察者之后的事件不再传给观察者
                    mDisposable.dispose();
                }
            }
    
            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "Observer onError: " + e.getMessage());
            }
    
            @Override
            public void onComplete() {
                Log.e(TAG, "Observer onComplete");
            }
        };
    
        // 第三步:订阅
        observable.subscribe(observer);
    }
    

    第一步中create()中进入源码:

    // Observable.java
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        // 判断传入的 ObservableOnSubscribe 对象是否为 null
        ObjectHelper.requireNonNull(source, "source is null");
        // 通过 RxJavaPlugins 组装并返回 ObservableCreate 的实例
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    Observable 是一个抽象类,当我们使用 Observable.create 方法创建被观察者 observable 的时候实际上是创建一个 ObservableCreate 对象。

    订阅方法 subscribe 中实际进行订阅操作的是一个抽象方法 subscribeActual:

    // Observable.java
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // 实际进行订阅的操作
            subscribeActual(observer);
        } catch (NullPointerException e) {
            throw e;
        } catch (Throwable e) {
             ...
        }
    }
    

    下面是 ObservableCreate 的实现:

    // ObservableCreate.java
    protected void subscribeActual(Observer<? super T> observer) {
        // 创建发射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 调用'观察者'的 onSubscribe 方法
        observer.onSubscribe(parent);
        try {
            // source : 被观察者
            // parent : 发射器
            // 这里回调我们覆写的 Observable 的 subscribe 方法,开始发射事件操作
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    subscribeActual订阅操作方法调用顺序:

    观察者onSubscribe方法 -> 被观察者subscribe方法 -> 被观察者发射操作onNext() -> 观察者onNext()

    从源码中得知,我们需要覆写的subscribe方法中得到的 e 实际上就是CreateEmitter,当我们调用e.onNext(1)的时候实际上就是调用 CreateEmitteronNext

    // ObservableCreate.CreateEmitter.java
    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        // 判断被观察者是否已断开了
        if (!isDisposed()) {
            // 回调我们覆写的观察者的 onNext 方法
            observer.onNext(t);
        }
    }
    

    CreateEmitteronNext()中我们可以知道只有当被观察者没有断开的时候才会回调观察者onNext()

    // ObservableCreate.CreateEmitter
    @Override
    public void onComplete() {
        if (!isDisposed()) { //检测是否已断开
            try {
                // 把Complete事件发射给观察者
                observer.onComplete();
            } finally {
                // 断开操作
                dispose();
            }
        }
    }
            
    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }
    
    @Override
    public boolean tryOnError(Throwable t) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        }
        if (!isDisposed()) {
            try {
                observer.onError(t);
            } finally {
                // onError() 方法最终也会调用该方法
                dispose();
            }
            return true;
        }
        return false;
    }
    
    @Override
    public void dispose() {
        // 断开操作
        DisposableHelper.dispose(this);
    }
    
    

    很多人都知道 onError() 和 onComplete() 不能并存,从上面源码可以看到两个方法最终都会执行 dispose() ,所以无论执行哪个方法最终都会切断被观察者观察者之间的联系,之后调用的方法都会失效。且由DisposableHelper这个枚举类实现具体操作

    public enum DisposableHelper implements Disposable {
        /**
         * The singleton instance representing a terminal, disposed state, don't leak it.
         */
        // 用来标记断开状态
        DISPOSED
        ;
    
        /**
         * Atomically disposes the Disposable in the field if not already disposed.
         * @param field the target field
         * @return true if the current thread managed to dispose the Disposable
         */
        public static boolean dispose(AtomicReference<Disposable> field) {
            /**
            * 还记得 CreateEmitter 继承了 AtomicReference 吗
            * 下面的操作属于原子性操作:
            */
            // field: CreateEmitter
            // 首次调用时获取的 current 为 null
            Disposable current = field.get();
            Disposable d = DISPOSED;
            if (current != d) { 
                // getAndSet 操作是把 d 的值赋给 field,但是返回旧值(null)
                // 下一次执行该 dispose 方法时 field.get()的值就会变成 DISPOSED
                current = field.getAndSet(d);
                if (current != d) {
                    if (current != null) {
                        current.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
        
        /**
         * Checks if the given Disposable is the common {@link #DISPOSED} enum value.
         * @param d the disposable to check
         * @return true if d is {@link #DISPOSED}
         */
        public static boolean isDisposed(Disposable d) {
            return d == DISPOSED;
        }
    }
    

    这里我们注意到断开操作的核心就是让发射器继承原子性引用类 AtomicReference 保存和改变状态,防止多线程并发出现异常


    总结

    • 被观察者subscribe()方法就像是一个触发器(Trigger),调用后被观察者先前准备好的一系列事件按顺序通过发射器传给观察者

    • 如果调用了 onComplete()``onError()或者观察者主动调用dispose()方法,被观察者观察者之间的联系将被切断,观察者不再接受到被观察者后续发射的事件

    参考文章

    相关文章

      网友评论

        本文标题:RxJava 2.x 源码分析(一)

        本文链接:https://www.haomeiwen.com/subject/uqdzrftx.html