本篇文章的目的:分析RxJava map操作符和flatMap操作符的实现
map操作符的实现
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(String string) {
Log.e(TAG, "onNext: " + string);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
先上个图

Observable的create()方法简化版
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
return new ObservableCreate<T>(source);
}
create()方法返回的Observable是一个ObservableCreate对象。
Observable的map()方法简化版
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
return new ObservableMap<T, R>(this, mapper);
}
Observable的subscribe()方法简化版
public final void subscribe(Observer<? super T> observer) {
subscribeActual(observer);
}
我们进入到ObservableMap类去看
public void subscribeActual(Observer<? super U> t) {
//1.
source.subscribe(new MapObserver<T, U>(t, function));
}
这里的source就是create()方法返回的ObservableCreate对象。然后使用传入的下游的观察者和function构建了一个MapObserver对象。
然后source调用subscribe()方法。内部也是调用subscribeActual()方法
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
这个过程在前面的文章中已经分析过了,这里我们只需要知道,当数据发出的时候,会调用map操作符返回的MapObserver对象对应的onNext(),onError()和onComplete()方法。
MapObserver类简化版
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
U v;
//应用mapper以后,返回结果
v = mapper.apply(t)
downstream.onNext(v);
}
}
总结:map操操作符,在onNext() 方法中,获取上游发射的数据t,应用传入的function,获得对应类型的对象v,然后使用v作为参数,调用下游观察者的onNext()方法。还是比较简单的。
flatMap操作符的实现
Observable.create(new ObservableOnSubscribe<List<Integer>>() {
@Override
public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
list1.add(3);
List<Integer> list2 = new ArrayList<>();
list2.add(4);
list2.add(5);
list2.add(6);
emitter.onNext(list1);
emitter.onNext(list2);
emitter.onComplete();
}
}).flatMap(new Function<List<Integer>, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(List<Integer> integers) throws Exception {
return Observable.fromIterable(integers);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
上张图

Observable的flatMap()方法
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
}
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
}
方法重载也是狠,最终是调用了这个方法,返回一个ObservableFlatMap对象。
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
}
当调用subscribe()方法把观察者和被观察者关联起来的时候会调用ObservableFlatMap的subscribeActual()方法。
ObservableFlatMap的subscribeActual()方法简化版
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
创建了一个MergeObserver对象,然后source(ObservableCreate对象)调用subscribe()方法。最终会进入到ObservableCreate的subscribeActual()方法。
ObservableCreate的subscribeActual()方法简化版
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//1. 调用观察者的onSubscribe()方法
observer.onSubscribe(parent);
//2. 发射数据
source.subscribe(parent);
}
- 调用观察者的onSubscribe()方法
MergeObserver的onSubscribe()方法
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
//为upstream赋值,并调用下游的onSubscribe()方法
this.upstream = d;
downstream.onSubscribe(this);
}
}
- 发射数据
ObservableCreate对象发射数据这个过程在前面的文章中已经分析过了,这里我们只需要知道,当数据发出的时候,会调用flatMap操作符返回MergeObserver对象对应的onNext(),onError()和onComplete()方法。
List<Integer> list1 = new ArrayList<>();
list1.add(1);
list1.add(2);
list1.add(3);
List<Integer> list2 = new ArrayList<>();
list2.add(4);
list2.add(5);
list2.add(6);
emitter.onNext(list1);
emitter.onNext(list2);
emitter.onComplete();
MergeObserver的onNext方法简化版
public void onNext(T t) {
ObservableSource<? extends U> p;
//应用传入的mapper,在这个例子中,p是一个ObservableFromIterable对象
p = mapper.apply(t)
subscribeInner(p);
}
MergeObserver的subscribeInner方法简化版
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
//1. 创建一个InnerObserver对象
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
//2. 添加inner
if (addInner(inner)) {
//3. 如果添加成功,p就订阅inner对象。
p.subscribe(inner);
}
//跳出循环
break;
}
}
-
创建一个InnerObserver对象
InnerObserver类是ObservableFlatMap的一个静态内部类,暂且不去管。 -
添加inner,内部操作就是把创建好的InnerObserver对象添加到observers中去。
final AtomicReference<InnerObserver<?, ?>[]> observers;
- 如果添加成功,p就订阅inner对象。正常情况下添加是成功的,这里的p就是我们应用flatMap后返回的对象。在上面的例子中就是
Observable.fromIterable(integers)
返回的对象。
Observable的fromIterable()方法简化版
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
return new ObservableFromIterable<T>(source);
}
返回一个ObservableFromIterable对象。
ObservableFromIterable的subscribeActual方法简化版
public void subscribeActual(Observer<? super T> observer) {
Iterator<? extends T> it;
it = source.iterator();
boolean hasNext= it.hasNext();
if (!hasNext) {
EmptyDisposable.complete(observer);
return;
}
FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
//1. 首先调用observer的onSubscribe()方法
observer.onSubscribe(d);
}
- 首先调用observer的onSubscribe()方法
InnerObserver的onSubscribe()方法简化版
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) d;
//1. m的取值是SYNC
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
//2. 为queue赋值
queue = qd;
done = true;
//3 . 调用parent.drain()
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
- m的取值是SYNC
FromIterableDisposable的requestFusion()方法
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
返回结果是SYNC
//2. 为queue赋值
这里明确一点即可,我们能从queue中取出我们想要的数据。
3 . 调用parent.drain()方法
这里的parent就是MergeObserver,我们回到MergeObserver类。
在这个例子中,drain()方法会被调用3次。在onNext() 方法中调用2次,然后我们发现在MergeObserver的onComplete中调用一次。
public void onComplete() {
if (done) {
return;
}
done = true;
drain();
}
drain()方法有点复杂,就不一行一行代码的去分析了,不过这个方法确实是flatMap操作符的精髓所在。在经过几次debug一步一步调试之后,大致看懂了这个方法的作用如下。
-
在onNext() 方法中调用drain()方法的时候,会从queue中取出我们的数据,然后调用下游的onNext()方法。(在这个例子中,MergeObserver的onNext() 方法会被调用2次,每次调用下游的observer的onNext() 方法3次)。
-
在onComplete() 方法中调用drain()方法,就直接调用下游observer的onComplete() 方法。
总结:自己分析的还是很粗糙的,在后续的学习和研究过程中会不断完善。
网友评论