RxJava 2.0简介
关于RxJava到底是什么,我们可以看看它的开发者是如何描述它的:
RxJava is a Java VM implementation of Reactive Extensions: a library for composing asynchronous and event-based programs by using observable sequences.
It extends the observer pattern to support sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.
它是通过使用可观察序列来组件异步和基于事件程序的库,它使用了观察这模式来支持事件与数据的序列化,并且提供了一些列的操作符,你可以用这些操作符来将各种序列显式的组合起来,并且它可以帮你把异步线程、线程同步、线程和并发的数据结构中抽象出来,让你远离那些实现细节。
嗯,没错,这就是RxJava,一个啥都不需要你考虑,只需要把你想做的事务和要处理的数据组合成操作序列,剩下的就交给RxJava就好了。
当然,不是说RxJava能做到的,其他库或者我们自己就不能做到,那为什么还要使用RxJava呢?(其实我不喜欢使用RxJava)这里我挑一些别人说的理由:
-
它是一种函数响应式编程,显然函数响应式编程的好处它都有(不知道函数响应式编程的自己查资料)
-
逻辑清晰,使用简洁(是不是这样呢?谁用谁知道)
而我不喜欢的理由:
我也不是不喜欢RxJava,只是觉得有时候只是为了一个异步任务就动不动RxJava,有点杀鸡用牛刀的感觉。这些轻量级别的任务,我还是喜欢用bolts-task(强推)。这里贴一个科普链接:https://blog.csdn.net/u010295885/article/details/52463039
操作符flatmap的简单介绍与应用
flatmap是RxJava众多操作符中的一个,flat字面含义就是"压平",当你需要把多个集合整理成一个集合,其实就是一个压平的操作,如下图,这个时候你就可以使用flatmap这个操作符了。
image.png
看一个简单的应用
List<Student> students = new ArrayList<Student>();
students.add(new Student("1", "zhangsan")
.addCourse(new Course("1", "数学"))
.addCourse(new Course("5", "计算机")));
students.add(new Student("2", "lisi")
.addCourse(new Course("2", "语文")));
students.add(new Student("3", "wangwu")
.addCourse(new Course("3", "英语")));
Observable.fromIterable(students)
.flatMap(new Function<Student, ObservableSource<Course>>() {
@Override
public ObservableSource<Course> apply(Student student) throws Exception {
return Observable.fromIterable(student.courses);
}
})
.subscribe(new Consumer<Course>() {
@Override
public void accept(Course course) throws Exception {
System.out.println(course);
}
});
应用非常简单,数据源是一些学生,而我最后打印的是每个学生的所选的课程,我们来试试使用map操作符来实现看看:
Observable.fromIterable(students)
.map(new Function<Student, List<Course>>() {
@Override
public List<Course> apply(Student student) throws Exception {
return student.courses;
}
})
.subscribe(new Consumer<List<Course>>() {
@Override
public void accept(List<Course> courses) throws Exception {
for (Course cours : courses) {
System.out.println(cours);
}
}
});
比较看下来,相对于flatmap,map操作符只能给我们每个学生所选的所有课程,并不能告诉我们他所选的每一门课都是什么,这就需要我们自己去加一层遍历了。
另外,从比较结果来看,flatmap好像是替我们对学生的课程列表做了一次遍历。嗯,没错,从结果上看它确实是做了,那它是怎么做到的呢?
flatmap源码分析
这里以上面那个简单应用作为一个入口,来看看flatmap如何帮我们把数据压平的。
1. Observable::fromIterable
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableFromIterable<T>(source));
}
创建一个ObservableFromIterable对象,构造参数为一个迭代器(在我们的应用就是students)。然后判断是否要进行Hook操作,最后返回一个Observable对象。
2. RxJavaPlugins::onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这里的f为空,即不需要Hook,所以第一步中会直接返回一个ObservableFromIterable对象。接下来我们调用了flatmap。
3. ObservableFromIterable::flatmap
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
return flatMap(mapper, false);
}
flatmap被重载了很多次,对于只传了一个映射方法的flatmap方法,最后会调用下面的flatmap
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
if (this instanceof ScalarCallable) {
@SuppressWarnings("unchecked")
T v = ((ScalarCallable<T>)this).call();
if (v == null) {
return empty();
}
return ObservableScalarXMap.scalarXMap(v, mapper);
}
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
这里需要关注它几个参数:
- mapper:就是student到ObservableSource<Course>的映射方法
- delayErrors:false
- maxConcurrency:Integer.MAX_VALUE
- bufferSize:bufferSize()
由于不Hook,所以这里直接返回ObservableFlatMap对象,注意这里把我们在第一步中创
ObservableFromIterable作为构造参数传给了ObservableFlatMap对象。
4.ObservableFlatMap::this
public ObservableFlatMap(ObservableSource<T> source,
Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
super(source);
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}
流程很简单,这里要注意这个source最后会赋值给父类的一个名字为source成员。如下
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
小结,flatmap只做了一件事情,就是把我们最开始的ObservableFromIterable包了一层,构造了一个ObservableFlatMap,然后我们对ObservableFlatMap进行了订阅操作
5. ObservableFlatMap::subscribe
回顾下我们的调用
.subscribe(new Consumer<Course>() {
@Override
public void accept(Course course) throws Exception {
System.out.println(course);
}
});
调用的是参数类型为Consumer的subscribe方法
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
调用了subscribe多参重载方法,注意这里的传参
- onNext:就是我们的Consumer
- onSubscribe:Functions.emptyConsumer(),它返回一个EmptyConsumer对象,accept不做任何处理
static final class EmptyConsumer implements Consumer<Object> {
@Override
public void accept(Object v) { }
@Override
public String toString() {
return "EmptyConsumer";
}
}
多参的subscribe将我们的入参包装成了一个Observer对象,也就是LambdaObserver来代理了我们的Consumer,然后调用参数类型为Observer的subscribe
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
这里RxJavaPlugins.onSubscribe依然不会对observer进行Hook,所以这里最后就是直接调用subscribeActual方法,它是一个抽象方法,每个子类都会有自己的实现,进行真正的订阅操作,这里我们最后调用的就是ObservableFlatMap的subscribeActual方法
6. ObservableFlatMap::subscribeActual
@Override
public void subscribeActual(Observer<? super U> t) {
if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
return;
}
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
还记得这个source是谁吧?就是开始创建的那个ObservableFromIterable对象,这里才开始对我们原始的数据源进行订阅。而且从这里开始,每一次调用subscribe,都会先创建一个新的Observer对象来代理上一个Observer对象,从而形成一种上下游的Flow,这个为后面的onSubscribe做了准备。
小结,subscribe方法与onSubscribe是一对操作,subscribe负责把订阅者一层层包起来,每包一层就增加了一层对原始数据的加工,而onSubscribe则负责把加工的数据一层层往里面传,最后传给我们最初的订阅者。可以结合下图理解
image.png
接下来,就看看原始订阅者的上游MergeObserver
7. MergeObserver::this
MergeObserver(Observer<? super U> actual, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
this.downstream = actual;
this.mapper = mapper;
this.delayErrors = delayErrors;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
if (maxConcurrency != Integer.MAX_VALUE) {
sources = new ArrayDeque<ObservableSource<? extends U>>(maxConcurrency);
}
this.observers = new AtomicReference<InnerObserver<?, ?>[]>(EMPTY);
}
重点关注几个地方:
- downStream:下游,这里就是我们最原始的订阅者
- mapper:就是之前我们给flatmap的映射函数
- maxConcurrency:就是我们构建ObservableFlatMap所传的值,Integer.MAX_VALUE
- observers:InnerObserver类型数组的一个原子引用
这里涉及到了InnerObserver,既然已经登场我们就先来简单了解下
8. InnerObserver
static final class InnerObserver<T, U> extends AtomicReference<Disposable>
implements Observer<U> {
private static final long serialVersionUID = -4606175640614850599L;
final long id;
final MergeObserver<T, U> parent;
volatile boolean done;
volatile SimpleQueue<U> queue;
int fusionMode;
InnerObserver(MergeObserver<T, U> parent, long id) {
this.id = id;
this.parent = parent;
}
这里暂时只需要知道,它是一个Observer对象,它有个parent指向MergeObserver,我们先回到第6步中source被MergeObserver订阅的地方。这里的source就是我们原始的观察源ObservableFromIterable,我们看看它的subscribe方法
9.ObservableFromIterable::subscribeActual
同样的,subscribe方法是它的基类方法,最后都会调用到子类的subscribeActual
@Override
public void subscribeActual(Observer<? super T> observer) {
Iterator<? extends T> it;
try {
it = source.iterator();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
boolean hasNext;
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
EmptyDisposable.error(e, observer);
return;
}
if (!hasNext) {
EmptyDisposable.complete(observer);
return;
}
FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
observer.onSubscribe(d);
if (!d.fusionMode) {
d.run();
}
}
整个流程很简单,source由第1步中知道它就是我们提供的数据源(在我们的实例中就是students),如果source有数据,则走接下来的三步:
- 构建FromIterableDisposable,一个Disposable对象,是onSubscribe方法的参数。它的构造参数有两个
- observer:这里就是MergeObserver,作为下游保存起来
- it:数据源的迭代器
- 调用MergeObserver的onSubscribe方法
如果FromIterableDisposable没有被消费,则执行它的run方法
小结,这里才开始真正的消费数据,两种情况:首先交给onSubscribe消费,如果没有被消费则通过FromIterableDisposable的run方法来消费
10. MergeObserver::onSubscribe
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
downstream.onSubscribe(this);
}
}
流程很简单,首先判断是否已经有了上游了,如果有了,则不做任何处理,否则参数d作为自己的上游,赋值给upStream,调用自己下游的onSubscribe方法。这里的downstream就是我们最原始的订阅者Consumer,由第5步可知,它被LambdaObserver代理了,我们看看LambdaObserver的onSubscribe方法
11. LambdaObserver::onSubscribe
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
d.dispose();
onError(ex);
}
}
}
首先保存了下d,然后调用它的onSubscribe的accept方法,由第5步可知,onSubscribe就是一个EmptyConsumer对象,看看EmptyConsumer的accept方法
12. EmptyConsumer::accept
static final class EmptyConsumer implements Consumer<Object> {
@Override
public void accept(Object v) { }
@Override
public String toString() {
return "EmptyConsumer";
}
}
accpet是一个空方法,所以onSubscribe并没有做消费数据的操作,所以FromIterableDisposable的fusionMode还是false,由第9步可知,会走到FromIterableDisposable::run方法,通过它来消费数据
小结,从以上onSubscribe流程可以看到,每个onSubscribe参数都是上游,执行体里面都是调用它的下游来消费数据,跟第6步的小结吻合
13. FromIterableDisposable::run
void run() {
boolean hasNext;
do {
if (isDisposed()) {
return;
}
T v;
try {
v = ObjectHelper.requireNonNull(it.next(), "The iterator returned a null value");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
return;
}
downstream.onNext(v);
if (isDisposed()) {
return;
}
try {
hasNext = it.hasNext();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(e);
return;
}
} while (hasNext);
if (!isDisposed()) {
downstream.onComplete();
}
}
代码略长,简单梳理下流程
- 该任务是否已经取消了?是,则退出,否则判断是否有下一个数据,没有则到3,有则到2
- 调用下游的onNext,将数据传给下游,这里的downStream就是MergeObserver,再到1
- 调用下游的onComplete方法
还有一个错误处理的分支:当获取数据的过程中出错了,则会调用下游的onError
14. MergeObserver::onNext
@Override
public void onNext(T t) {
// safeguard against misbehaving sources
if (done) {
return;
}
ObservableSource<? extends U> p;
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
upstream.dispose();
onError(e);
return;
}
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
if (wip == maxConcurrency) {
sources.offer(p);
return;
}
wip++;
}
}
subscribeInner(p);
}
去掉一些判断,流程还是很简单的
- 通过映射函数mapper,将参数t转变成一个新的观察源ObservableSource对象
- maxConcurrency为Integer.MAX_VALUE,所以直接调用subscribeInner
15. MergeObserver::subscribeInner
void subscribeInner(ObservableSource<? extends U> p) {
for (;;) {
if (p instanceof Callable) {
if (tryEmitScalar(((Callable<? extends U>)p)) && maxConcurrency != Integer.MAX_VALUE) {
boolean empty = false;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
empty = true;
}
}
if (empty) {
drain();
break;
}
} else {
break;
}
} else {
InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
if (addInner(inner)) {
p.subscribe(inner);
}
break;
}
}
}
由我们的映射函数生成的ObservableSource是ObservableFromIterable,它没有实现Callable,所有走到了else分支,流程如下:
- 创建InnerObserver
- addInner,将每个InnerObserver添加到observers中
- 使用InnerObserver来对我们通过mapper创建的ObservableFromIterable进行订阅
从第8步中,我们知道每一个InnerObserver都有一个parent指向MergeObserver,留意这一点
16. MergeObserver::addInner
boolean addInner(InnerObserver<T, U> inner) {
for (;;) {
InnerObserver<?, ?>[] a = observers.get();
if (a == CANCELLED) {
inner.dispose();
return false;
}
int n = a.length;
InnerObserver<?, ?>[] b = new InnerObserver[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = inner;
if (observers.compareAndSet(a, b)) {
return true;
}
}
}
将每一个InnerObserver存放到observers当中,其实逻辑很简单,但是这里做了很多同步操作
由第9步可知,对ObservableFromIterable进行订阅,首先会回调订阅者的onSubscribe来消费数据,我们看看InnerObserver的onSubscribe
17. InnerObserver::onSubscribe
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<U> qd = (QueueDisposable<U>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
fusionMode = m;
queue = qd;
done = true;
parent.drain();
return;
}
if (m == QueueDisposable.ASYNC) {
fusionMode = m;
queue = qd;
}
}
}
}
流程如下
- 一样的,保存自己的上游
- 判断d是否是QueueDisposable,而FromIterableDisposable是QueueDisposable的子类,所以成立
- 调用FromIterableDisposable::requestFusion,这里传参是ANY | BOUNDARY
- 如果requestFusion返回SYNC则会走parent.drain方法,这里的parent就是MergeObserver,另外有以下几个赋值
- fusionMode = SYNC
- done = true
- queue = qd,即queue指向了FromIterableDisposable,也就是它的上游,随后会通过queue从它的上游获取数据
18. FromIterableDisposable::requestFusion
@Override
public int requestFusion(int mode) {
if ((mode & SYNC) != 0) {
fusionMode = true;
return SYNC;
}
return NONE;
}
由传参ANY | BOUNDARY,得到返回就是SYNC,故会走到MergeObserver::drain方法,另外这里讲标志位fusionMode标记为true了,即该FromIterableDisposable已经被消费了,它的FromIterableDisposable::run将不再被执行
19. MergeObserver::drain
void drain() {
if (getAndIncrement() == 0) {
drainLoop();
}
}
做了多线程保护,然后直接调用drainLoop
20. MergeObserver::drainLoop
void drainLoop() {
final Observer<? super U> child = this.downstream;
int missed = 1;
for (;;) {
if (checkTerminate()) {
return;
}
//这里queue没有赋值,所有为空
SimplePlainQueue<U> svq = queue;
if (svq != null) {
for (;;) {
U o;
for (;;) {
if (checkTerminate()) {
return;
}
o = svq.poll();
if (o == null) {
break;
}
child.onNext(o);
}
if (o == null) {
break;
}
}
}
//done为false
boolean d = done;
svq = queue;
InnerObserver<?, ?>[] inner = observers.get();
int n = inner.length; //n至少为1
int nSources = 0;
if (maxConcurrency != Integer.MAX_VALUE) {
synchronized (this) {
nSources = sources.size();
}
}
if (d && (svq == null || svq.isEmpty()) && n == 0 && nSources == 0) {
Throwable ex = errors.terminate();
if (ex != ExceptionHelper.TERMINATED) {
if (ex == null) {
child.onComplete();
} else {
child.onError(ex);
}
}
return;
}
//所以我们可以从这里开始分析,以下一段代码是确定下一个需要处理的InnerObserver的lastIndex,lastId,记为流程1
boolean innerCompleted = false;
if (n != 0) {
long startId = lastId;
int index = lastIndex;
if (n <= index || inner[index].id != startId) {
if (n <= index) {
index = 0;
}
int j = index;
for (int i = 0; i < n; i++) {
if (inner[j].id == startId) {
break;
}
j++;
if (j == n) {
j = 0;
}
}
index = j;
lastIndex = j;
lastId = inner[j].id;
}
//找到以下一个待处理的InnerObserver后,则开始从index处开始遍历依次向后处理observers中未处理的InnerObserver,记为流程2
int j = index;
sourceLoop:
for (int i = 0; i < n; i++) {
if (checkTerminate()) {
return;
}
@SuppressWarnings("unchecked")
InnerObserver<T, U> is = (InnerObserver<T, U>)inner[j];
for (;;) {
if (checkTerminate()) {
return;
}
SimpleQueue<U> q = is.queue;
if (q == null) {
break;
}
U o;
for (;;) {
try {
o = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
is.dispose();
errors.addThrowable(ex);
if (checkTerminate()) {
return;
}
removeInner(is);
innerCompleted = true;
i++;
continue sourceLoop;
}
if (o == null) {
break;
}
child.onNext(o);
if (checkTerminate()) {
return;
}
}
if (o == null) {
break;
}
}
boolean innerDone = is.done;
SimpleQueue<U> innerQueue = is.queue;
if (innerDone && (innerQueue == null || innerQueue.isEmpty())) {
removeInner(is);
if (checkTerminate()) {
return;
}
innerCompleted = true;
}
j++;
if (j == n) {
j = 0;
}
}
lastIndex = j;
lastId = inner[j].id;
}
if (innerCompleted) {
if (maxConcurrency != Integer.MAX_VALUE) {
ObservableSource<? extends U> p;
synchronized (this) {
p = sources.poll();
if (p == null) {
wip--;
continue;
}
}
subscribeInner(p);
}
continue;
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
鉴于代码比较长,代码有些地方我做了注释,由于判断条件都不满足,所以我们只用看代码中标注的两个流程
-
流程1
n不为0,进入if分支,流程如下- 获取上次记录的待处理的InnerObserver的id以及index
- 如果observers中的InnerObserver中的个数减少了或者lastIndex对应的InnerObserver发生变化了,则到3,否则已经找到
- 分两种情况:
- 如果observers中的InnerObserver的个数减少了,则需要重置index,重头开始找到待处理的InnerObserver,重新赋值给lastId以及lastIndex
- 如果observer中的innerObserver的个数没有减少,但是最之前lastIndex对应的InnerObserver已经发生了变化,则需要从index处重新找到待处理的InnerObserver,如果没有找到,则重头开始找
-
流程2
通过index取出需要处理的InnerObserver,拿到InnerObserver中的queue,开始遍历
queue中的数据,由第17步中,可知这个queue就是FromIterableDisposable,FromIterableDisposable中有一个迭代器,queue通过其中的迭代器不断的获取数据通过onNext回调传给它的下游,这里就是我们最原始的订阅者。当前的queue处理完后,外层循环会因为o == null而退出,随后就是赋值lastIndex与lastId,也就是下一个即将要被处理的InnerObserver。
小结,MergeObserver通过drainLoop方法将我们通过mapper创建出来的观察源一一创建一个内部订阅者,也就是InnerObserver,由InnerObserver来消费这些观察源,而InnerObserver又持有一个MergeObserver的引用,而MergeObserver又持有它的下游,也就是我们最原始的订阅者,这样我们就可以把由mapper创建的观察源的数据传给最终的订阅者
总结
- RxJava中有大量的服务于多线程的代码,从flatmap的源码分析总可以看出来,很多代码都是了线程同步,保证线程安全。
- flatmap通过在MergeObserver中创建一系列的InnerObserver,在同一层级对所有通过mapper创建的观察源进行消费,然后通过内部订阅者持有的parent即MergeObserver将数据派发给MergeObserver下游。虽然这里的InnerObserver并没有显式的指定它的下游,但是它的下游从它的parent也就是MergeObserver“继承了”。
- RxJava是基于数据流的,分析它的代码一定要重点关注数据的流动,它的数据加工过程可以看成责任链模式的运用,它的数据分发则是观察者模式与代理模式的运用。
- 数据加工的链是通过subscribe方法链接起来的,而数据的分发则是通过onSubscribe方法来完成的
网友评论