已经很久没有对Android开发知识点进行学习了。这半年来除了公司的正经事要忙,还抽空把flutter和小程序学习了一遍。现在是时候重新对Android温故而知新了
RxJava2是一个面试环节必问的一个框架。万变不离其宗,你把源码流程思想都嚼烂了,害怕能问出什么幺蛾子呢?希望本篇文章能够帮到大家更好的理解RxJava2的源码,理解其设计思路
前言
RxJava2与RxJava1相比,基本上没有太大变化,但是多了一个背压策略。所以在分析RxJava2源码的时候,一定会多带上它。
那么我们话不多说,直接开始吧
RxJava2源码分析(无背压)
先来看一下RxJava2无背压版使用范例。通过范例的回调结果,我们可以更加轻松的了解整个RxJava2的实现流程
Observable.create(object : ObservableOnSubscribe<String> {
override fun subscribe(emitter: ObservableEmitter<String>) {
Log.d("Demo", "subscribe")
emitter.onNext("Hello")
emitter.onComplete()
}
}).subscribe(object : Observer<String> {
override fun onComplete() {
Log.d("Demo", "onComplete")
}
override fun onSubscribe(d: Disposable) {
Log.d("Demo", "onSubscribe")
}
override fun onNext(t: String) {
Log.d("Demo", "onNext $t")
}
override fun onError(e: Throwable) {
Log.d("Demo", "onError")
}
})
运行之后得到的结果如下
onSubscribe
subscribe
onNext Hello
onComplete
因此,我们可以从结果推断出事件处理顺序为:
-
Observable
被观察者的create()
先被触发 -
Observable
被观察者与Observer
观察者通过subscribe()
进行绑定关联 -
Observer
观察者的onSubscribe()
被触发 -
Observable
被观察者中的ObservableOnSubscribe
接口被触发,并执行了subscribe()
。ObservableEmitter
对象执行了onNext()
-
Observer
观察者的onNext()
被触发 -
ObservableEmitter
对象执行了onComplete()
-
Observer
观察者的onComplete()
被触发
了解到这个顺序之后,我们再来看源码,看一下我们的推断与源码描述是否一致
1. 观察者Observable
Observable
中包含RxJava2所有操作符的操作方法,但是最重要2个方法是create()
和subscribe()
。
先进入create()
。create()
虽然只有短短两行代码,但每一行都是阅读源码的关键
// create()将我们自定义的ObservableOnSubscribe对象作为参数传递进去
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// requireNonNull()仅仅是一个非空检查,其实并不关键,但之所以说它很关键,是因为RxJava2源码中有很多地方都调用了这个方法。
// 你只要记得这个写法就行,以后不需要太关注
ObjectHelper.requireNonNull(source, "source is null");
// 返回ObservableCreate对象
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
入参ObservableOnSubscribe
正是串联Observable
与Observer
的关键,看名字就知道。
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
重点在return
的对象了。先来看看RxJavaPlugins
的onAssembly()
,它的返回值即为我们传入的source
对象,即Observable
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// 我们并未调用过`setOnObservableAssembly()`对`onObservableAssembly`进行赋值
// 因此`onObservableAssembly`的值为null
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
小结一下,Observable
被观察者通过create()
创建出来ObservableCreate
对象。持续的小结很重要,这是源码阅读的一个方法。ObservableCreate
我们稍后再说,继续把Observable
看完,接下来看下subscribe()
。subscribe()
就是观察者订阅被观察者的地方。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
// 这里又出现requireNonNull了
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 包装一下,还是返回Observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 关键方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
} catch (Throwable e) {
}
}
这里有一个重要的抽象方法subscribeActual()
。无论是操作符控制还是线程调度都会涉及到各种subscribeActual()
。因此,当你看到subscribeActual()
的时候都要打起十二分精神。这里还是先放一下,等稍后再说
protected abstract void subscribeActual(Observer<? super T> observer);
2. 桥梁ObservableOnSubscribe
刚才我们没有对ObservableOnSubscribe
进行详细说明,现在补充一下。ObservableOnSubscribe
起到的是一个桥梁的作用,由它串联Observable
与Observer
,从而将ObservableEmitter
产生的事件交由Observer
。我们看下ObservableEmitter
,ObservableEmitter
继承自Emitter
public interface ObservableEmitter<T> extends Emitter<T>
Emitter
有几个我们很熟悉的方法,它们就是我们自定义ObservableOnSubscribe
中所用到的方法
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
ObservableOnSubscribe
在后面会有闪亮的动作,先到这里结束
3. ObservableCreate
接下来重头戏就在这了。刚才我们已经知道,create()
方法最后返回的是ObservableCreate
对象。其构造方法接收一个自定义的ObservableOnSubscribe
对象
// 记住这个source,会有很多地方要用
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
该类里有一个重要的方法叫subscribeActual()
,这个方法实现了之前我们所说Observable
中的相应抽象方法。你可以回头再看下刚才的subscribe()
,这里的Observer
就是我们自定义的观察者。这样,ObservableCreate
本身作为Observable
,同时持有Observer
与ObservableOnSubscribe
的引用了
@Override
// 传入的是`Observer`观察者对象
protected void subscribeActual(Observer<? super T> observer) {
// 创建了CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 执行了observer中的onSubscribe(),也就是上文所说的步骤三
observer.onSubscribe(parent);
try {
// 执行ObservableOnSubscribe中的subscribe(),入参为CreateEmitter对象,进入上文所说的步骤四
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
该方法首先创建一个CreateEmitter
对象,并将Observer
传递进去。那我们势必要知道CreateEmitter
的作用
4. CreateEmitter
CreateEmitter
类也实现了ObservableEmitter
接口,这就意味着它跟ObservableOnSubscribe
有某种关联。Observer
对象作为参数传递到其构造方法中。我们以onNext()
方法为例来进行说明
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// CreateEmitter中的`onNext()`、`onError()`、`onComplete()`与`Observer`中的相应方法进行关联
// 这样,只要调用CreateEmitter中的上述三种方法,即可触发`Observer`中的同名方法
@Override
public void onNext(T t) {
// RxJava2不允许传递null类型数据
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
// 执行`Observer`中的`onNext()`
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
onNext()
、onError()
、onComplete()
与Observer
中的相应方法进行关联,这就意味着你触发ObservableEmitter
相关方法,也会同步触发Observer
的同名方法
小结一下。subscribeActual()
创建CreateEmitter
,将onNext()
、onError()
、onComplete()
与Observer
中的相应方法进行关联。之后就是调用Observer
的onSubscribe()
。随后通过source
调用ObservableOnSubscribe
的subscribe()
。当触发Emitter
的onNext()
的时候,同时触发Observer
的onNext()
,onComplete()
同理。这样,示意图中的第4、5、6、7步就有理可据,整个事件流程就被完整的串联起来了
5. 观察者Observer
`Observer的部分已经不是特别重要了,它具体的功能完全是我们自定义的
// `Observer`是一个接口,它有4个方法
public interface Observer<T> {
void onSubscribe(@NonNull Disposable d);
void onNext(@NonNull T t);
void onError(@NonNull Throwable e);
void onComplete();
}
最后,用一张UML图来总结一下RxJava2无背压版源码
1. Observable
通过create()
与subscribe()
两个方法将ObservableOnSubscribe
与Observer
收入囊中
2. 通过Observable
的create()
得到ObservableCreate
对象
3. 通过ObservableCreate
的subscribeActual()
方法得到CreateEmitter
对象
4. 得到CreateEmitter
对象的同时,触发Observer
的onSubscribe()
,并执行ObservableOnSubscribe
中的subscribe
,入参为CreateEmitter
5. 通过对CreateEmitter
中onNext()
、onError()
与onComplete()
的调用完成事件向Observer
同名方法的传递
RxJava2源码分析(有背压)
不支持背压的Observable
与支持背压的Flowable
事件流程基本一致,唯一不同之处在于后者多了背压策略。
我们依然从范例对事件流程进行回顾
Flowable.create(object : FlowableOnSubscribe<String> {
override fun subscribe(emitter: FlowableEmitter<String>) {
Log.d("Demo", "subscribe")
emitter.onNext("123")
emitter.onNext("456")
emitter.onNext("789")
emitter.onComplete()
}
}, BackpressureStrategy.DROP).subscribe(object : FlowableSubscriber<String> {
override fun onComplete() {
Log.d("Demo", "onComplete")
}
override fun onSubscribe(s: Subscription) {
Log.d("Demo", "onSubscribe")
s.request(2)
}
override fun onNext(t: String?) {
Log.d("Demo", "onNext $t")
}
override fun onError(t: Throwable?) {
Log.d("Demo", "onError")
}
})
运行之后得到的结果如下:
onSubscribe
subscribe
onNext 123
onNext 456
onComplete
看上去好像跟无背压版本差不多,但是细心的你肯定会发现:我明明传递了3个onNext()
事件,怎么最后就打印出来了2个?
那我们就继续通过对源码的学习,来看看这个问题到底是如何产生的
1. 观察者Flowable
Flowable
实现了Publisher
接口
public abstract class Flowable<T> implements Publisher<T>
Publisher
同样也有subscribe
方法。这里同无背压版在原理上是一致的
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
同样,重点还是在create()
与subscribe()
。create()
与之前无背压版不同,多了一个背压策略BackpressureStrategy
public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
}
背压策略一共有5种,分别是MISSING
、ERROR
、BUFFER
、DROP
、LATEST
。
public enum BackpressureStrategy {
MISSING,
ERROR,
BUFFER,
DROP,
LATEST
}
RxJavaPlugins.onAssembly
这个就不用多说了,与之前一样。显然create()
方法返回值就是FlowableCreate
对象了。
我们依然稍后再说FlowableCreate
,因为它与无背压版的ObservableCreate
一样都很关键
看subscribe()
方法,其实总的来说,这个跟无背压版subscribe()
实际上也是一样的
public final void subscribe(FlowableSubscriber<? super T> s) {
ObjectHelper.requireNonNull(s, "s is null");
try {
// 将FlowableSubscriber进行包装
Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// 将包装好的对象传递到实现subscribeActual抽象方法的类
subscribeActual(z);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
}
}
2. 桥梁FlowableOnSubscribe
看看它们的命名,与ObservableOnSubscribe
如出一辙,这里也不再多说了
public interface FlowableOnSubscribe<T> {
void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception;
}
3. FlowableCreate
这就是重点类,构造方法中传入FlowableOnSubscribe
和BackpressureStrategy
public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure) {
this.source = source;
this.backpressure = backpressure;
}
重中之重依然是subscribeActual
方法。
@Override
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
// 根据不同的背压模式进行不同的处理
switch (backpressure) {
case MISSING: {
emitter = new MissingEmitter<T>(t);
break;
}
case ERROR: {
emitter = new ErrorAsyncEmitter<T>(t);
break;
}
case DROP: {
emitter = new DropAsyncEmitter<T>(t);
break;
}
case LATEST: {
emitter = new LatestAsyncEmitter<T>(t);
break;
}
default: {
emitter = new BufferAsyncEmitter<T>(t, bufferSize());
break;
}
}
// 后面的部分与`ObservableCreate`一致,都是调用`Subscriber`的onSubscribe以及`FlowableOnSubscribe`的`subscribe`
t.onSubscribe(emitter);
try {
source.subscribe(emitter);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
emitter.onError(ex);
}
}
emitter
参数我们虽然知道它的大概意思,但是毕竟涉及到背压策略,所以我们还是进去看一下。以背压丢弃策略DropAsyncEmitter
为例进行说明即可,因为流程上这5中策略差别不会很大
4. 背压丢弃策略DropAsyncEmitter
DropAsyncEmitter
继承自NoOverflowBaseAsyncEmitter
static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
private static final long serialVersionUID = 8360058422307496563L;
// downstream即为传入的Subscriber对象
DropAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
void onOverflow() {
// nothing to do
}
}
NoOverflowBaseAsyncEmitter
则继承自BaseEmitter
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T>
BaseEmitter
又继承自FlowableEmitter
abstract static class BaseEmitter<T>
extends AtomicLong
implements FlowableEmitter<T>, Subscription
而FlowableEmitter
又继承自Emitter
public interface FlowableEmitter<T> extends Emitter<T>
最终都回到了Emitter
,因此总的来说背压版流程与无背压版是一致的。来看看区别在哪里,在NoOverflowBaseAsyncEmitter
类使用了AtomicLong
来确定事件下发的数量并确保计数的准确性
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {
private static final long serialVersionUID = 4127754106204442833L;
NoOverflowBaseAsyncEmitter(Subscriber<? super T> downstream) {
super(downstream);
}
@Override
public final void onNext(T t) {
if (isCancelled()) {
return;
}
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// 这里是关键。因为BaseEmitter继承自AtomicLong,因此如果其数值为0,则不会触发downstream.onNext(t)
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
}
abstract void onOverflow();
}
有get()
的地方肯定就有add()
,进入BaseEmitter
类
@Override
public final void request(long n) {
if (SubscriptionHelper.validate(n)) {
// 这里就是设置可处理数量的地方
BackpressureHelper.add(this, n);
onRequested();
}
}
进入BackpressureHelper
类
public static long add(AtomicLong requested, long n) {
for (;;) {
// 获取当前`AtomicLong`中尚可发射事件的总量
long r = requested.get();
// 如果当前量为`Long.MAX_VALUE`,则直接返回
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 通过`addCap`将当前量与新增量相加重新设置AtomicLong的值
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}
有加就有减。刚才NoOverflowBaseAsyncEmitter
的onNext()
就是事件发出的时候进行AtomicLong
数量-1的操作
if (get() != 0) {
downstream.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
onOverflow();
}
再次进入BackpressureHelper
类查看produced()
相关的代码
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
// 当前值-1
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
// 重新赋值完成更新
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
所以,我们的代码中request(2)
,前2个onNext()
会执行downstream.onNext(t)
,同时剩余量将会-1。当总量为0的时候,执行onOverflow
空操作
最后,用一张UML图来总结一下RxJava2有背压版源码
1. Flowable
通过create()
与subscribe()
两个方法将FlowableOnSubscribe
与FlowableSubscriber
收入囊中
2. 通过Flowable
的create()
得到FlowableCreate
对象
3. 通过FlowableCreate
的subscribeActual()
方法得到DropAsyncEmitter
对象
4. 得到DropAsyncEmitter
对象的同时,触发Subscriber
的onSubscribe()
,并执行FlowableOnSubscribe
中的subscribe
,入参为DropAsyncEmitter
5. 通过对DropAsyncEmitter
中onNext()
、onError()
与onComplete()
的调用完成事件向Subscriber
同名方法的传递
6. 在DropAsyncEmitter
触发onNext()
之前必须对其自身进行原子操作,设置背压事件发送的总数。每成功发送一次,总数减1,当总数为0,停止向Subscriber
发送onNext()
操作符
还是先来看一个范例
Observable.create<String> {
it.onNext("1")
it.onNext("2")
}.map(object : Function<String, Int> {
override fun apply(t: String): Int {
Log.d("Demo", "Function $t")
return t.toInt() + 100
}
}).subscribe {
Log.d("Demo", "onNext $it")
}
注意一下事件的打印顺序,map
操作符的参数Function
中的apply(t)
回调方法会比Observer
中的onNext()
提前返回
Function 1
onNext 101
Function 2
onNext 102
那操作符的执行流程是怎样的呢,我们又得从源码进行查看。所有操作符的执行流程应该是一致的,这里以map
操作符为例进行说明
1. ObservableMap
进入map()
方法。map()
的入参为Function
类,它的作用很简单,就是将T类型的数据通过apply()
执行之后得到R类型
public interface Function<T, R> {
R apply(@NonNull T t) throws Exception;
}
RxJava2中有很多以Function
为前缀的类,它们的命名因入参的个数不同而有所差异,但是它们的功能是一样的
继续往下看。通过之前的学习我们知道,map()
的返回值是ObservableMap
对象
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
ObservableMap
对象在初始化的时候完成了Observable
对象以及Function
对象的持有
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
}
注意这个source
对象,它在ObservableMap
初始化的时候传递给它的父类AbstractObservableWithUpstream
类。AbstractObservableWithUpstream
类是一个抽象类,它继承自Observable
。之前传入原始ObservableSource
对象可以通过source()
方法获取
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
到这里我们就明白,通过操作符操作后原Observable
就被转换成包含原Observable
与Function
的一个新对象ObservableMap
后面subscribe()
没啥好说的,跟上文描述无背压版都是一致的
重点来看看ObservableMap
的subscribeActual()
方法。在这里,当原始Observer
对象t
被送到ObservableMap
对象的时候,会发生一次拆合的动作。t
和function
被整合成一个新的MapObserver
对象,让这个新的Observer
重新被source
订阅
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
2. MapObserver
MapObserver
对象继承自BasicFuseableObserver
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
}
注意入参actual
,它是下游的Observer
对象。在本文范例中,它是我们自定义的原始Observer
对象,如果你下游还有其他操作符比如map()
,那么它可能是MapObserver
等其他类型的Observer
actual
被传递到而其父类BasicFuseableObserver
,BasicFuseableObserver
实现了Observer
接口
public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
protected final Observer<? super R> downstream;
protected Disposable upstream;
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
public final void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
this.qd = (QueueDisposable<T>)d;
}
if (beforeDownstream()) {
downstream.onSubscribe(this);
afterDownstream();
}
}
}
}
注意这里downstream
,它就是刚才我们所说的Observer
。所以这里对MapObserver
调用onSubscribe
的时候,通过downstream.onSubscribe(this);
调用的是原始Observer
中的onSubscribe
那么我们现在回过头来再看ObservableMap
中的subscribeActual()
。这里同样会与之前无背压版中所描述的那样,在ObservableCreate
中的subscribeActual()
创建CreateEmitter
对象,依次发射onNext()
、onError()
、onComplete()
,但是不同的是接收者为MapObserver
,所以我们要去MapObserver
中去看看这些事件是怎么被处理的
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
首先看看onNext()
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
// 注意一下这里,这里执行了mapper.apply(t)并得到了返回值
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
// 调用传入的原始Observer中的onNext()
downstream.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
这样一切就通了,将MapObserver
进行拆分,先执行Function
得到返回值,随后将返回值传递给原始Observer
。剩下的就没啥好说的了,又直接调用原始Observer
对象继续它的onNext()
方法
总结一下操作符map
的源码
1. 将原始Observable
与Function
包装成ObservableMap
2. 当执行ObservableMap
的subscribe()
方法时,会调用到其subscribeActual()
3. 对当前ObservableMap
进行拆分,将其中的Function
与Observer
进行整合得到MapObserver
,并交由Observable
进行订阅
4. 继续向上对操作符包装好的Observable
重复拆分直到原始Observable
为止
5. Emitter
触发的onNext()
等操作将传递给MapObserver
,MapObserver
的onNext()
中先执行Function
的apply()
,再将返回值通过MapObserver
中的Observer
向下发送直到原始Observer
为止
线程调度
线程调度这个概念我们再熟悉不过了,在使用RxJava2进行网络请求、文件流读写等情况下我们都会使用到。RxJava2提供了5种线程调度器,我们使用Schedulers.x
来声明我们所需要的线程类型
.subscribeOn(Schedulers.newThread()).observeOn(AndroidSchedulers.mainThread())
这里就出现了一个关键字Schedulers
1. Schedulers
说到Schedulers
,顾名思义就是多个Scheduler
的意思。Schedulers
中声明了5个静态变量,它们分别代表5种不同的线程调度器
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
这些静态变量在初始化的时候被赋值,他们本质上都是一个个的Task
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
你可能会问:“AndroidSchedulers.mainThread()
到哪去了呢?”。别着急,它算Rx项目组专门为Android量身定做的一个线程调度器。我们稍后会讲到它
由于每种调度器实现原理都不同,所以我们选择相对容易理解的Schedulers.NEW_THREAD
来进行分析。Schedulers.NEW_THREAD
其实是NewThreadTask
,它实现了Callable<Scheduler>
接口。该接口有一个call()
的回调方法,返回的是NewThreadHolder.DEFAULT
对象
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
该对象也是Scheduler
类型,即NewThreadScheduler
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
最后就是要找到调用call()
的地方了。回到Schedulers
类中定义NEW_THREAD
的地方。进入initNewThreadScheduler()
RxJavaPlugins.initNewThreadScheduler(new NewThreadTask())
callRequireNonNull()
的入参defaultScheduler
是NewThreadTask
类,不要忘记了
public static Scheduler initNewThreadScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
// 未给onInitNewThreadHandler赋值,f为null
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitNewThreadHandler;
if (f == null) {
// defaultScheduler是NewThreadTask类
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
最终我们在callRequireNonNull(defaultScheduler)
发现了我们一直想要知道的s.call()
。到这里NewThreadTask
里的call()
才被调用,NewThreadScheduler
对象创建完毕。还真是绕了一大圈
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
到这里,一句话小结一下之前所描述的内容:我们所指定的线程调度器,其实都是不同的Scheduler
对象
2. Scheduler
下面就是真正的线程切换部分了,切换线程也就是切换不同的Scheduler
。我们分别对subscribeOn()
与observeOn()
进行学习。不过不要着急,我们先通过Scheduler
类来了解一下线程是如何进行调度的
Scheduler
是一个抽象类
public abstract class Scheduler
它有一个重要的抽象方法createWorker()
,顾名思义它是创建Worker
类的对象
public abstract Worker createWorker();
Worker
是负责执行线程调度的关键类。它是一个抽象类,实现了Disposable
接口
public abstract static class Worker implements Disposable
除此之外,Scheduler
类还有一个重要的方法是scheduleDirect()
,我们的事件就是在这里完成线程调度的
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
// 创建好Worker对象
final Worker w = createWorker();
// 选择好事件将要运行在的线程
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
// 对线程与Worker对象进行包装
DisposeTask task = new DisposeTask(decoratedRun, w);
// 执行线程调度
w.schedule(task, delay, unit);
return task;
}
3. scheduleDirect()解读
刚说了createWorker()
是一个抽象方法,所以我们从实现类NewThreadScheduler
的createWorker()
开始看。这里返回的是NewThreadWorker
对象
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
NewThreadWorker
类继承Scheduler.Worker
类并实现Disposable
接口。这里面最重要的地方就是,在初始化的同时,创建了线程池ScheduledExecutorService
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
// 创建了线程池
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
}
回到Scheduler
类继续往下。通过RxJavaPlugins.onSchedule(run);
得到一个Runnable
。这个对象是什么,我们稍后揭开谜底
再往下,这个Runnable
对象与Worker
对象又被包装成DisposeTask
,但DisposeTask
本质上依然是一个Runnable
static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection
最后就是Worker
调用schedule()
。schedule()
又是Worker
中的一个抽象方法,我们来到实现类NewThreadWorker
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
跳转到scheduleActual()
。这里很明显了,传进来的Runnable
被包装为ScheduledRunnable
,它本质上依然是一个Runnable
。随后它被添加到线程池中进行执行
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
依然小结一下,线程调度器就是通过Scheduler
中的createWorker
创建的Worker
对象后,在调用Worker
中的schedule
方法来完成线程调度的
这么看来,剩下的问题就是Runnable
从哪里来了
4. subscribeOn()
我们先学习subscribeOn()
,之后再学习observeOn()
。毕竟两者有所差别
进入subscribeOn()
,这里不用说我们也知道,该方法返回了一个ObservableSubscribeOn
对象。ObservableSubscribeOn
类有2个入参:当前Observable
对象和线程调度器对象
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
ObservableSubscribeOn
继承自AbstractObservableWithUpstream
,这个我们在操作符里面已经说过了,重点就是source
对象
在ObservableSubscribeOn
类中我们又看到熟悉的声影subscribeActual()
。不过这个方法里面的内容有点复杂,我们细细的说
public void subscribeActual(final Observer<? super T> observer) {
// 得到包装后的Observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
// 执行Observer的onSubscribe()方法
observer.onSubscribe(parent);
// 执行scheduler中的`scheduleDirect()`进行线程调度
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
经过SubscribeOnObserver
类包装observer
得到的parent
对象实际仍然是Observer
对象。包装类SubscribeOnObserver
里面都是对传入进来的观察者Observer
的事件进行回调操作
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
// 此为传入的Observer
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
回到ObservableSubscribeOn
类中的subscribeActual()
继续说。这里出现了之前所说的scheduleDirect()
,那么我们所需要的Runnable
就是SubscribeTask
对象了。
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
最终在那个线程池中执行的就是SubscribeTask
中的run()
里的代码source.subscribe(parent);
,即订阅关系的绑定。这样后续Observable
中的subscribeActual()
也就是上游事件都是在该线程中执行
总结一下subscribeOn()
里线程调度的流程:
1. 通过传入的不同的Schedule
来指定所使用的线程
2. 通过继承AbstractObservableWithUpstream
得到ObservableSubscribeOn
对象
3. 实现subscribeActual()
,并将包装好的Observer
通过scheduleDirect()
传递给Scheduler
4. 使用已选择的Scheduler
来创建Worker
,再将被观察者的事件流通过schedule()
放置在相应的线程中执行
5. observeOn()
看完了subscribeOn()
我们再来看看observeOn()
。总的来说,它们两的调度流程是类似的,毕竟传入的都是Scheduler
对象
进入observeOn()
。这个方法返回的对象是ObservableObserveOn
类,默认入参是我们声明的Scheduler
对象。其他参数跟线程配置有关系,不影响我们的主流程,我们可以暂时不管它
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
进入ObservableObserveOn
。又是一波熟悉的节奏,与之前ObservableSubscribeOn
如出一辙,那么真正做事情的地方自然而然又是subscribeActual()
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T>
subscribeActual()
基本上都是相似的操作:先创建Worker
,然后执行source.subscribe()
。
这里有一个非常关键的地方,source.subscribe()
绑定关系依旧会运行在之前subscribeOn()
指定的线程里,因为这里的worker
只跟observer
有关联,并没有和source
发生任何关系,也就是只有下游事件才会受到影响。
这个我将在最后用Demo进行详细的阐述
ObserveOnObserver
类包装了Observer
和Worker
,说明我们定义的线程调度器将在这里发挥作用。
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
ObserveOnObserver
又实现了Observer
和Runnable
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable
作为一个Observer
,自然就要注意它那些常规方法,重点看onNext()
、onError()
和onComplete()
。它们有一个共性:都有schedule()
方法。所以schedule()
就是线程调度的关键。
同时还要注意一下,事件发射的参数t
,被加入到队列queue
中了
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
果不其然,schedule()
中出现了Worker
,而调用的那个this
,就是Runnable
。worker.schedule()
的出现说明线程切换在这里执行了。
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
直接看run()
,里面是一个判断。选择drainNormal()
看看
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
这里面最重要的就是a.onNext(v);
。a
就是Observer
,v
就是我们发射的那个泛型值。数值从队列中被取出并向下发射这个操作就这样运行在我们自定义的线程调度器中执行
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = downstream;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
disposed = true;
upstream.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
最后总结一下observeOn()
里线程调度的流程:
1. 通过传入的不同的Schedule
来指定所使用的线程
2. 通过继承AbstractObservableWithUpstream
得到ObservableObserveOn
对象
3. 实现subscribeActual()
4. 使用已选择的Scheduler
来创建Worker
,同时将Observer
与Worker
整合成一个新的ObserveOnObserver
对象来重新订阅之前的Observable
5. 在调用观察者onNext()
等方法时将相应的事件流通过schedule()
放置在相应的线程中执行
6. AndroidSchedulers
我们重点关注一下为什么使用AndroidSchedulers.mainThread()
可以实现切换到UI线程。这个线程调度器本质上是HandlerScheduler
,里面有一个重要的入参Handler
:new Handler(Looper.getMainLooper())
。我想只要你懂android开发,应该都明白了个大概
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
}
进入HandlerScheduler
类,直接看schedule
。这里就不用多说了吧
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
// 包装handler跟run
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// 创建Message对象
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
// 使用handler发送message,handler运行环境为主线程
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
7. 关于线程切换的Demo
请回答subscribe()
与onNext()
分别打印的是哪个线程?
Observable.create(ObservableOnSubscribe<String> { emitter ->
Log.d("Demo", "subscribe: " + Thread.currentThread().name)
emitter.onNext("2")
emitter.onComplete()
})
.subscribeOn(AndroidSchedulers.mainThread())
.map {
Log.d("Demo", "map: " + Thread.currentThread().name)
return@map "2$it"
}
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
}
override fun onNext(t: String) {
Log.d("Demo", "onNext: " + Thread.currentThread().name)
}
override fun onError(e: Throwable) {
}
})
给出答案
subscribe: main
map: main
onNext: RxNewThreadScheduler-2
就像我们之前所说的,subscribeOn()
管的只是观察者与被观察者绑定的那个部分,所以自然而然是逐层向上,越靠近Observable
的那个线程策略才会生效。而observeOn()
面对的是观察者,自然而然是逐层向下,对下游产生影响。至于中间的那个map
操作符,是因为上游的emitter
运行在AndroidSchedulers.mainThread()
,在没有遇到observeOn()
改变策略前肯定是不会发生变化的
结合官网图片加深理解吧
RxJava2线程切换
网友评论