谈谈对于响应式编程RxJava的理解 - 核心思想篇
谈谈对于响应式编程RxJava的理解 - 原理篇
源码分析
我们直接先看一个最简单的例子
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> e) throws Exception {
e.onNext("A");
}
})
// .map(new Function<Object, Object>() {
//
// @Override
// public Object apply(Object o) throws Exception {
// return null;
// }
// })
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
map操作符我们现在先不看,先从最简单的流程开始,Observable不管用create 创建还是just创建其实内部原理是一样的,我们这里便于分析RxJava的整个流程所以用了create来创建。根据观察者模式的概念,我们将RxJava的源码分析过程从简单到难,拆分为三步进行:
- 1、Observer观察者的创建
- 2、Observable#create被观察者的创建
- 3、subscribe订阅关系的建立
第一步:Observer观察者的创建
public interface Observer<T> {
/**
* Provides the Observer with the means of cancelling (disposing) the
* connection (channel) with the Observable in both
* synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
* @param d the Disposable instance whose {@link Disposable#dispose()} can
* be called anytime to cancel the connection
* @since 2.0
*/
void onSubscribe(@NonNull Disposable d);
/**
* Provides the Observer with a new item to observe.
* <p>
* The {@link Observable} may call this method 0 or more times.
* <p>
* The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
* {@link #onError}.
*
* @param t
* the item emitted by the Observable
*/
void onNext(@NonNull T t);
/**
* Notifies the Observer that the {@link Observable} has experienced an error condition.
* <p>
* If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
* {@link #onComplete}.
*
* @param e
* the exception encountered by the Observable
*/
void onError(@NonNull Throwable e);
/**
* Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
* <p>
* The {@link Observable} will not call this method if it calls {@link #onError}.
*/
void onComplete();
}
Observer其实属于观察者模式中的抽象观察者对象,就是一个接口,里面有onSubscribe开始订阅回调、onNext拿到事件回调、onError错误事件回调、onComplete完成事件回调的抽象方法,我们这里就是new一个这个Observer接口的匿名内部类。
第二步:Observable#create被观察者的创建
我们先看到Observable.create方法的调用
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
我们点进去RxJavaPlugins.onAssembly看看
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
一开始是一个对onObservableAssembly的判空操作,其实RxJava内部并没有用到,一开始onObservableAssembly初始化后就是空的,
setOnObservableAssembly(null);
按我的理解onObservableAssembly是提供给用户我们自己用的,我们可以重写
setOnObservableAssembly。例如:
RxJavaPlugins.setOnObservableAssembly(new Function<Observable, Observable>() {
@Override
public Observable apply(Observable observable) throws Exception {
//例如这里可以加入一下日志打印,可以查看全局哪些地方用到了RxJava
Log.d("RxJava",observable.toString());
return observable;
}
};
如果要重写setOnObservableAssembly的话,记得return返回observable,这样才可以继续RxJava的整个事件流,不然返回null肯定会报错的。
这里也不是我们分析的重点,我们再看到return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));这句代码,我们看到这里返回了一个ObservableCreate对象
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
ObservableCreate对象是继承自Observable,的我们先大致看下它的结果,具体先接着看第三步流程,后面再结合起来分析。
第三步:subscribe订阅关系的建立
直接进入subscribe的源码
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
subscribe方法里传进来的就是我们自定义的observer(就是自己写的匿名类),前面也是一些初始化和判空操作,我们主要看 subscribeActual(observer);这句代码。我们点进去发现是Observable里的一个抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
我们从第二步可以知道ObservableCreate对象是继承自Observable的,所以这里抽象方法subscribeActual的实现就是在ObservableCreate里,我们看到ObservableCreate里的subscribeActual方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
一开始将我们传进来的observer包装成一个发射器CreateEmitter对象parent,再调用观察者的observer.onSubscribe(parent);这里实现onSubscribe方法的实际上就是我们自己写的内部类里的onSubscribe方法,
image.png
我们看到onSubscribe方法里最终我们调用了发射器CreateEmitter的onNext方法,所以我们进去onNext方法看看
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
我们看到发射器里的onNext方法就是最终调用到我们自定义观察者observer的
onNext方法。
至此,RxJava最简单的订阅关系和收发数据的流程就通了,我们再通过一张简易的流程图总结下流程。
image.png那么现在我们再加入map操作符一起分析看看
Map操作符原理分析
我们对上面的代码进行简单的修改,并且加入map操作符
注意看代码注释
Observable
//ObservableCreate 将自定义source传进去
.create(
//自定义被观察者(起点)
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("A");
}
})
//ObservableCreate.map
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String o) throws Exception {
return null;
}
})
//ObservableMap.subscribe
.subscribe(
//自定义观察者(终点)
new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
map.png
其实上面map操作符的作用就是从Observable起点开始,通过map操作符将String类型数据转化为Bitmap类型数据最后流向终点Observer。
经过之前的分析,我们已经知道Observable. create返回了一个ObservableCreate对象,这时我们如果再调用map操作符的话,就是对ObservableCreate进行了.map的操作了。
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
从源码可以看出.map操作返回一个ObservableMap,所以上面整个程序的最后一步调用会走到ObservableMap.subscribe。(具体可以看我代码的注释)
因为有前面的分析基础,现在我们直接进入subscribe订阅流程的代码
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
因为我们加入了map操作符,现在这里的subscribeActual(observer);就是调用的
ObservableMap的subscribeActual了,
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
这里,源码将自定义观察者Observer(终点),传入到MapObserver对象里,将自定义观察者Observer包装了一层,new了一个MapObserver对象出来。
而这里的 source.subscribe里的source就是上一层的Observable也就是ObservableCreate,所以这里调用了ObservableCreate的subscribe方法。因为不管是ObservableMap还是ObservableCreate都是继承自Observable
所以我们再进入Observable的subscribe方法
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
而这次 subscribeActual(observer);调用的是ObservableCreate的subscribeActual了,而且这次的observer观察者是自定义观察者经过包装之后的MapObserver对象,
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
observer.onSubscribe(parent);就是调用了我们自定义观察者(终点的)的onSubscribe, source.subscribe(parent);这句调用的就是自定义被观察者(起点)的subscribe方法,也就是我们上面流程分析的CreateEmitter发射器的onNext方法,
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
而此时的observer.onNext(t);的onNext方法不再是自定义observer的onNext方法了,而是先调用MapObserver的onNext方法
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
而这一句代码 v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");就是真正执行变化的代码,也就是我们自己重写的map的apply方法里,最后再调用actual.onNext(v);方法返回整个终点的onNext(v);,到此执行完整个事件流。还是和之前一样,我再用一张流程图总结下。
image.png
网友评论