事件序列
简单使用:
Observable.just(1).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
Observable:
Observable<Integer> just = Observable.just(1);
是一个被观察者,看看just
做了什么:
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
onAssembly
参数new
了一个ObservableJust
,函数体只是创建一个钩子函数。查下ObservableJust
:
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(observer, value);
observer.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}
把发射的数据保存为value,然后重写subscribeActual
,记住这个函数,继续查看订阅发生了什么:
订阅:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
一些判空,observer = RxJavaPlugins.onSubscribe(this, observer);
也是创建钩子,然后调用了subscribeActual
,这个才是订阅发生的真实事件,返回查看这个方法,一共有三部:
- new了
ScalarDisposable
对象, - 调用
observer.onSubscribe(sd)
, - 调用第一步new出来的对象的run方法,也就是
observer.onNext(value);
,如果发射完成还会调用observer.onComplete();
这是一个简单的订阅流程。上游发射数据,订阅的时候,observe
调用自己的方法。
数据转换
上面流程加了map
操作:
Observable.just(1)
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return String.valueOf(integer);
}
})
查看map
的实现:
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
和just几乎一样,只不过onAssembly
的参数是ObservableMap
。ObservableMap
和上面的ObservableJust
结构一样,只不过这里保存的不再是value
,而是一个Function
,我们需要关注的也是这个方法:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
订阅的时候会调用这个方法,这里new了一个MapObserver
,并和上游完成订阅,在这里订阅的时候,依然会调用MapObserver的onNext
方法。而onNext
方法等于一个代理,调用了下游订阅的Observe
,也就是代码里写的那个Observe
:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
整个流程就是,把原来的Observable
转化为ObservableMap
,然后在订阅的时候,为ObservableMap
添加一个订阅者MapObserver
,接受上游的数据,在这个订阅者里面转化数据并回调给原来的Observe
。
disposed
取消订阅
取消分为以下三种情况:
- 无延迟无后续,
single.just
之类,只发射一个数据; - 有延迟无后续,
delay
之类,延迟发射一次数据; - 有延迟有后续,
interval
之类,轮询发射数据;
无延迟无后续
Single.just(1)
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
});
这个订阅事件只发射一次,并且没有延迟,上面我们看过subscribeActual
,知道onSubscribe
的参数是哪一个,下面查看代码,看看取消做了什么:
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposables.disposed());
observer.onSuccess(value);
}
点进去查看disposed()
,返回的是EmptyDisposable.INSTANCE
,查看它的取消:
@Override
public void dispose() {
// no-op
}
什么都没做。因为整个事件只有一个数据,订阅就发射了,并没有后续事件。所以事件序列已经完成,不需要我们手动取消了。
有延迟不后续
Single.just(1)
.delay(1,TimeUnit.SECONDS)
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
});
根据前面map
的代码分析,return delay(time, unit, Schedulers.computation(), false);
可是看到是自动切了线程,然后我们找到SingleDelay
,查看代码:
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SequentialDisposable sd = new SequentialDisposable();
observer.onSubscribe(sd);
source.subscribe(new Delay(sd, observer));
}
SingleDelay
被下面的内部类Delay
订阅,这个SequentialDisposable
就是实际的订阅时候的Disposable
。它其实是个引用:
@Override
public void onSubscribe(Disposable d) {
sd.replace(d);
}
@Override
public void onSuccess(final T value) {
sd.replace(scheduler.scheduleDirect(new OnSuccess(value), time, unit));
}
@Override
public void onError(final Throwable e) {
sd.replace(scheduler.scheduleDirect(new OnError(e), delayError ? time : 0, unit));
}
在订阅的时候,replace
的是上游的Disposable
,因为这个方法是上游调用并赋值的,然后,在onSuccess
和onError
的时候,又一次replace
,也就是说,在订阅发生,数据还在延迟的时候,是取消的上游,在收到后,是取消的下游。
有延迟有后续
Observable.interval(1, TimeUnit.SECONDS)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
根据前面delay
的解析,先进入interval
函数查看,这里看到会自动切线程,然后一个新建的ObservableInterval
,进入查看:
@Override
public void subscribeActual(Observer<? super Long> observer) {
IntervalObserver is = new IntervalObserver(observer);
observer.onSubscribe(is);
Scheduler sch = scheduler;
if (sch instanceof TrampolineScheduler) {
Worker worker = sch.createWorker();
is.setResource(worker);
worker.schedulePeriodically(is, initialDelay, period, unit);
} else {
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
}
}
第二行的observer
就是下游写的订阅者,onSubscribe
传递的参数就是第一行new的IntervalObserver
,那么它一定是个Disposable
,果不其然:
static final class IntervalObserver
extends AtomicReference<Disposable>
implements Disposable, Runnable {
但是看到这里,又疑惑了,这里类似线程的原子操作:
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
其实IntervalObserver
继承了AtomicReference<Disposable>
,又实现了Disposable
,也就说,它可以用AtomicReference
代理别的Disposable
,也可以用自己的Disposable
。它既可以歌手,也可以是个经纪人。
回到上面继续看subscribeActual
:
Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
is.setResource(d);
第一行有一个d
,然后setResource
,这个d就是IntervalObserver
实际代理的Disposable
。第一行的代码执行了切换线程,和向下游发射数据,所以取消,也就是取消了上游的发射数据,让上游停止发射数据。再看它的另一个方法:
@Override
public void run() {
if (get() != DisposableHelper.DISPOSED) {
downstream.onNext(count++);
}
}
downstream
就是传是入的下游订阅者,在这里判断,如果DISPOSED
,就不调用downstream.onNext(count++);
,也就是下游接收不到了。
既取消了上游的发射,也取消了下游的接收。
网友评论