RxJava源码解析(一)从一个例子开始

作者: 非墨Zero | 来源:发表于2017-09-15 15:07 被阅读227次

注:本篇文章代码基于Rxjava1.x
RxJava是目前非常流行的一个响应是编程框架,它用了Java的语法特性来模拟出一套流式过程化的写法,并可以通过线程调度器,非常方便的实现线程切换。本系列文章假设读者已经是使用过Rxjava或者RxAndroid的开发者,如果你还未使用过,不妨看下下面的几篇文章:
1.谜之RxJava
2.给 Android 开发者的 RxJava 详解
3.深入浅出RxJava

本篇将使用一个非常简单的例子做引线,引出在Rxjava中一些核心类和核心对象,如果你尚未使用过Rxjava,请在阅读过上面几篇文章后,编写过一些Rxjava相关代码后再阅读本文章。
我们来看下我们的例子:

Observable.just("str1", "str2", "str3", "str4")
        .map(new Func1<String, String>() {
            @Override
            public String call(String t) {
                // TODO Auto-generated method stub
                return "[" + t + "]";
            }
        }).subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });

上面的例子非常简单我们换成我们的自然语言可以分成以下步骤:
1.构建一个String[]数组的Observable
2.通过映射方法map,将返回值映射成为"["+t+"]"格式
3.被一个订阅者所订阅
最后我们将在控制台输出:

output:
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted

虽然是短短几行代码,简单几个类,但是已经包含了大部分RxJava中的核心元素,本章就以这个简单的例子为引子,引出RxJava的基本体系结构和一些核心功能类。我们先来看下

Observable.just("str1", "str2", "str3", "str4")

这个方法的输入是一堆数组,输出是一个Observable对象。实际上它就是一个静态工厂,我们看下它的源码:

public static <T> Observable<T> just(T t1, T t2, T t3, T t4) {
        return from((T[])new Object[] { t1, t2, t3, t4 });
}// call from

public static <T> Observable<T> from(T[] array) {
        int n = array.length;
        if (n == 0) {
            return empty();
        } else
        if (n == 1) {
            return just(array[0]);//选择构造方法
        }
        return unsafeCreate(new OnSubscribeFromArray<T>(array));//使用OnSubscribeFromArray为参数构造
    }

just代码中调用了from方法来执行构造,而在from方法中,会先进行数组空判断和长度判断,目的是为了选择不同的构造方法。最后我们所传入的数组对象,将被包装成为一个OnSubscribeFromArray对象。而这个对象作为一个参数被unsafeCreate方法所调用。unsafeCreate是构造Observable最核心的方法,不论哪种方式的构造器最终都会调用到这个方法。我们现在看下这个方法的声明:

public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

首先,我们传入的OnSubscribe对象将被RxJavaHooks的onCreate给hook住,转化为一个OnSubscribe对象。这里,如果你对aop不陌生的话,相信这块很好理解,实际上相当于你在构造Observable的时候做了一层拦截,或者说一次hook。我们不妨深入一点,看下RxJavaHooks里面究竟到底做了什么转换:

//code RxJavaHooks.java
public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
        Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
        if (f != null) {
            return f.call(onSubscribe);
        }
        return onSubscribe;
    }

RxJavaHooks.onCreate方法用到了一个函数对象onObservableCreate。这里所定义的函数对象很类似我们在动态语言中定义的闭包对象。我们看下onObservableCreate对象是怎么被赋值的:

public class RxJavaHooks {
  static {
        init();
    }
  static void init() { 
      ...
      initCreate();
  }
static void initCreate() {
        ...
        onObservableCreate = new Func1<Observable.OnSubscribe, Observable.OnSubscribe>() {
            @Override
            public Observable.OnSubscribe call(Observable.OnSubscribe f) {
                return RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);
            }
        };
     ...
}

RxJavaHooks类在类初始化的时候通过调用init->initCreate方法给onObservableCreate函数对象赋值。而赋值函数会调用

RxJavaPlugins.getInstance().getObservableExecutionHook().onCreate(f);

也就是说RxJavaHooks只是提供一个简单的接口和初始化操作。实际调用者在RxJavaPlugins中。我们看下RxJavaPlugins.getObservableExecutionHook函数:

public RxJavaObservableExecutionHook getObservableExecutionHook() {
        if (observableExecutionHook.get() == null) {
            // check for an implementation from System.getProperty first
            Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());//通过系统配置获取一个RxJavaObservableExecutionHook对象。
            if (impl == null) {
                // nothing set via properties so initialize with default
                observableExecutionHook.compareAndSet(null, RxJavaObservableExecutionHookDefault.getInstance());
//如果没有配置对象则使用默认对象
                // we don't return from here but call get() again in case of thread-race so the winner will always get returned
            } else {
                // we received an implementation from the system property so use it
                observableExecutionHook.compareAndSet(null, (RxJavaObservableExecutionHook) impl);
            }
        }
        return observableExecutionHook.get();
    }

实际上, getObservableExecutionHook()方法得到的对象也是一个单利,但是是非线程安全的,这段代码主要做以下事情:

1.如果RxJavaObservableExecutionHook对象不存在,会先通过调用getPluginImplementationViaProperty方法,也就是通过查看系统配置参数查看是否有实现类,如果有,将生成一个具体的RxJavaObservableExecutionHook实例返回
2.如果通过步骤1无法生成一个RxJavaObservableExecutionHook对象,将返回一个默认的RxJavaObservableExecutionHookDefault. getInstance()对象
3.最后将通过1,2获取的对象记录在全局变量中

getObservableExecutionHook函数流程图

这里引出一个问题,就是我们如何注入一个hook函数呢?这就需要深入到getPluginImplementationViaProperty的具体实现中去:

{//code getPluginImplementationViaProperty()
final String classSimpleName = pluginClass.getSimpleName();
        String pluginPrefix = "rxjava.plugin.";
        String defaultKey = pluginPrefix + classSimpleName + ".implementation";
...
}

首先,getPluginImplementationViaProperty会先定义一个key,这个key的基本结构为:rxjava.plugin.[classSimpleName].implementation。而这里的classSimpleName依赖于我们传入的pluginClass对象。我们回到刚才的调用链:

 Object impl = getPluginImplementationViaProperty(RxJavaObservableExecutionHook.class, System.getProperties());
            

在调用getPluginImplementationViaProperty函数的时候,我们传入的是一个RxJavaObservableExecutionHook类型,因此这里的classSimpleName 值对应的应该是"RxJavaObservableExecutionHook",所以我们就得到了配置的key为:

"rxjava.plugin.RxJavaObservableExecutionHook.implementation"

之后,getPluginImplementationViaProperty函数会通过这个key,从System.property中寻找具体的实现类,然后通过反射构建出具体的实现对象。

//code getPluginImplementationViaProperty()
{
...
String implementingClass = props.getProperty(defaultKey);
 try {
                Class<?> cls = Class.forName(implementingClass);
                // narrow the scope (cast) to the type we're expecting
                cls = cls.asSubclass(pluginClass);
                return cls.newInstance();
            }
...
}

我们不妨来试一下这种写法,还是基于上面的简单例子,我们在代码前增加一段话:

{
System.setProperty("rxjava.plugin.RxJavaObservableExecutionHook.implementation", 
                "demos.rx.RxJavaObservableExecutionHookImpl");
//配置hook实现类
Observable.just("str1", "str2", "str3", "str4")...
}

RxJavaObservableExecutionHookImpl是我们实现的一个RxJavaObservableExecutionHook类型:

public class RxJavaObservableExecutionHookImpl extends  RxJavaObservableExecutionHook{
    
    @Override
    public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        System.out.println("perform intercept onCreate");
        return super.onCreate(f);
    }
}

我们执行以下输出:

output:
perform intercept onCreate //被我们的hook函数拦截
perform intercept onCreate //被我们的hook函数拦截
onNext [str1]
onNext [str2]
onNext [str3]
onNext [str4]
onCompleted
我们可以从输出日志看出,我们所配置的hook类,确实被构造,并且成功实现了hook操作。根据上面所述,如果我们不采用配置Hook类的方式,RxJava将调用一个默认的实现类:RxJavaObservableExecutionHookDefault.getInstance()。而这个类的主要操作实际上就是直接返回,不进行任何的拦截:

//code RxJavaObservableExecutionHookDefault
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f) {
        return f;//直接返回,不进行任何拦截和转换
    }

好的,我们花了很大的篇幅就是讲了RxJavaHooks的onCreate函数,我们在没有配置任何的hook函数的情况下,返回值就是我们所传入的OnSubscribe对象。那么什么是OnSubscribe对象呢?我们先来看下OnSubscribe这个类吧:

public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
        // cover for generics insanity
    }
public interface Action1<T> extends Action {
    void call(T t);
}

OnSubscribe继承于Action1,OnSubscribe限定了在Action1声明中的泛型变量T,是一个Subscriber类型,而T变量声明应用在Action1的call函数中,所以,OnSubscribe实际上是限定了OnSubscribe中的call方法的参数类型是一个Subscriber类型。但这并没有解释OnSubscribe是个什么东西,我们来看下OnSubscribe的继承树:

OnSubscribe的继承树

在OnSubscribe类型的顶端是一个Function。Function就是一个函数或者说一个过程,那么OnSubscribe是一个什么样的过程呢?OnSubscribe是一个当订阅者订阅的时候,执行的一个过程。正如OnSubscribe这个类名所描述的那样,这个过程的触发在Subscribe的时候。这实际上是一种策略的模式,根据不同的需求构建不同的过程策略,比如我们回到上面说的例子中,当我们传入一个数组对象的时候:

public static <T> Observable<T> from(T[] array) {
        ....
        return unsafeCreate(new OnSubscribeFromArray<T>(array));
    }

RxJava将采用一个叫做OnSubscribeFromArray的策略对象传递给unsafeCreate函数。为了继续说明这点我们不妨在来看下map函数:

public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }

正如我上面所说的一样,在map函数中,基于我们上次构造的Observable对象又生成了一个新的Observable对象,而新生成的对象,将采用OnSubscribeMap策略来处理订阅事件。这种包装的写法实际上是一种职责链模式。回顾一下我们上面简单例子的那个流程:
1.通过just生成一个数组Observable对象-Observable1
2.通过map完成映射,在Observable1之上包装,生成一个新的Observable对象Observable2
3.通过subscribe函数订阅Observable2对象

引用关系图

通过上面的"引用关系图"我们可以很清楚的看到Observable类型的整条职责链,那么当我们调用Observable.subscribe的时候发生了什么呢?

public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

这个方法中调用了一个静态方法subscribe(subscriber, this)来生成这种订阅关系。

static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        ....pre check
        subscriber.onStart();
        ....
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
              ....
            }
            return Subscriptions.unsubscribed();
        }
    }

这个函数中,会:
1.进行pre check检查参数是否合法
2.回调subscriber.onStart方法,告诉订阅者我这边已经准备开始了
3.之后就是我们的老朋友RxJavaHooks对象执行onObservableStart,用来在onSubscribe函数执行前做一次hook。(如何hook根据我们上面的方法可以实现,不再赘述)
4.通过调用onSubscribe对象的call方法执行函数操作
5.通过RxJavaHooks的onObservableReturn去hook订阅操作执行结束以后的返回值

根据我们上面的"引用关系图",我们可以知道订阅者发生订阅的时候,最初执行的onSubscribe对象是OnSubscribeMap类型,我们来看下这个类型的实现:

//code OnSubscribeMap.java

public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
        this.source = source;
        this.transformer = transformer;
    }
@Override
    public void call(final Subscriber<? super R> o) {
        MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
        o.add(parent);
        source.unsafeSubscribe(parent);
    }

OnSubscribeMap在构造的时候需要传递两个参数
1.输入源Observable对象source
2.映射函数:transformer

当调用call方法的时候OnSubscribeMap会生成一个新的订阅对象MapSubscriber,然后注册到source对象(对应例子中的Observable1)的订阅者中。unsafeSubscribe执行代码:

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
}

这时候unsafeSubscribe中的订阅Observable的onSubscribe函数对象就是"引用关系图"中的Observable1.OnSubscribeFromArray对象。一样,我们看下OnSubscribeFromArray的call方法:

 @Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

这里我们引入了一个新的类Producer,而OnSubscribeFromArray中所引用的实现类是FromArrayProducer。我们根据上面的调用链可以知道此时,传入OnSubscribeFromArray.call方法中的参数child对象,对应着已经被OnSubscribeMap装饰过的MapSubscriber对象。而在OnSubscribeFromArray.call方法中调用了Subscriber的setProducer方法,我们看下这个方法是干什么的:

public void setProducer(Producer p) {
        long toRequest;
        boolean passToSubscriber = false;
        synchronized (this) {
            toRequest = requested;
            producer = p;
            if (subscriber != null) {
                if (toRequest == NOT_SET) {
                    passToSubscriber = true;
                }
            }
        }
        // do after releasing lock
        if (passToSubscriber) {
            subscriber.setProducer(producer);
        } else {
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

Producer顾名思义,就是对一个生产者的一个抽象,而生产什么东西呢?生产的是数据,Producer.request(int n)函数中的n参数代表让生产者生产多少的数据对象。为什么需要这个方法呢?toRequest变量又是从何而来呢?toRequest由Subscriber的成员变量requested,而requested通过Subscriber的request函数进行赋值:

protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }
        Producer producerToRequestFrom;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);//如果没有producer需要计数
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }

代码写的很清楚,当你的这个订阅者对象Subscriber并没有对应的producer的时候,每一次请求数据的操作都会被记录到你的requested变量中,这样,当你进行设置了producer的时候,就可以知道自己请求了多少次,需要多少个数据对象。那么我们回到Subscriber的setProducer方法中去,当代码执行到最后,Subscriber会调用Producer的request方法来请求数据,而这里所对应的Producer对象,就是在OnSubscribeFromArray.call方法中传递进来的FromArrayProducer类型对象。

public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

根据我们最终输出的日志,我们可以推测FromArrayProducer是进行了一次数组的迭代遍历,那么是不是这样呢?我们看下FromArrayProducer的request方法:

@Override
        public void request(long n) {
            if (n < 0) {//异常参数检查
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == Long.MAX_VALUE) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    fastPath();
                }
            } else
            if (n != 0) {
                if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
                    slowPath(n);
                }
            }
        }
订阅函数调用时序图

这里,在FromArrayProducer的request处理的时候执行了两个分支分别对应执行fastPath方法和slowPath方法。而在执行之前有判断了一个条件:

BackpressureUtils.getAndAddRequest(this, n)
//code BackpressureUtils.getAndAddRequest()
public static long getAndAddRequest(AtomicLong requested, long n) {
        // add n to field but check for overflow
        while (true) {
            long current = requested.get();
            long next = addCap(current, n);
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

注意这里的传递对象:
1.requested参数对应的是FromArrayProducer对象
2.n对应的就是我们所传递的请求总数
requested初始值为0,通过addCap将数值加入到requested对象中,这样就完成了生成对象的统计操作。

public static long addCap(long a, long b) {
        long u = a + b;
        if (u < 0L) {//防止越界
            u = Long.MAX_VALUE;
        }
        return u;
    }

同时我们也可以看到,由于产生请求以后,FromArrayProducer统计数增加,因此返回的(BackpressureUtils.getAndAddRequest(this, n) 必不为0。所以每一个数据生产者FromArrayProducer对象只能被使用一次,这时候有人会问了如果我用以下的代码,数据可以被回调两次的。

Observable<String> ob = Observable.just("str1", "str2", "str3", "str4");
        ob.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });
        System.out.println("----------");
        ob.subscribe(new Subscriber<String>() {
            @Override
            public void onCompleted() {
                System.out.println("onCompleted ");
            }
            @Override
            public void onNext(String t) {
                System.out.println("onNext " + t);
            }

            @Override
            public void onError(Throwable e) {}
        });

最后输出:

output:
//第一次输出
onNext str1
onNext str2
onNext str3
onNext str4
onCompleted
onNext str1//第二次输出
onNext str2
onNext str3
onNext str4
onCompleted

这是为什么呢?
回答这个问题,我们需要回到我们的OnSubscribeFromArray类中:

@Override
    public void call(Subscriber<? super T> child) {
        child.setProducer(new FromArrayProducer<T>(child, array));
    }

因为每发生一次订阅操作就生成一个全新的FromArrayProducer对象,因此你用到数据自然是在的。我们最后再来看下fastPath函数和slowPath函数,字面意思上好像是代表快慢的路径搜索,我们现在看下fastPath函数:

void fastPath() {
            final Subscriber<? super T> child = this.child;
            for (T t : array) {
                if (child.isUnsubscribed()) {
                    return;
                }
                child.onNext(t);
            }
            if (child.isUnsubscribed()) {
                return;
            }
            child.onCompleted();
        }

代码非常简单,就是遍历内存中的数组array,然后执行订阅者的onNext回调和onCompleted回调函数。而slowPath方法呢?

void slowPath(long r) {
            final Subscriber<? super T> child = this.child;
            final T[] array = this.array;
            final int n = array.length;
            long e = 0L;
            int i = index;//当前数据流索引
            for (;;) {
                while (r != 0L && i != n) {
                    child.onNext(array[i]);
                    i++;
                    if (i == n) {
                        return;
                    }
                    r--;
                    e--;
                }
                r = get() + e;
                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }
            }
        }

这个函数中几个重要的参数:
1.参数r代表你的请求数
2.e代表数据消耗数量
3.n代表你的数组长度
4.index代表你的数组数据流索引
这个函数执行的时候会先执行一个大循环,而这个大循环中包含着一个小循环:

while (r != 0L && i != n) {
                    child.onNext(array[i]);
                    i++;
                    if (i == n) {
                        return;
                    }
                    r--;
                    e--;
                }

用来判断数据流i是否结束,或者请求数r是否满足。当满足一个条件以后跳出循环执行addAndGet方法将请求数目加入到计数器中:

 r = get() + e;
                if (r == 0L) {
                    index = i;
                    r = addAndGet(e);
                    if (r == 0L) {
                        return;
                    }
                    e = 0L;
                }

按照"订阅函数调用时序图"我们知道,此时我们的订阅者类是被Observable.map函数装饰过的MapSubscriber类,前面我们说过,这个类是一个Subscriber类的装饰器,我们来看下它的基本实现:

 public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }

从构造器上看,在构建MapSubscriber类的时候需要指定它的被装饰对象和映射函数mapper,而当我们回调到MapSubscriber的onNext回调的时候:

@Override
        public void onNext(T t) {
            R result;
            try {
                result = mapper.call(t);//映射转换
            } catch (Throwable ex) {
                ....
                return;
            }
            actual.onNext(result);//回调被装饰对象
        }

我们传入的原始数据t,将被mapper映射函数处理,转化为一个R类型的结果result,然后把这个结果回调给被装饰对象。按照我们上面的例子,这里面我们的映射函数就是将t外面增加"[]"的Func1接口函数,actual被装饰对象就是我们代码中的匿名订阅者对象。

好了我们总结一下,我们通过上面一个非常非常简单例子我们接触到RxJava这个大家族中的很多核心类:
1.Observable是一个被观察者对象,每个订阅者需要通过subscribe方法与Observable对象签订订阅契约
2.Observable的构建是一系列OnSubscribe对象职责链式处理过程
3.RxJava中可以在观察的每个阶段配置hook函数
4.链式处理过程中,订阅者Subscriber对象可能会被链条中的中间环节所包装
5.Producer是用来定义生产数据的类型
6.Subscriber在函数setProducer中调用Producer的request(int n)方法用于请求n个数据

相关文章

网友评论

  • 泡泡大脚:写的很详细,自己第一遍看比较绕,第二遍按照作者的思路看了下去,感觉就清晰了很多

本文标题:RxJava源码解析(一)从一个例子开始

本文链接:https://www.haomeiwen.com/subject/dwudsxtx.html