美文网首页
RxJava2 源码总结

RxJava2 源码总结

作者: SScience | 来源:发表于2017-12-31 22:02 被阅读26次

    总感觉跟着源码走过程过段时间又会忘记,又得翻一遍源码,所以在第一次学习源码时,把领悟的关键点记录下来,以后回看只要稍微浏览下源码,就能迅速明白它的思想(也许以后还会有更深刻的理解😂)。

    本学习源码基于 RxJava 2.1.7
    源码地址:https://github.com/ReactiveX/RxJava

    一,事件产生和消费--create()

    首先,以最简单的使用方式来说:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onComplete();
        }
    }).subscribe(new Observer<Integer>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e(">>>>>>>>", "onSubscribe");
        }
        @Override
        public void onNext(Integer integer) {
            Log.e(">>>>>>>>", "onNext:" + integer);
        }
        @Override
        public void onError(Throwable e) {
             Log.e(">>>>>>>>", "onError:" + e.toString());
        }
        @Override
        public void onComplete() {
            Log.e(">>>>>>>>", "onComplete");
        }
    });
    
    // ObservableCreate#subscribeActual
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent); 
        source.subscribe(parent);
        ...
    }
    
    • Observable 是个抽象类,而 Observable.create() 方法返回的对象 ObservableCreate(继承于Observable),可以看作是事件统一管理者。
    • ObservableCreate 在创建的时候保存了用于事件实际生产者的 ObservableOnSubscribe ,在事件订阅 Observable#subscribe() 时,传入了事件消费者 Observer
    • 在事件订阅时,ObservableCreate#subscribeActual() 方法中创建中间者 CreateEmitterCreateEmitter 把事件消费者 Observer 保存起来,然后调用事件生产者 ObservableOnSubscribe#subscribe() 方法传入 CreateEmitterCreateEmitter 是方法 subscribe() 的第一个参数的 ObservableEmitter 子类)。
    • 然后 CreateEmitter 调用自己的 onNext() 方法生产事件,在 onNext()方法中把事件传给消费者 Observer#onNext()
    • 由于 CreateEmitter 又实现了 Disposable ,在 ObservableCreate#subscribeActual() 方法中调用了事件消费者 Observer#onSubscribe() 把自己传入消费者,所以在消费者的回调方法中,可以实现终止事件传给消费者 dispose()(因为 CreateEmitter#onNext() 等方法中会首先判断 isDisposed())。

    二,操作符--map()

    Observable.create(...) 
            .map(new Function<Integer, String>() {
                 @Override
                 public String apply(Integer integer) throws Exception {
                     return String.valueOf(integer);
                 }
             })
             .subscribe(···);
    
    // ObservableMap#subscribeActual
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
    
    • map() 方法返回的是新的包装 ObservableObservableMap<T, U> (装饰着模式?),ObservableMap<T, U> 中保存了 create() 返回了上一节的 ObservableCreate
    • 然后调用 Observable#subscribe() 开始订阅事件,实际是首先调用 ObservableMap<T, U>#subscribeActual() 方法,该方法里再调用 create() 方法返回的 ObservableCreate#subscribeActual()
    • ObservableMap<T, U>#subscribeActual() 方法把传入的消费者 Observer 包装为 MapObserver ,再传给 ObservableCreate#subscribeActual() 开始生产事件。
    • 在新的 ObserverMapObserver 中,主要任务是把事件生产者生产的事件通过 Function 转化为对应的类型。

    三,线程调度--subscribeOn()

    Observable.create(...) 
            .subscribeOn(Schedulers.io())
            .subscribe(···);
    
    // ObservableSubscribeOn#subscribeActual
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() { 
                source.subscribe(parent);
            }
        }));
    }
    
    • subscribeOn() 返回的又是新的包装 ObservableObservableSubscribeOn(和上一节很像),ObservableSubscribeOn自然又是保存了 create() 返回了的 ObservableCreate
    • 开始订阅事件时,实际是首先调用 ObservableSubscribeOn#subscribeActual() 方法,方法里又包装新的 ObserverSubscribeOnObserver
    • 通过 scheduler.scheduleDirect(runnable)subscribeOn() 指定的线程直接调用 run() 方法,方法里再调用 ObservableSubscribeOn 保存的最初的 ObservableObservableCreate#subscribeActual() 开始生产事件,然后返回一个 worker 调度管理者(实现 Disposable 接口)。
    • 新的包装 ObserverSubscribeOnObserver 把返回的 worker (子线程)加入 dispose() 管理。
    • 可以发现,当 subscribeOn() 多次调用时,最终只有 ObservableCreate 调用的 subscribeOn()(即第一个)起作用,因为每次 subscribeOn() 时新创建的包装 ObservableObservableSubscribeOn 都会保存上一个 Observable ,然后订阅时(在 run() 方法调用),会调用上一个 Observable.subscribe() ,一直调用到最开始的 ObservableObservableCreate 才开始切换线程并生产事件。

    三,线程调度--observeOn()

    Observable.create(...) 
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(···);
    
    // ObservableObserveOn#subscribeActual
    protected void subscribeActual(Observer<? super T> observer) {
        ...
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); 
    }
    
    • 也是和上两节类似,新的包装 ObservableObservableObserveOn
    • 创建一个 AndroidSchedulers.mainThread() 对应的 Worker 。
    • 创建新的包装 ObserverObserveOnObserver (又实现了Runnable)传给最开始的 ObservableCreate ,这样生产的事件会由 ObserveOnObserver
      对应的onXXX()处理。
    • onNext() 里,首先把生产的数据加入队列,然后切换会 observeOn() 指定的线程,最后才把数据取出来传给消费者。
    • 可以看到,每次切换线程后会立刻发送数据,所以调用 observeOn() 会生效多次,与 subscribeOn() 相反。
    新年快乐,2018

    本人水平有限,如有错误,欢迎批评指出😊

    相关文章

      网友评论

          本文标题:RxJava2 源码总结

          本文链接:https://www.haomeiwen.com/subject/yhpwgxtx.html