介绍
map()方法是对Observable内的数据处理器Observable.OnSubscribe执行完数据处理后的再次加工。
执行代码
//初始化被观察者Observable,并给其加上数据处理器Observable.OnSubscribe
Observable Aobservable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("杨");
subscriber.onNext("月");
subscriber.onCompleted();
}
});
//做Map变换处理
Observable Bobservable = Aobservable.map(new Func1<String,String>() {
@Override
public String call(String string) {
return string+"YaZhou";
}
});
//初始化观察者Observer,视作结果接收器
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","结束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
Bobservable.subscribe(observer);
上面代码
Aobservable
、Bobservable
命名不规范是为了做凸显之意,不要介意。
源码分析
1. 初始化被观察者AObservable
Observable Aobservable = Observable.create(原始数据处理器);
由此可知被观察者AObservable持有原始数据处理器对象Observable.OnSubscribe。
2. 执行map()变换操作
Observable Bobservable = Aobservable.map(数据变换器)
Observable#map
public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
return create(new OnSubscribeMap<T, R>(this, func));
}
接着我们看其中的new OnSubscribeMap(Aobservable,数据变换器)操作
OnSubscribeMap#OnSubscribeMap
public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
this.source = source;
this.transformer = transformer;
}
由代码可知代理数据变换器OnSubscribeMap持有
Aobservable
和数据转换器func
回到map()方法内继续执行create(代理数据变换器)
return create(new OnSubscribeMap<T, R>(this, func));
create方法之前已经分析过,由此可知Bobservable持有代理数据变换器OnSubscribeMap。
3. 初始化结果接受器观察者Observer,
Observer observer = new Observer<String>() {
...
}
4. 订阅
Bobservable.subscribe(observer);
由之前分析可知会使用 Bobservable内的代理数据变换器OnSubscribeMap做call()方法。
其中observer为结果接受器
OnSubscribeMap#call
@Override
public void call(final Subscriber<? super R> o) {
//步骤一
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
//步骤二
source.unsafeSubscribe(parent);
}
先看
步骤一
此处初始化了数据变换中转器MapSubscriber(结果接收器,数据变换器)。
其中结果接收器是subscribe()方法传递进来的。
数据变换器是map时初始化OnSubscribeMap传递进来的。
接着看数据变换中转器MapSubscriber()
构造方法
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
this.actual = actual;
this.mapper = mapper;
}
由此可知数据变换中转器中持有结果接收器
actual
和数据变换器mapper
再回到OnSubscribeMap的call()方法内继续执行步骤二
//步骤二
source.unsafeSubscribe(parent);
其中
source
为Aobservable,parent
为数据变换中转器MapSubscriber
继续执行会进入Aobservable.unsafeSubscribe()方法
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
//获取数据处理器Observable.OnSubscribe,并做数据处理工作
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
try {
subscriber.onError(RxJavaHooks.onObservableError(e));
} catch (Throwable e2) {
Exceptions.throwIfFatal(e2);
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
RxJavaHooks.onObservableError(r);
throw r;
}
return Subscriptions.unsubscribed();
}
}
由此可知AObservable的原始数据处理器先执行call(数据变换中转器MapSubscriber)方法
接下来会进入外部实现的外部数据处理器
数据处理器内的call()方法
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("杨");
subscriber.onNext("月");
subscriber.onCompleted();
}
然后会进入数据变换中转器MapSubscriber的onNext()
方法
@Override
public void onNext(T t) {
R result;
try {
//步骤一 数据变换操作
result = mapper.call(t);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
}
//步骤二 返回数据结果给用户(结果接收器)
actual.onNext(result);
}
执行步骤一进入map()方法Func的外部实现方法,并返回数据
observable = observable.map(new Func1<String,String>() {
@Override
public String call(String string) {
return string+"YaZhou";
}
});
接着会执行步骤二,其中actual上文已经分析可知为结果接收器
actual.onNext(result);
接着就会进入结果接收器Observer内方法体内
Observer observer = new Observer<String>() {
@Override
public void onCompleted() {
LogShowUtil.addLog("RxJava","结束",true);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String string) {
LogShowUtil.addLog("RxJava","结果: "+string,true);
}
};
最终输出结果
结果: 杨YaZhou
结果: 月YaZhou
结束
网友评论