RxJava是一个非常好用的框架,它的作用是将一个复杂的业务逻辑,分拆成一个个小的功能模块,然后将这些功能串起来,达到思路清晰的目的。
关于如何详细使用rxjava,这里就不再介绍了,可以点击 这里
进行查看,rxjava核心代码很简洁巧妙,本文主要介绍核心代码实现。
我们先看一个例子。
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "call");
subscriber.onNext("hello");
subscriber.onNext("beauty");
subscriber.onCompleted();
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext s: " + s);
}
});
05-31 14:36:06.469 29241-29241/? E/rx2: call
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: hello
05-31 14:36:06.470 29241-29241/? E/rx2: onNext s: beauty
05-31 14:36:06.470 29241-29241/? E/rx2: onCompleted
这个例子创建了一个被观察者和一个订阅者,被观察者产生了两个字符串,然后传递给了订阅者。
我们看一下源码
public class Observable<T> {
//创建被观察者,需要传入订阅事件f,这个是被观察者和订阅者之间的纽带,我们就称呼它为订阅纽带吧,避免混淆
public final static <T> Observable<T> create(OnSubscribe<T> f) {
//hook里面什么都没有做,只是将f返回
return new Observable<T>(hook.onCreate(f));
}
//保存订阅纽带
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
//订阅方法
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
//通知订阅者已经开始
subscriber.onStart();
try {
//onSubscribe是订阅纽带,这里直接调用订阅纽带的call方法,并将订阅者传了过去,然后执行subscriber.onNext("hello");方法,那么订阅者也就收到了该事件
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
return Subscriptions.unsubscribed();
}
}
}
这个例子,从代码实现层来看就是订阅者订阅被观察者后,触发订阅纽带的call方法,call方法产生数据,再传给订阅者。
现在将例子改一下,要将产生的字符串后面加上“*”号,我们可以通过map方法
//为了方便后面说明,我们称create返回的被观察者为原始被观察者,这个OnSubscribe我们称为原始订阅纽带
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
Log.e(TAG, "call");
subscriber.onNext("hello");
subscriber.onNext("beauty");
subscriber.onCompleted();
}
})
//map方法里面有map被观察者,map订阅纽带,map订阅者,见后面的代码
.map(new Func1<String, String>() {
@Override
public String call(String s) {
Log.e(TAG, "map call s: " + s);
return s + "*";
}
})
//这个Subscriber我们称为最终订阅者
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "onNext s: " + s);
}
});
打印结果如下
05-31 14:39:03.707 29692-29692/? E/rx2: call
05-31 14:39:03.707 29692-29692/? E/rx2: map call s: hello
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: hello*
05-31 14:39:03.708 29692-29692/? E/rx2: map call s: beauty
05-31 14:39:03.708 29692-29692/? E/rx2: onNext s: beauty*
05-31 14:39:03.708 29692-29692/? E/rx2: onCompleted
我们可以看到,在原始订阅纽带里面调用来subscriber.onNext("hello");后,会将字符串传递到map的call方法里面,然后再调用最终订阅者的onNext方法
这个例子将一件事情分拆层两个步骤去实现,产生字符串,加工字符串。我们看一下具体是怎么做到的
这个例子跟上一个的差别是在中间加了map方法,我们看一下map方法的实现
public class Observable<T> {
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
//这里看一下OperatorMap类,func是要加工数据的方法
return lift(new OperatorMap<T, R>(func));
}
//该方法返回的是被观察者,相当于新建了一个观察者,为了方便说明,我们称该返回的被观察者是map被观察者,订阅纽带为map订阅纽带
//注意,下面的call方法是理解整个框架的关键,有点绕,冷静下来分析一下
//我们现在就来梳理这个例子的调用流程,在这之前,我们定义了几个名称:原始被观察者、原始订阅纽带、map被观察者、map订阅纽带、map订阅者,最终订阅者,先熟悉一下这几个词
//1.在最后调用subscribe(new Subscriber<String>())的时候,也是就是执行map被观察者的subscribe方法时,会调用map订阅纽带的call方法,也就是下面lift里面call方法
//2.调用hook.onLift(operator).call(o)得到的是map订阅者,o是最终订阅者,map订阅者是最终订阅者的代理,里面的onNext方法多了一层在转换
//3.onSubscribe是这个类的订阅纽带,也就是初始订阅纽带。调用onSubscribe.call(st)方法执行初始订阅者的call方法,也就是上面代码块的subscriber.onNext("hello")这里。
//这里的subscriber订阅者是map订阅者,调用了它的call方法,在OperatorMap这个类里面,执行的是里面的订阅者的onNext(T t)方法,调用o.onNext(transformer.call(t));。
//t就是传进来的"hello",通过transformer进行转换,这里的o是最终订阅者,t是原始订阅纽带传过来的"hello"字符串,这样就完成了转换,所以最终订阅者o就收到来转化后的数据
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
Subscriber<? super T> st = hook.onLift(operator).call(o);
st.onStart();
onSubscribe.call(st);
}
});
}
}
public final class OperatorMap<T, R> implements Operator<R, T> {
private final Func1<? super T, ? extends R> transformer;
//加工数据的类,负责转换数据
public OperatorMap(Func1<? super T, ? extends R> transformer{
this.transformer = transformer;
}
//为了方便说明,我们称该方法返回的订阅者是map订阅者,从代码来看,生成的这个订阅者是最终订阅者的代理,唯一不同的是多了一步transformer.call(t)的转化
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o){
return new Subscriber<T>(o) {
@Override
public void onCompleted() {
o.onCompleted();
}
@Override
public void onError(Throwable e) {
o.onError(e);
}
@Override
public void onNext(T t) {
//这里的o是最终订阅者,t是原始订阅纽带传过来的
o.onNext(transformer.call(t));
}
};
}
}
这里有一点绕,我画了一张图来梳理这个关系

上面的说明再结合这张图,关系就能梳理清楚了
总结一下,调用subscribe订阅时,触发map被观察者的call方法,调用onSubscribe.call(st),st又是最终订阅者的代理(该代理获取到数据后加工数据),onSubscribe是原始订阅纽带,触发产生数据的onNext方法,然后将数据传递给map订阅者,加工完数据后,再传递给最终订阅者的onNext方法
每次添加一个map方法时,在原先的连接关系中间插入了一层关系,对上一层的被观察者来说,插入了一个订阅者;对下一层对订阅者而言,插入了一个被观察者,通过这种方式,又将原先对链条连接上了。
对于map方法是这个逻辑,对应其它的方法,也是类似的原理
网友评论