转载请以链接形式标明出处:
本文出自:103style的博客
本文基于 RxJava 2.x
版本
create操作符例子:
Observable
.create(new ObservableOnSubscribe<Object>() {
@Override
public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
}
})
.subscribe(new Observer<Object>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Object o) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
首先我们看create
方法:
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
RxJavaPlugins
类的 onAssembly
方法:
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
在源码中查看引用可知 onObservableAssembly
只有在测试的时候才不为 null
。
所以Observable.create(ObservableOnSubscribe<T> source)
实际上就是返回了 ObservableCreate
对象
ObservableCreate
类,可以看到 ObservableCreate
是 Observable
的子类,并实现了父类的 subscribeActual
方法。
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {...}
...
}
然后我们看subscribe
方法: 实际上是调用了 Observable
的抽象方法 subscribeActual(observer);
public final void subscribe(Observer<? super T> observer) {
...
subscribeActual(observer);
...
}
protected abstract void subscribeActual(Observer<? super T> observer);
又因为 create
操作符返回的 ObservableCreate
是 Observable
的子类,
所以实际上调用的是ObservableCreate
的 subscribeActual(observer);
ObservableCreate
的 subscribeActual(observer)
方法:
- 首先创建了
CreateEmitter
对象, - 然后调用了
subscribe
方法传进来的Observer
对象的onSubscribe()
方法 - 然后调用了
create
操作符 传进来的ObservableOnSubscribe
对象的subscribe(ObservableEmitter<T> emitter)
方法
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
因为 CreateEmitter
类实现了 ObservableEmitter<T>
和 Disposable
接口,
所以我们可以在 create
操作符 传进来的 ObservableOnSubscribe
对象的 subscribe(ObservableEmitter<T> emitter)
方法里调用onNext
、onError
、onComplete
等方法。
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
...
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
...
}
ObservableEmitter
接口:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
ObservableEmitter<T> serialize();
boolean tryOnError(@NonNull Throwable t);
}
public interface Emitter<T> {
void onNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
}
Disposable
接口:
public interface Disposable {
void dispose();
boolean isDisposed();
}
因为CreateEmitter
又重写了onNext
、onError
、onComplete
等方法。
所以 create
操作符 传进来的 ObservableOnSubscribe
对象的 subscribe(ObservableEmitter<T> emitter)
方法里调用onNext
、onError
、onComplete
等方法实际上调用了 CreateEmitter
的 onNext
、onError
、onComplete
等方法。
CreateEmitter
的 onNext
、onError
、onComplete
方法:
- 对
onNext
、onError
传进来的值做了空判断。 - 如果
!isDisposed()
则继续执行observer
对象的onNext
、onError
、onComplete
等方法。
(observer
对象为create
操作符 之后的subscribe()
方法传进来的Observer<T>
对象) - 并在
onComplete
和onError
方法最后执行dispose()
方法。
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("..."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
@Override
public void onError(Throwable t) {
if (!tryOnError(t)) {
RxJavaPlugins.onError(t);
}
}
@Override
public boolean tryOnError(Throwable t) {
if (t == null) {
t = new NullPointerException("...");
}
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
dispose();
}
return true;
}
return false;
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete();
} finally {
dispose();
}
}
}
接下来我们来看 CreateEmitter
的 dispose()
和 isDisposed()
方法
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
继续看 get()
方法,看下面代码可知 get()
返回的是一个 Disposable
对象
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {...}
public class AtomicReference<V> implements Serializable {
private volatile V value;
public AtomicReference(V var1) {
this.value = var1;
}
public AtomicReference() {
}
public final V get() {
return this.value;
}
继续看 DisposableHelper
的 isDisposed(Disposable d)
和 dispose(AtomicReference<Disposable> field)
方法
-
isDisposed(Disposable d)
则是判断d
是否和枚举值DISPOSED
相等。 -
dispose(AtomicReference<Disposable> field)
方法即是 将CreateEmitter
的isDisposed()
中调用get()
获取的对象赋值为DisposableHelper
的枚举值DISPOSED
。
所以调用dispose(AtomicReference<Disposable> field)
方法后,则理论上isDisposed(Disposable d)
即返回true
。
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
...
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
...
}
DisposableHelper
类中 dispose(AtomicReference<Disposable> field)
,
当 field.getAndSet(d);
之后,如果 Disposable
对象的值还不等于 DISPOSED
,
则会调用current.dispose();
current
为 以下例子的 disposableObserver
对象。
DisposableObserver disposableObserver;
private void test() {
disposableObserver = Observable
.create(new ObservableOnSubscribe<Object>() {...})
.subscribeWith(new DisposableObserver<Object>() {...});
}
create
操作符 返回的是 ObservableCreate
,因为 ObservableCreate
未重写 subscribeWith
方法,所以调用的是 Observable
的 subscribeWith
方法:
public final <E extends Observer<? super T>> E subscribeWith(E observer) {
subscribe(observer);
return observer;
}
所以我们知到 disposableObserver
即为subscribeWith
传进来的 DisposableObserver
对象。
DisposableObserver
类:
public abstract class DisposableObserver<T> implements Observer<T>, Disposable {
final AtomicReference<Disposable> upstream = new AtomicReference<Disposable>();
@Override
public final void dispose() {
DisposableHelper.dispose(upstream);
}
}
我们可以看到 dispose()
方法继续调用了DisposableHelper.dispose(AtomicReference<Disposable> field);
所以我们是否可以得出结论,
dispose(AtomicReference<Disposable> field)
在设置值为DISPOSED
失败之后会一直重复调用直到设置成功为止?
网友评论