Rxjava 之 create操作符 源码解析

作者: 103style | 来源:发表于2019-05-15 22:43 被阅读3次

    转载请以链接形式标明出处:
    本文出自:103style的博客

    本文基于 RxJava 2.x 版本

    create操作符例子:
    Observable
            .create(new ObservableOnSubscribe<Object>() {
                @Override
                public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
    
                }
            })
            .subscribe(new Observer<Object>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Object o) {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    首先我们看create 方法:

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    

    RxJavaPlugins 类的 onAssembly 方法:

    static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
    
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
    

    在源码中查看引用可知 onObservableAssembly 只有在测试的时候才不为 null
    所以Observable.create(ObservableOnSubscribe<T> source)实际上就是返回了 ObservableCreate对象


    ObservableCreate 类,可以看到 ObservableCreateObservable 的子类,并实现了父类的 subscribeActual 方法。

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {...}
        ...
    }
    

    然后我们看subscribe方法: 实际上是调用了 Observable 的抽象方法 subscribeActual(observer);

    public final void subscribe(Observer<? super T> observer) {
        ...
        subscribeActual(observer);
        ...
    }
    
    protected abstract void subscribeActual(Observer<? super T> observer);
    

    又因为 create操作符返回的 ObservableCreateObservable 的子类,
    所以实际上调用的是ObservableCreatesubscribeActual(observer);


    ObservableCreatesubscribeActual(observer)方法:

    • 首先创建了 CreateEmitter对象,
    • 然后调用了 subscribe 方法传进来的 Observer 对象的 onSubscribe() 方法
    • 然后调用了create 操作符 传进来的 ObservableOnSubscribe 对象的 subscribe(ObservableEmitter<T> emitter)方法
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
    
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    因为 CreateEmitter 类实现了 ObservableEmitter<T>Disposable 接口,
    所以我们可以在 create 操作符 传进来的 ObservableOnSubscribe 对象的 subscribe(ObservableEmitter<T> emitter)方法里调用onNextonErroronComplete等方法。

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {
        ...
        final Observer<? super T> observer;
    
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
        ...
    }
    

    ObservableEmitter 接口:

    public interface ObservableEmitter<T> extends Emitter<T> {
        void setDisposable(@Nullable Disposable d);
        void setCancellable(@Nullable Cancellable c);
        boolean isDisposed();
        ObservableEmitter<T> serialize();
        boolean tryOnError(@NonNull Throwable t);
    }
    
    public interface Emitter<T> {
        void onNext(@NonNull T value);
        void onError(@NonNull Throwable error);
        void onComplete();
    }
    

    Disposable 接口:

    public interface Disposable {
        void dispose();
        boolean isDisposed();
    }
    

    因为CreateEmitter 又重写了onNextonErroronComplete等方法。
    所以 create 操作符 传进来的 ObservableOnSubscribe 对象的 subscribe(ObservableEmitter<T> emitter)方法里调用onNextonErroronComplete等方法实际上调用了 CreateEmitteronNextonErroronComplete等方法。


    CreateEmitteronNextonErroronComplete方法:

    • onNextonError传进来的值做了空判断。
    • 如果 !isDisposed() 则继续执行 observer 对象的 onNextonErroronComplete等方法。
      ( observer 对象为 create操作符 之后的 subscribe()方法传进来的 Observer<T> 对象)
    • 并在 onCompleteonError 方法最后执行 dispose() 方法。
        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("..."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    
        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }
    
        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("...");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }
    
        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    

    接下来我们来看 CreateEmitterdispose()isDisposed()方法

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }
    
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    

    继续看 get()方法,看下面代码可知 get() 返回的是一个 Disposable 对象

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {...}
    
    
    public class AtomicReference<V> implements Serializable {
        private volatile V value;
    
        public AtomicReference(V var1) {
            this.value = var1;
        }
    
        public AtomicReference() {
        }
    
        public final V get() {
            return this.value;
        }
    

    继续看 DisposableHelperisDisposed(Disposable d)dispose(AtomicReference<Disposable> field)方法

    • isDisposed(Disposable d) 则是判断 d 是否和枚举值 DISPOSED 相等。
    • dispose(AtomicReference<Disposable> field) 方法即是 将 CreateEmitterisDisposed() 中调用 get() 获取的对象赋值为 DisposableHelper 的枚举值 DISPOSED
      所以调用dispose(AtomicReference<Disposable> field)方法后,则理论上 isDisposed(Disposable d)即返回true
    public enum DisposableHelper implements Disposable {
        /**
         * The singleton instance representing a terminal, disposed state, don't leak it.
         */
        DISPOSED
        ;
    
        public static boolean isDisposed(Disposable d) {
            return d == DISPOSED;
        }
        ...
        public static boolean dispose(AtomicReference<Disposable> field) {
            Disposable current = field.get();
            Disposable d = DISPOSED;
            if (current != d) {
                current = field.getAndSet(d);
                if (current != d) {
                    if (current != null) {
                        current.dispose();
                    }
                    return true;
                }
            }
            return false;
        }
        ...
    }
    

    DisposableHelper 类中 dispose(AtomicReference<Disposable> field)
    field.getAndSet(d);之后,如果 Disposable对象的值还不等于 DISPOSED
    则会调用current.dispose();

    current 为 以下例子的 disposableObserver对象

    DisposableObserver disposableObserver;
    
    private void test() {
        disposableObserver = Observable
                .create(new ObservableOnSubscribe<Object>() {...})
                .subscribeWith(new DisposableObserver<Object>() {...});
    }
    

    create 操作符 返回的是 ObservableCreate,因为 ObservableCreate未重写 subscribeWith 方法,所以调用的是 ObservablesubscribeWith方法:

    public final <E extends Observer<? super T>> E subscribeWith(E observer) {
        subscribe(observer);
        return observer;
    }
    

    所以我们知到 disposableObserver 即为subscribeWith 传进来的 DisposableObserver 对象


    DisposableObserver类:

    public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
    
        final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();
    
        @Override
        public final void dispose() {
            DisposableHelper.dispose(upstream);
        }
    }
    

    我们可以看到 dispose()方法继续调用了DisposableHelper.dispose(AtomicReference<Disposable> field);

    所以我们是否可以得出结论,
    dispose(AtomicReference<Disposable> field)在设置值为DISPOSED 失败之后会一直重复调用直到设置成功为止?

    参考文章

    相关文章

      网友评论

        本文标题:Rxjava 之 create操作符 源码解析

        本文链接:https://www.haomeiwen.com/subject/htnmaqtx.html