1.回顾
上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(Create篇)
2.Map操作符
- 定义
官方定义:transform the items emitted by an Observable by applying a function to each item
拙劣的翻译:应用一个函数 转换所有的被发射的item
- 实例讲解
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "类型转换:" + integer;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("开始采用subscribe连接");
}
@Override
public void onNext(String integer) {
System.out.println(integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
关于创建步骤这些上面文章已经分析过了,所以直接跳过了,不清楚的可以到RxJava2框架源码分析二(Create篇)观看,这里直接看到运行结果:
示意图.png3. 源码分析
老套路,按照步骤走
- 步骤一:创建被观察者
ObservableCreate
&定义需发送的事件 - 步骤二:通过
ObservableCreate
再次创建被观察者ObservableMap
&对事件进行加工操作 - 步骤三:创建观察者
Observer
&定义响应事件的行为 - 步骤四:通过
ObservableMap
订阅subscribe
观察者Observer
注意:上面用到的ObservableCreate
,ObservableMap
都是Observable
的子类,都是被观察者。
步骤一:创建ObservableCreate
步骤二
- 源码分析
//步骤二,创建ObservableMap被观察者
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "类型转换:" + integer;
}
})
//源码分析
//创建ObservableMap被观察者
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
//判断非空
ObjectHelper.requireNonNull(mapper, "mapper is null");
//创建ObservableMap并且返回出去(注意这里构造方法中的this指的是上一个被观察者ObservableCreate)
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
//这类继承了AbstractObservableWithUpstream,该类也是Observable的子类
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
...
// 仅贴出关键源码
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//初始化,调用父类构造方法
//source是指上一个被观察者,本文中为ObservableCreate
super(source);
//将map()接收的function传递至全局
this.function = function;
}
}
//这类也是Observable的子类,主要作用是包装,扩展
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
//上一个被观察者的引用
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
//通过构造方法赋值
this.source = source;
}
@Override
public final ObservableSource<T> source() {
//返回上一个被观察者
return source;
}
}
/**
* 此接口为Observable.map()里面的参数接口
**/
public interface Function<T, R> {
//定义两个泛型,参数为泛型T,返回泛型R
R apply(@NonNull T t) throws Exception;
}
-
步骤二总结:
通过上一个被观察者ObservableCreate.map()
创建了一个ObservableMap
对象,并通过构造方法传入ObservableCreate
引用this
以及Function
接口的实现类。ObservableMap
构造方法把ObservableCreate
的this
传递给其父类AbstractObservableWithUpstream
并赋source
步骤三:创建观察者Observer
创建Observer
接口的实现类
步骤四:通过ObservableMap
订阅subscribe
观察者Observer
- 源码分析
/**
* 源码分析:ObservableMap.subscribe(observer)
* 说明:该方法属于 Observable 类的方法(注:传入1个 Observer 对象)
**/
public abstract class Observable<T> implements ObservableSource<T> {
...
// 仅贴出关键源码
@Override
public final void subscribe(Observer<? super T> observer) {
...
// 仅贴出关键源码
//可以看到调用的是本类的下面抽象方法
subscribeActual(observer);
}
//定义了一个抽象方法当调用subscribe时会跟这个调用Observable子类的实现方法(就是调用者)
protected abstract void subscribeActual(Observer<? super T> observer);
}
/**
* 现在我们回到先前创建的被观察者中 ObservableMap类
**/
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
//构造方法传入的Function实现类
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//super()将上游的Observable保存起来 ,用于subscribeActual()中用。
//source是指上一个被观察者,本文中为ObservableCreate
super(source);
//将map()接收的function传递至全局
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//source是AbstractObservableWithUpstream的成员变量,通过上面构造方法传入的(本例子中带代表的是ObservableCreate)
//步骤一,创建MapObserver对象,并传入观察者(observer)以及function,这里只是创建对象,没有调用里面的方法
//步骤二,调用source.subscribe()把MapObserver对象传入ObservableCreate中去
source.subscribe(new MapObserver<T, U>(t, function));
}
/**
* 这类是map 的包装类是Observer的子类,BasicFuseableObserver继承的是Observer,
* 该类是可融合中间观察者的基类,里面主要实现了onSubscribe()、onError()、onComplete() 等一些抽象方法
**/
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
//Function实现类
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//将观察者传递给父类downstream(就是父类的成员变量downstream)
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
// 父类成员变量,表示是否发送过onError()、onComplete()事件
if (done) {
return;
}
//默认sourceMode是0,所以跳过
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 讲通过调用Function接口方法,将t传入,返回u,完成类型转换
//相当于 v =mapper.apply(t)
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//调用观察者的onNext()
downstream.onNext(v);
}
...
// 仅贴出关键源码
}
}
/**
* 熟悉的类ObservableCreate,详细的分析在上篇文章
* 上一个类中我们调用了source.subscribe(new MapObserver<T, U>(t, function));
* Observable.subscribe()方法最终会回调到ObservableCreate中的subscribeActual()方法
**/
public final class ObservableCreate<T> extends Observable<T> {
@Override
protected void subscribeActual(Observer<? super T> observer) {
//将MapObserver()观察者封装成CreateEmitter对象
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用观察者(MapObserver)的onSubscribe()
observer.onSubscribe(parent);
try {
//3.调用source对象的subscribe()方法(发射器中的subscribe()实现类中的onNetx()系列方法)
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
//这里传入的是MapObserver对象
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//调用的其实是MapObserver对象中的onNext()方法
observer.onNext(t);
}
}
}
-
步骤四总结:
当被观察者订阅观察者的时候,会调用被观察者Observable的subscribeActual()抽象方法,回调其子类重新的subscribeActual()方法。这方法里面有几个步骤:
- 通过
ObservableMap
调用父类Observable
方法subscribe()
,该方法回调父类抽象方法subscribeActual()
的实现类ObservableMap
中去。 -
ObservableMap
类中重写subscribeActual()
方法中创建了MapObserver
观察者对象,并通过调用父类Observable
方法subscribe()
把MapObserver
对象传递给上一个被观察者ObservableCreate
中的subscribeActual()
实现方法中。 - 创建1个
CreateEmitter
对象(封装成一个Disposable
对象)。 - 调用观察者
MapObserver
的onSubscribe(CreateEmitter parent )
使其可以取消订阅。 - 调用
source
对象的subscribe(CreateEmitter parent)
方法,通过parent
发送事件回调。
4. 源码总结
- 创建被观察者,通过
Observable.create()
方法创建了ObservableCreate
对象,然后通过ObservableCreate
对象又创建了ObservableMap
。 - 订阅,
ObservableMap
通过调用父类方法subscribe()
方法回调到ObservableMap
重写父类subscribeActual()
方法中,该方法创建MapObserver
对象,并实现Observer
中的onNext()
,里面调用接口Function
实现数据类型转换。 -
ObservableMap.subscribeActual()
方法中调用上一个被观察者ObservableCreate.subscribe()
,回调到ObservableCreate.subscribeActual()
中去,然后创建发射器调用MapObserver.onNext()
。
网友评论