目标场景:
熟悉Promise的朋友应该知道Promise中我们可以通过next进行链式调用。在调用的任何一个环节如果出现错误可以直接发射error信号终止链式调用。RxJava的onNext的发射逻辑和Promise有一些不同,并且一个订阅事件不能再继续执行后续的异步调用。在某一些场景下,我们希望能够像Promise一样通过一系列的链式异步调用完成一些系统功能的流程。例如,如果目前有N个任务,我们希望能够对这N个任务进行顺序执行,上一个任务如果正常执行,则继续执行下一个任务,否则结束链式调用。
实现方案:
-
采用单个Flowable循环调用的方式。
Flowable.just(List<T> list)
.map(new Function<List<T>, O>(){
@Override
public O apply(List<T> list) throws Exception{
for (int m = 0; m < list.size(); m ++){
result = t.handle(); //伪方法示例
if (result!=SUCCESS){
throw new Exception(); //结束链式调用
}
}
return new O(); //返回一个结束消息
}
})
.subscribe(new Subscriber(){
...
@Override
void onNext(){
//这里处理返回结果
}
@Override
void onError(){
//这里处理错误
}
});
这种方案,怎么说呢,感觉和异步的结构不太一致,毕竟是采用循环的方式嵌套在一个map操作符里。
-
采用fromIterable
RxJava2 将RxJava1中的from拆解为若干个fromXXX操作符,对于容器,我们可以采用fromIterable。这个操作符是顺序执行每次遍历出来的内容,因此可以满足链式调用的方法。
Flowable.fromIterable(List<T> list)
.map(new Function<T, O>(){
@Override
public O apply(T t){
result = t.handle(); //伪方法示例
if (result!=SUCCESS){
throw new Exception(); //结束链式调用
}
}
})
.subscribe(...);
这种方式比上一种在结构上更加清晰。我们也可以把map操作符替换成flatmap操作符去实现嵌套形式的链式调用。
但是,方案1和2有一个共同的问题,则是所有的任务必须在执行前全部写入一个List。我们希望能够更加灵活的添加链式任务,这时候我们可以直接使用连续的map操作符。
-
采用连续map
Flowable.just(T t)
//任务1
.map(new Function<T, T>(){
@Override
public T apply(T t){
result = t.handle(); //伪方法示例
if (result!=SUCCESS){
throw new Exception(); //结束链式调用
}
}
})
//任务2
.map(new Function<T, T>(){
@Override
public T apply(T t){
result = t.handle(); //伪方法示例
if (result!=SUCCESS){
throw new Exception(); //结束链式调用
}
}
})
...
.subscribe(...);
-
更复杂的场景
如果每个链式任务执行成功后要执行一些不同的额外操作,一个subsriber订阅会有一些问题,RxJava2中提供了若干个doOnXXX操作符(doOnNext,doOnError,doOnComplete),原理上可以解决这个问题。如果采用fromIterable传入链式任务,我们需要使用flatmap为每个调用单独配置doOnXXX。
- 首先,把任务和额外的操作封装成一个Bundle
interface RxTask{
int task(Object o);
}
class RxBundle{
RxTask task;
RxTask sideTask;
}
- 然后,把任务通过List方式传入Flowable
Flowable.fromIterable(List<RxBundle> list)
.flatMap(new Function<RxBundle, Publisher<?>(){
@Override
public Publisher<?> apply(Bundle bundle){
final RxTask task = bundle.task;
final RxTask sideTask = bundle.sideTask
return Flowable.create(new FlowableOnSubscribe<Object>() {
@Override
public void subscribe(FlowableEmitter<Object> e) throws Exception {
int result = task.task();
if (result!=SUCCESS){
throw new Exception(); //结束链式调用
}
}
}, BackpressureStrategy.BUFFER))
.doOnNext(new Consumer(){
@Override
public void apply(Object o){
sideTask.task(); //这里执行额外操作
}
});
})
.subscribe(...);
这样我们最终的回调结果是发射到.subscribe()当中,并且每一个调用都有针对的处理。
目前尚未验证的是当某一个任务时扔出Exception整条链是否会停止执行。欢迎有经验的同学补充。
网友评论