RxJava源码分析基于RxJava1.3.8。
在上一章节中,主要介绍了RxJava的基本使用并对该部分的源码做了详细分析。在这一章节中,将主要介绍RxJava的另一大核心功能:变换。
变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。
在RxJava中,提供了许多针对不同场景实现变换功能的操作符,如下:
- map()
- flatMap(), concatMap(), and flatMapIterable()
- switchMap()
- scan()
- groupBy()
- buffer()
- window()
- cast()
虽然,RxJava中提供了如此多的变换操作符,但是变换的原理基本都是一致的。在本章节将主要针对map()和flatMap()操作符来分析变换的原理。
接下来,先通过一个实例来理解什么是变换以及map()实现变换的原理。
map变换
实例
需求,根据图片url实现ImageView图片的加载。
以下通过RxJava实现该操作:
Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
// 1、传入图片路径
subscriber.onNext("http://localhost:8080/...");
}
}).map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
// 2、根据图片路径获取Bitmp并返回
Bitmap bitmap = getImageBitmap(s);
return bitmap;
}
}).subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
// 3、ImageView设置Bitmap
imageView.setImageBitmap(bitmap);
}
});
在上面RxJava代码中,功能很清晰就是通过url实现图片的加载的流程,并且这里就完成了一个变换的过程:我们只传入了一个图片url,但在map方法中完成图片url到Bitmap转换的过程,最终在<code>Action1.call(Bitmap)</code>中只需要设置bitmap给ImageView即可。
所以在上述流程中,<code>map(Func1)</code>是完成变换的核心方法,下面我们就对该方法的源码做具体分析。
为了便于上述代码的分析,我们将上面链式调用的代码改成非链式调用的代码,这要能更清楚的展示两个Observable对象:stringObservable和mapObservable,如下:
Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
// 1、传入图片路径
subscriber.onNext("http://localhost:8080/...");
}
});
Observable<Bitmap> mapObservable = stringObservable.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String s) {
// 2、根据图片路径获取Bitmp并返回
Bitmap bitmap = getImageBitmap(s);
return bitmap;
}
});
mapObservable.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
// 3、ImageView设置Bitmap
imageView.setImageBitmap(bitmap);
}
});
map操作符源码分析
首先,我们看下<code>map(Func1)</code>方法的具体实现,如下:
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
// 创建一个Observable对象并返回
return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
}
关于unsafeCreate()方法在上一章节已经讲解过,这里就不在重复了。
在<code>map(Func1)</code>方法中,主要有两个功能:
- 创建并返回Observable对象,对于该Observable对象这里称之为mapObservable对象。
- 为mapObservable对象创建了一个OnSubscribe对象,也就是OnSubscribeMap对象(OnSubscribeMap对象是实现变换以及链式调用的核心)
在通过<code>map(Func1)</code>方法创建并返回mapObservable对象后,该mapObservable就通过调用<code>subscribe()</code>方法开始整个事件。
根据上章节对<code>subscribe()</code>方法的分析可知:在<code>subscribe()</code>方法内部会调用对应Observable对象(这里就是mapObservable对象)的<code>onSubscribe.call()</code>方法。而在mapObservable对象中onSubscribe实际就是指<code>OnSubscribeMap</code>对象,所以接下来就需要看看该类都做了些什么。
OnSubscribeMap
OnSubscribeMap源码如下:
public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
final Observable<T> source;
final Func1<? super T, ? extends R> transformer;
/**
* 1、构造方法
*
* @param source 变换前Observable对象,也就是stringObservable
* @param transformer Func1对象,通过map方法传入的参数
*/
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
/**
* 开始事件的方法
*
* @param o 观察者对象,该参数就是通过subscribe()方法传入的参数
*/
@Override
public void call(final Subscriber<? super R> o) {
// 创建观察者对象,MapSubscriber
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
// 将创建的观察者对象添加到定于列表中
o.add(parent);
// 调用方法,这是整个事件的开始
source.unsafeSubscribe(parent);
}
static final class MapSubscriber<T, R> extends Subscriber<T> {
// 该内部类稍后在讲解
}
}
这里,我们需要关注的是call()方法,在call()方法内部,最终会通过<code>source.unsafeSubscribe(parent)</code>开始整个事件的链式调用。
这里,我们看下<code>source.unsafeSubscribe(parent)</code>方法的内部实现:
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
/**
* 1、通过onObservableStart()方法获取OnSubscribe对象,也就是创建stringObservable对象是传入的OnSubscribe对象
* 2、调用OnSubscribe对象的call方法
*/
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
// 省略部分代码
}
}
对于source变量,在<code>OnSubscribeMap</code>代码中就提到过,source变量就是指向引用stringObservable对象,这样虽然是通过mapObservable对象先调用subscribe()方法,但是最终还是stringObservable调用了subscribe()方法,这样也就保证了链式调用始终是从上而下执行的。
MapSubscriber
在上面的流程中,我们仅仅完成了链式调用的第一步,也就是stringObservable完成了call方法的调用,那么变换又是如何实现的呢?
在<code>stringObservable.call(Subscriber)</code>方法调用时,我们需要注意此时<code>call(Subscriber)</code>方法中传入的参数实际已经变成了MapSubscriber(看OnSubscribeMap.call()方法)。
MapSubscriber是OnSubscribeMap的一个静态内部类并且该类继承自Subscriber,所以它是一个观察者。
MapSubscriber类代码如下:
static final class MapSubscriber<T, R> extends Subscriber<T> {
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
/**
* 构造方法
* @param actual 观察者对象,也就是subscribe()方法传入的参数
* @param mapper Func1对象,通过map方法传入的参数
*/
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
@Override
public void onNext(T t) {
R result;
try {
// 1、调用Func1对象的call方法,实现事件变换
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
// 2、完成变换后,将变换后的结果传入到下一个观察者
actual.onNext(result);
}
@Override
public void onError(Throwable e) {
if (done) {
RxJavaHooks.onError(e);
return;
}
done = true;
actual.onError(e);
}
@Override
public void onCompleted() {
if (done) {
return;
}
actual.onCompleted();
}
@Override
public void setProducer(Producer p) {
actual.setProducer(p);
}
}
根据上述代码可知:<code>MapSubscriber.onNext()</code>方法就是实现变换的核心。在该方法中会根据传入的原始参数并通过<code>Func1.call(t)</code>完成变换功能,最终将变换的结果传递到下一个事件中去。
这里,我们可以通过一张图来描述上述整个过程:
这里写图片描述
flatMap变换
flatMap操作符:使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
实例演示
需求:输入学生列表中的每个学生的具体课程名称。
通过RxJava代码实现:
Observable
.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.courseList);
}
})
.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
System.out.println(course.courseName);
}
});
同样的,为了便于分析源码,我们将上面链式调用的源码改为非链式调用的源码,如下:
Observable<Student> fromObservable = Observable.from(students);
Observable<Course> flatMapObservable = fromObservable.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.courseList);
}
});
flatMapObservable.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
System.out.println(course.courseName);
}
});
flatMap操作符源码分析
创建flatMapObservable
首先,我们看下flatMapObservable的创建过程,内部代码如下:
/**
* 开始变换
*
* @param func
* @param <R>
* @return
*/
public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
if (getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
}
/*
* 1、通过map(func)方法创建一个Observable对象,该对象的onSubscribe类型为OnSubscribeMap
* 2、通过merge()方法创键创建一个新的Observable对象,并返回最终通过该Observable调用subscribe()方法
*/
return merge(map(func));
}
/**
* 创建变换后的Observable对象
*
* @param source 通过map(func)方法创建一个Observable对象,改对象的onSubscribe类型为OnSubscribeMap
* @return
*/
public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
if (source.getClass() == ScalarSynchronousObservable.class) {
return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
}
/*
* 1、OperatorMerge.<T>instance(false)获取一个OperatorMerge(HolderNoDelay.INSTANCE)对象,该对象实际就是一个Func1
*
* 2、通过map(func)创建的Observable对象调用lift()方法
*/
return source.lift(OperatorMerge.<T>instance(false));
}
/*
* 1、通过map(func)创建的Observable对象调用lift()方法
*
* @param operator OperatorMerge.<T>instance(false)获取一个OperatorMerge对象,该对象实际就是一个Func1
* @param <R>
* @return
*/
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
/*
* 这里onSubscribe实例是OnSubscribeMap
*/
return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
}
/**
* 最终通过flatMap()方法创建的Observable,该类的onSubscribe类型为OnSubscribeLift
*
* @param f
* @param <T>
* @return
*/
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
在flatMap操作符下,将创建一个新的Observable对象,该对象的onSubscribe对象实例为OnSubscribeLift,最终在该<code>Observable.subscribe()</code>方法调用时<code>OnSubscribeLift.call()</code>会被调用。
接下来,就来看下订阅的过程吧。
flatMapObservable订阅事件
完成了flatMapObservable变换Observable的创建后,就要调用订阅方法subscribe()了,如下:
flatMapObservable.subscribe(new Action1<Course>() {
@Override
public void call(Course course) {
System.out.println(course.courseName);
}
});
通过在上一章节中的介绍,我们了解到<code>flatMapObservable.subscribe(subscriber)</code>调用时,内部会调用<code>flatMapObservable.onSubscribe.call(subscriber)</code>,这里就需要确定<code>flatMapObservable.onSubscribe</code>的具体类型了。
由上面flatMapObservable的创建可知,<code>flatMapObservable.onSubscribe</code>的具体类型就是<code>lift()</code>方法中创建的<code>OnSubscribeLift</code>。
OnSubscribeLift源码如下:
public final class OnSubscribeLift<T, R> implements Observable.OnSubscribe<R> {
/**
* parent类型,就是创建fromObservable时,创建的OnSubscribe对象
*
* OnSubscribeMap类型,
*/
final Observable.OnSubscribe<T> parent;
/**
* OperatorMerge(HolderNoDelay.INSTANCE)对象
*/
final Observable.Operator<? extends R, ? super T> operator;
public OnSubscribeLift(Observable.OnSubscribe<T> parent, Observable.Operator<? extends R, ? super T> operator) {
this.parent = parent;
this.operator = operator;
}
@Override
public void call(Subscriber<? super R> o) {
try {
// 创建一个MergeSubscriber,该对象被被o持有
Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
try {
st.onStart();
// OnSubscribeMap.call()被调用,这个里面其实就map操作符转换的结果
parent.call(st);
} catch (Throwable e) {
// ....
}
} catch (Throwable e) {
// ....
}
}
}
在<code>OnSubscribeLift.call(Subscriber)</code>方法中,最终会调用<code>parent.call(st)</code>,这个parent就是<code>fromObservable.onSubscribe</code>对象,这里指的就是OnSubscribeMap对象,<font color='red'>这样RxJava链式调用的第一步就完成了,其实还是map()操作符的变换</font>。
然而,与普通map操作符不同的是,在<code>parent.call(st)</code>方法中传入的参数为MergeSubscriber,这个参数最终会被MapSubscriber对象持有并在onNext()方法中被调用到。
MergeSubscriber源码如下(该方法过长,这里就看下onNext方法),这个对象的作用是将完成转化的序列(这是是指Observable序列)依次发射出去并最终被调用。
static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
// 通过onNext的参数就能知道,这个是处理转换后的Observable
@Override
public void onNext(Observable<? extends T> t) {
if (t == null) {
return;
}
if (t == Observable.empty()) {
emitEmpty();
} else
if (t instanceof ScalarSynchronousObservable) {
tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
} else {
// 1、创建一个Subscriber
InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
// 2、通过addInner()方法存储所有转换后的Subscriber,合并所有的Observable对象。
addInner(inner);
t.unsafeSubscribe(inner);
// 3、将转换后的序列发出并处理,这里最终会调用我们subscribe()方法中传入的Subscriber.onNext()方法。
emit();
}
}
}
这样关于flatmap变换操作的讲解就讲到这里了。
总结
通过map及flatMap操作符源码的分析可知,对于变换操作符的核心流程就是通过OnSubscribeLift类实现的。同时创建<code>MapSubscribe</code>及<code>OnSubscribeMap</code>这两个类,对于不同种类的变换不同的仅仅就是最终在OnSubscribeLift指定这两个类中具体的属性而已。
网友评论