美文网首页
RxJava2源码(二)

RxJava2源码(二)

作者: 原件 | 来源:发表于2018-01-06 21:36 被阅读44次

阅读Observable的xxx操作符的步骤

  1. 找到Observable的子类ObservableXXX
    RxJavaPlugins.onAssembly(new ObservableXXX<T>());
  2. 查看ObservableXXX的subscribeActual(Observer<? super T> s)函数,一般做下面三件事
    1. 一般会创建一个Disposable接口的实现类d
    2. 调用s.onSubscribe(d);
    3. 具体subscribe实现代码
  3. 具体subscribe实现代码需要关注的几个点
    1. disposed的实现
    2. observer的onNext,OnComplete,OnError何时被调用

以Observable.just为例

  1. 找到ObservableFromArray

  2. 查看ObservableFromArray的subscribeActual函数,发现主要逻辑都在FromArrayDisposable的run方法里

    public final class ObservableFromArray<T> extends Observable<T> {
        final T[] array;
        public ObservableFromArray(T[] array) {
            this.array = array;
        }
        @Override
        public void subscribeActual(Observer<? super T> s) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
            s.onSubscribe(d);
    
            if (d.fusionMode) {
                // 根据fusion mode值得来的,具体看QueueFuseable,默认为false,暂时先不管 
                return;
            }
    
            d.run();
        }
    
  3. 查看FromArrayDisposable的run

    static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
    
        final Observer<? super T> actual;
    
        final T[] array;
    
        int index;
    
        boolean fusionMode;
    
        volatile boolean disposed;
    
        FromArrayDisposable(Observer<? super T> actual, T[] array) {
            this.actual = actual;
            this.array = array;
        }
    
        @Override
        public void dispose() {
            // 用一个boolean变量来标记
            disposed = true;
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    
        void run() {
            T[] a = array;
            int n = a.length;
    
            for (int i = 0; i < n && !isDisposed(); i++) {
                // 如果没有Disposed则遍历array数组
                T value = a[i];
                if (value == null) {
                    // 如果value为null走error,跳出for循环
                    actual.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                // 走onNext
                actual.onNext(value);
            }
            if (!isDisposed()) {
                // 如果没有Disposed,走onComplete
                actual.onComplete();
            }
        }
    }
    
  4. 所以总结下:Just操作符依次发送数组中的事件,并且碰到null就中断;并且除非手动dispose状态一直都不会变

PS

我的github:https://github.com/nppp1990/MyTips

相关文章

网友评论

      本文标题:RxJava2源码(二)

      本文链接:https://www.haomeiwen.com/subject/erdunxtx.html