在RxJava中,map(),flatMap()虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在它们内部,其实都是基于同一个基础的变换方法,即lift(Operator)。lift(Operator)方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化,弄清楚它的原理,也方便我们理解其他操作符。
1.Func1接口 和 Operator接口
/**
* Represents a function with one argument.
*/
public interface Func1<T, R> extends Function {
R call(T t);
}
/**
* Operator function for lifting into an Observable.
*/
public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
// cover for generics insanity
}
Func1接口会按照泛型参数的顺序传入T,并返回R。按照Func1接口的定义,Operator接口会传入一个Subscriber<? super R>参数,并返回一个Subscriber<? super T>。
看到Observable<T>,Operator<R, T>,Func1<T, R>这2个类的泛型参数,可能会有这样的疑问,即Operator的泛型参数顺序为什么是<R, T>,而不是<T, R>?其实,这里我们并不需要关心顺序是什么,只需要记住Operator<R, T>是按照泛型参数的顺序,传入一个Subscriber<R>参数,并返回一个Subscriber<T>。
2.lift(Operator)内部实现原理
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
return new Observable<R>(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> o) {
try {
//也可以看作是Subscriber<? super T> st = operator.call(o);
Subscriber<? super T> st = hook.onLift(operator).call(o);
try {
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
// 这里的onSubscribe是调用lift方法的Observable中的onSubscribe,即Observable<T>中的onSubscribe
onSubscribe.call(st);
} catch (Throwable e) {
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
}
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
}
}
});
}
(1)假设已有一个Observable<T>,调用lift()方法,生成一个Observable<R>,此时就有了两个Observable和两个OnSubscribe对象。
(2)然后调用Observable<R>的subscribe()方法,传入一个Subscriber<R>对象,此时触发Observable<R>.onSubscribe.call()方法,也就是上面lift()方法中的call()方法。
(3)在上述lift()的call()方法中会调用onSubscribe.call()方法,注意这个onSubscribe是Observable<T>中的那个OnSubscribe<T>对象,它需要传入一个Subscriber<T>对象,这个对象是通过operator.call()方法生成的。正是这个Operator对象将两个Subscriber对象关联起来,OnSubscribe<T>在执行Subscriber<T>.onNext(T t)方法的时候也会执行Subscriber<R>.onNext(R r)。
小结
上面的Observable<T>是事件源,对它进行lift()变换得到新的Observable<R>,这个新的Observable<R>的回调已经固定,也就是上面lift()方法中的call()方法。此时调用Observable<R>的subscribe()方法,传入的Subscriber<R>是用户定义的事件监听者,它监听的是新的Observable<R>。这个Observable<R>的回调是固定的,它并不能产生新事件,所以得靠事件源Observable<T>。那么,此时通过Operator生成一个中间的Subscriber<T>对象,然后利用这个Subscriber<T>向原始Observable<T>进行订阅,然后原始Observable<T>就开始发送事件。但是这个Subscriber<T>对象并没有消耗事件,而是起着一个代理的作用,即接收事件源Observable<T>的事件,并将事件转给用户定义的Subscriber<R>。所以,Operator可以看做是一个生成代理的工具类,通过它生成一个代理Subscriber<T>,然后通过代理Subscriber<T>将事件发送给Subscriber<R>。
精简版:在Observable执行了lift(Operator) 方法之后,会返回一个新的Observable,这个新的 Observable会像一个代理一样,负责接收原始的Observable 发出的事件,并在处理后发送给 Subscriber。
3.举个例子:通过lift(Operator)将Integer变换为String
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(121);
Log.e(TAG, "lift call Integer: 121");
}
}).lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
return new Subscriber<Integer>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "Subscriber<Integer> onNext: " + integer);
Log.e(TAG, "lift transform: " + "Integer to String :" + integer);
subscriber.onNext("Integer to String " + integer);
}
};
}
}).subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
Log.e(TAG, "Subscriber<String> onNext: " + s);
}
});
网友评论