RxJava源码分析

作者: 导火索 | 来源:发表于2018-05-26 09:02 被阅读27次

RxJava源码分析基于RxJava1.3.8。

题外话:
在写这边博客之前曾犹豫过很久,因为RxJava的源码非常复杂,感觉不是在一两篇博客中就能讲解清楚。但是正式因为RxJava的源码非常复杂,我也想通过写博客的方式来加深自己对RxJava源码的认识,同时也算是做笔记了,能时不时的翻出来看看,以防止忘记。

概述

虽然RxJava2也发布了很长时间了,但是对于分析RxJava的工作原理,RxJava1的源码相对于RxJava2相对而言会容易很多(原因想必大家都很清楚,其实是因为RxJava2我也没有怎么用过,哈哈),所以对RxJava的源码分析依然选择使用RxJava1。

在本次RxJava的源码分析中,只针对开发中常用的也是必须掌握的操作符做具体分析,主要包括三方面:

  • 基本使用
  • 变换的原理
  • 线程切换的原理

至于其他的一些不常用的操作符(RxJava中有大量的操作)就不过多分析了。

这里非常推荐扔物线的博客:给 Android 开发者的 RxJava 详解

基本使用

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        subscriber.onNext("Hello");
        subscriber.onNext("RxJava!");
        subscriber.onCompleted();
    }
}).subscribe(new Subscriber<String>() {

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {

    }
});

<code></code>

上述代码就是RxJava中的最基本使用,这个基本使用主要包括:

  1. 通过<code>Observable.create()</code>创建一个Observable(被观察者)对象。
  2. 创建一个Observer(观察者)对象,也就是Subscriber对象。
  3. 通过<code>Observable.subscribe()</code>实现Observable-Observer的订阅。
  4. 在<code>Observable.subscribe()</code>方法内部产生一个事件,通过<code>OnSubscribe.call</code>产生事件。
  5. 最终通过<code>Observer</code>对象处理事件。

基本订阅

源码分析

针对上文RxJava中的基本使用,这里会针对这个流程做源码分析。

subscribe方法

首先,在RxJava的调用流程中,<code>Observable.subscribe()</code>方法实现<code>Observable</code>和<code>Observer</code>的订阅,同时该方法也是整个RxJava链式调用的起点,所以首先看看<code>Observable.subscribe()</code>的内部逻辑。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
    return Observable.subscribe(subscriber, this);
}

在上面方法中,仅仅只是调用了<code>subscribe(Subscriber<? super T> subscriber, Observable<T> observable)</code>重载方法,对这个重载方法其实不仅在该方法中调用了该重载方法,subscribe()有很多的重载方法,最终都是通过调用<code>subscribe(Subscriber<? super T> subscriber, Observable<T> observable)</code>该方法完成整个订阅流程的。

接下来就看下这个最终被调用的重载方法,如下:

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    // 省略部分代码
    
    // 1、初始化方法,内部没有任何实现
    subscriber.onStart();

    // 2、将参数subscriber包装成SafeSubscriber
    //    SafeSubscriber主要保证subscriber完成调用后能被及时取消订阅,并且在onComplete()调用后无法在被调用
    if (!(subscriber instanceof SafeSubscriber)) {
        // assign to `observer` so we return the protected version
        subscriber = new SafeSubscriber<T>(subscriber);
    }

    try {
        
        /**
         * 核心流程:
         * 
         *   1、首先, RxJavaHooks.onObservableStart()方法获取OnSubscribe对象
         *   2、然后,根据获取的OnSubscribe,调用该对象的call()方法,这里可以看到在call()方法中传入的subscriber对象就是在subscribe()方法传入的subscriber对象
         * 
         *      上面获取的OnSubscribe其实就是在创建Observable时通过create()方法创建的OnSubscribe对象
         */
        RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
        return RxJavaHooks.onObservableReturn(subscriber);

    } catch (Throwable e) {
        // 省略部分代码
    }
}

在subscribe()方法中,基本就完成了Observable对象和Subscriber的订阅,并通过<code>OnSubscribe.call(Subscriber)</code>开始整个链式调用。

在代码中,注释非常清晰,这里就不重复了。

这里需要注意的是通过RxJavaHooks获取OnSubscribe对象的流程,这里对<code>RxJavaHooks.onObservableStart()</code>的调用流程做了下梳理,如下:

public final class RxJavaHooks {
    
    // 1、定义一个Func2对象onObservableStart
    static volatile Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> onObservableStart;


    // 2、静态代码块调用,实现初始化
    static {
        init();
    }

    // 3、初始化onObservableStart对象
    static void init() {
        // 省略部分代码

        onObservableStart = new Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable t1, Observable.OnSubscribe t2) {
                // 在该代码中,默认返回方法参数的OnSubscribe对象
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onSubscribeStart(t1, t2);
            }
        };

    }

    // 4、调用onObservableStart(),获取具体的OnSubscribe
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        // 变量onObservableStart不会为空,所以会执行if判断语句
        if (f != null) {
            // 该方法会返回参数onSubscribe,也就是Observable.OnSubscribe对象
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }

}

所以,通过上述流程发现最终获取的OnSubscribe对象,还是创建Observable对象时创建的那个OnSubscribe对象。

总结

这样关于RxJava的基本使用的源码分析就到这里了,这里面的核心流程也就是<code>Subscriber.subscribe()</code>方法,该方法是实现响应式编程的核心方法。

当然,在基本使用中对于<code>Observerable.OnSubscribe</code>类的作用并没有很突出,在下一章节中将主要讲解RxJava变换的原理。在变换的流程中就是要对<code>Subscriber.subscribe()</code>和<code>Observerable.OnSubscribe</code>两个方法的方法

相关文章

网友评论

    本文标题:RxJava源码分析

    本文链接:https://www.haomeiwen.com/subject/kdosjftx.html