map操作符
被观察者数据源泛型,当发射器的数据类型和观察者数据类型不同时,通过map操作符转换,可以将上游发射的类型转换成任意对象类型。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer s) throws Exception {
String newStr = s + "_";
Log.d(TAG, "int apply s " + newStr);
return newStr;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String response) throws Exception {
Log.d(TAG, "Observer : " + response);
}
});
发射数据类型是Integer类,通过map操作符,将类型转换成String类。Function是一个类型转换接口,Function<T, R>,将T转换R,解决被观察者和观察者数据类型不匹配问题。
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
返回一个被观察者ObservableMap,封装原始被观察者ObservableCreate和转换接口Function,调用ObservableMap的subscribe注册方法。
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
创建观察者MapObserver,封装自定义观察者和转换接口Function。source源即内部ObservableCreate,调用它的subscribe方法。
被观察者链ObservableCreate的#subscribeActual方法,创建CreateEmitter数据发射器,通知观察者已经注册。
调用数据源source(ObservableOnSubscribe)的subscribe方法,将发射器暴漏给外部。外部通过发射器发射数据,如onNext方法。
发射器CreateEmitter持有观察者MapObserver,当onNext事件发射后,通知观察者MapObserver的onNext方法,传参发射的数据类型Integer类。
public void onNext(T t) {
...
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
根据MapObserver内部转换接口Function,apply方法,将T类型转换成U类型,再调用自己定义观察者Observer的onNext方法,入参数据类型转换成String。
发射器onNext方法和观察者accept方法按照通知顺序执行。
flatMap操作符
flatMap操作符和map类似,Function接口实现类型转换,转换的对象是一个被观察者ObservableSource。
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
}
}).flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer s) throws Exception {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//将上游Integer类型数据,在新发射器中改造发射。
String newStr = s + "_gc1";
String newStr2 = s + "_gc2";
Log.d(TAG, "int apply s " + newStr);
Log.d(TAG, "int apply s " + newStr2);
e.onNext(newStr);
e.onNext(newStr2);
}
});
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(String response) throws Exception {
Log.d(TAG, "Observer : " + response);
}
});
将上游发射器每个Integer类型的数据转换成Observable类型,再由每个转换的被观察者发射目标类型数据。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
...
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
返回一个被观察者ObservableFlatMap。封装原始被观察者ObservableCreate和转换接口Function,调用ObservableFlatMap的subscribe注册方法。
public void subscribeActual(Observer<? super U> t) {
...
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
创建观察者MergeObserver,封装自定义观察者和转换接口Function,source源即内部ObservableCreate,调用它的subscribe方法。当发射器onNext方法发射时,调用发射器内部MergeObserver的onNext方法。
@Override
public void onNext(T t) {
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
return;
}
//调用的是新建ObservableSource的注册方法。
subscribeInner(p);
}
通过Function接口方法,将Integer类型转换成ObservableSource类型,转换对象是一个被观察者,外部创建,ObservableCreate类型,将Integer类型的数据暴露在新被观察者的数据源发射器中,处理转换成新发射器支持String类型,subscribeInner方法,新被观察者订阅。
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
...
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
调用Observable的subscribe方法,订阅InnerObserver观察者,外部调用发射器onNext方法,可以获取apply方法中上层发射的Integer数据,按照String类型,触发两个onNext方法再次发射数据,两次调用观察者InnerObserver的onNext方法,每次,调用它引用MergeObserver的onNext方法,最终,通知到外部观察者。
flatMap最初的onNext顺序,在Function转换成新Observable后,根据收到的数据,包装重新发射一批新数据。在观察者到的onNext顺序不一定是按照最初的onNext顺序调用的。
上面发送的1,2,3,在观察者中看到的不一定是1,2,3的排序,加一个延迟就能看到,即1_gc1,1_gc2,3_gc1,3_gc2,2_gc1,2_gc2。
总结
flatMap不保证数据发射流的通知顺序。
concatMap和flatMap功能相同,可以保证按照发射顺序通知。
任重而道远
网友评论