简言
Completable只会关心onComplete和onError事件,要么成功要么失败
例子
CompletableOnSubscribe subscribe = new CompletableOnSubscribe() {
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
emitter.onComplete();
}
};
Completable.create(subscribe).subscribe(new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
System.out.println(d.isDisposed());
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
@Override
public void onError(Throwable e) {
System.out.println(e.getMessage());
}
});
}
一个非常简单的demo,最终执行的是onComplete()方法
image.pngCompletable抽象类实现的 接口,从源码中看出,Completable只关心onComplete()和onError()方法
public interface CompletableObserver {
void onSubscribe(@NonNull Disposable d);
void onComplete();
void onError(@NonNull Throwable e);
源码分析
由于Completable是抽象类,没办法直接new ,但是这个类提供了大量的静态方法用于构造实例,我们先看一个create()方法
public static Completable create(CompletableOnSubscribe source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new CompletableCreate(source));
}
源码中直接构造CompletableCreate类
public final class CompletableCreate extends Completable {
final CompletableOnSubscribe source;
public CompletableCreate(CompletableOnSubscribe source) {
this.source = source;
}
@Override
protected void subscribeActual(CompletableObserver observer) {
Emitter parent = new Emitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
...
}
先看CompletableOnSubscribe,出现再CompletableCreate 构造方法里的source就是我们在开始demo里的subscribe对象,定义成source有数据源头的意思,数据的发射都是从source对象中发射出去,实际上是用CompletableEmitter 实例发射,CompletableEmitter 实例调用onComplete()方法,这样就可以在CompletableObserver中收到数据,我们看看数据流是怎么调用的。
public interface CompletableOnSubscribe {
void subscribe(@NonNull CompletableEmitter emitter) throws Exception;
}
CompletableOnSubscribe 接口中只有一个方法,emitter对象用于数据的发射控制
要想整个数据流可以用,就要先订阅,类似我们之前的观察者模式,
订阅完成后也不一定就可以收到数据,需要用emitter对象发射数据。
Completable 类里面有个重要的方法
public final void subscribe(CompletableObserver observer) {
...
subscribeActual(observer);
...
}
重要的是调用subscribeActual()方法,注意方法是protected ,只有子类才能调用,这样就把observer传递进来了。
protected abstract void subscribeActual(CompletableObserver observer);
好,我们回到CompletableCreate 源码,订阅完成后就会调用subscribeActual()方法
Emitter parent = new Emitter(observer);
observer.onSubscribe(parent);
方法里new了一个Emitter实例,我们传递进来的observer,调用了onSubscribe()方法
image.png
注意source.subscribe(parent);在observer.onSubscribe(parent);代码之后
这样我们可以在observer.onSubscribe(parent)方法调用后拿到emitter对象
控制数据流向是否可以直接停止。
当代码调用source.subscribe(parent);方法时,我们才能在下图中发射数据
image.png
接下来看看Emitter类,我们只看onComplete()方法
static final class Emitter
extends AtomicReference<Disposable>
implements CompletableEmitter, Disposable {
private static final long serialVersionUID = -2467358622224974244L;
final CompletableObserver downstream;
Emitter(CompletableObserver downstream) {
this.downstream = downstream;
}
@Override
public void onComplete() {
if (get() != DisposableHelper.DISPOSED) {
Disposable d = getAndSet(DisposableHelper.DISPOSED);
if (d != DisposableHelper.DISPOSED) {
try {
downstream.onComplete();
} finally {
if (d != null) {
d.dispose();
}
}
}
}
}
...
}
代码中将CompletableObserver实例传递进来,downstream对象就是
image.png
当我们在这里调用onComplete()方法时,就是直接到Emitter实例的onComplete()方法
image.png
Emitter是也是AtomicReference的子类,保准多线程调用的安全
public void onComplete() {
if (get() != DisposableHelper.DISPOSED) {
Disposable d = getAndSet(DisposableHelper.DISPOSED);
if (d != DisposableHelper.DISPOSED) {
try {
downstream.onComplete();
} finally {
if (d != null) {
d.dispose();
}
}
}
}
}
最终调用 downstream.onComplete();,然后整个数据dispose了。dispose之后将接收不到数据,但是任然可以发射数据。
网友评论