通常而言,Rx如果遇到if条件语句、switch case语句时需要先选择分支条件,然后再进行链式调用。RxCondition产生的目的就是为了在这些情况下也能顺利地使用链式调用。
我在查找RxJava的条件、布尔操作符时,没有找到符合我需求的操作符。于是,我在网上找到了RxJavaComputationExpressions, 做了一些修改将RxJava1升级到RxJava2,增加了对Flowable的支持。
下载安装
下载地址:
https://github.com/fengzhizi715/RxCondition
使用方法:
1.ifThen用法
if条件语句传统的写法:
Observable<String> observable = null;
if (flag) {
observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("this is true");
}
});
} else {
observable = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("this is false");
}
});
}
observable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});
使用了ifThen()以后的写法:
Statement.ifThen(new BooleanSupplier() {
@Override
public boolean getAsBoolean() throws Exception {
return flag;
}
}, Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("this is true");
}
}),Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("this is false");
}
})).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});
ifThen(BooleanSupplier condition, Observable<? extends R> then,Observable<? extends R> orElse),其中第一个Observable是条件为true时执行的,第二个Observable则是条件为false时执行。
使用lambda表达式,进一步简化写法:
Statement.ifThen(()->{
return flag;
},Observable.create((e)->{
e.onNext("this is true");
}),Observable.create((e)->{
e.onNext("this is false");
})).subscribe((Consume) (s) -> {System.out.println("s="+s)});
当然,ifThen也支持Flowable:
Statement.ifThen(()->{
return flag;
},Flowable.just("this is true"), Flowable.just("this is false"))
.subscribe((Consume) (s) -> {System.out.println("s="+s)});
2.switchCase用法
switch case语句传统的写法:
Flowable<String> flowable = null;
switch(type) {
case 0:
flowable = Flowable.just("this is 0");
break;
case 1:
flowable = Flowable.just("this is 1");
break;
case 2:
flowable = Flowable.just("this is 2");
break;
case 3:
flowable = Flowable.just("this is 3");
break;
default:
flowable = Flowable.just("this is default");
break;
}
flowable.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});
使用了switchCase()以后的写法:
Map<Integer,Flowable<String>> maps = new HashMap<>();
maps.put(0,Flowable.just("this is 0"));
maps.put(1,Flowable.just("this is 1"));
maps.put(2,Flowable.just("this is 2"));
maps.put(3,Flowable.just("this is 3"));
Statement.switchCase(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return type;
}
},maps,Flowable.just("this is default"))
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("s="+s);
}
});
首先,将各个分支情况放入maps中。
其次,switchCase()的第一个参数是caseSelector,用于返回maps的key。最后一个参数是defaultCase,相当于switch case语句中的default语句。
改成lambda表达:
Map<Integer,Flowable<String>> maps = new HashMap<>();
maps.put(0,Flowable.just("this is 0"));
maps.put(1,Flowable.just("this is 1"));
maps.put(2,Flowable.just("this is 2"));
maps.put(3,Flowable.just("this is 3"));
Statement.switchCase((Callable)()-> {return type;},maps,Flowable.just("this is default"))
.subscribe((Consumer)(s) -> {System.out.println("s="+s);});
switchCase()中,第一个参数返回的是Map<K, R>中的key,它支持范型,所以switchCase()相对于switch case语句而已能够支持更多种类型。
原理
以ifThen为例,看看它某个方法是怎么运行的。
public static <R> Observable<R> ifThen(BooleanSupplier condition, Observable<? extends R> then,
Observable<? extends R> orElse) {
return RxJavaPlugins.onAssembly(new ObservableIfThen<R>(condition, then, orElse));
}
ifThen静态方法借助RxJavaPlugins.onAssembly()来生成一个新的Observable并返回。
ObservableIfThen是一个Observable的实现类。它的subscribeActual()方法是被订阅时真正执行的方法,用来衔接Observable和Observer(Subscriber)。在这里,subscribeActual()根据condition返回的bool值来判断是使用then还是使用orElse来做Observable。
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.functions.BooleanSupplier;
import io.reactivex.internal.disposables.EmptyDisposable;
/**
* Created by Tony Shen on 2017/5/9.
*/
final class ObservableIfThen<T> extends Observable<T> {
final BooleanSupplier condition;
final ObservableSource<? extends T> then;
final ObservableSource<? extends T> orElse;
ObservableIfThen(BooleanSupplier condition, ObservableSource<? extends T> then,
ObservableSource<? extends T> orElse) {
this.condition = condition;
this.then = then;
this.orElse = orElse;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
boolean b;
try {
b = condition.getAsBoolean();
} catch (Throwable ex) {
EmptyDisposable.error(ex, observer);
return;
}
if (b) {
then.subscribe(observer);
} else {
orElse.subscribe(observer);
}
}
}
switchCase的原理也是大同小异,区别在于订阅时需要根据key从map中取到对应的Observable/Flowable,取不到则使用defaultCase。
总结
这个库其实很简单,编写它的同时我加深了对RxJava中Observable/Flowable原理的认识。
网友评论
一个小意见:
1.RxJavaPlugins.onAssembly() 你不需要借助他来生成Observable. 这个Plugin默认是null的 也就是返回你的本身。和new 没什么区别。 更多时候是为了测试的时候方便。
2. switch 可以使用GroupBy 来解决。