直接上代码,以下代码为根据RxLifeCycle的实现提取出的一个完整的上游数据源到下游数据源所经过的历程
,注释已经写的差不多了
Ob
.compose(new ObservableTransformer<Object, Object>() {
@Override
public ObservableSource<Object> apply(Observable<Object> upstream) {
//一个subject既是Observable又是Observer
Observable<ActivityEvent> activityEventObservable = OpenAccountActivity.this.provideLifecycleSubject()
//Ob只会向最后订阅的Ov发送事件,调用share即可以
.share();
return upstream
//takeUntil有两种实现
// 一种是:当返回true时,就发送onComplete
// 另一种是:当(参数指定的)数据源发射事件时,发送onComplete
//这里采用第二种,同时借助filter操作符(控制数据源发送的事件是否会被拦截),实现当到达对应生命周期时发送onComplete的效果
.takeUntil(
//每当联合的数据源中,有数据源发送新的事件时,就会将所有的数据源发送的最新事件合并起来,发送一个新的事件
//这里利用这个特性,在每个生命周期方法的最后调用onNext发送生命周期事件
Observable.combineLatest(
//将订阅时的生命周期转化为停止发射事件的生命周期
activityEventObservable
.take(1)
.map(new Function<ActivityEvent, ActivityEvent>() {
@Override
public ActivityEvent apply(ActivityEvent activityEvent) throws Exception {
return activityEvent;
}
}),
//跳过订阅时的生命周期,当后续生命周期发送
activityEventObservable
.skip(1),
new BiFunction<ActivityEvent, ActivityEvent, Boolean>() {
@Override
public Boolean apply(ActivityEvent bindUntilEvent, ActivityEvent lifecycleEvent) throws Exception {
return lifecycleEvent.equals(bindUntilEvent);
}
})
//是否拦截事件
.filter(b -> b));
}
})
网友评论