背景
微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?
一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?
本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava
。
案例
从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:
public static void main(String[] args) throws Exception {
long c = System.currentTimeMillis();
System.out.println("顺序执行:");
System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
spendTime(c);
}
//模拟某个服务
private static String service(String srvName){
try {
Thread.sleep(250);
} catch (InterruptedException e) {
e.printStackTrace();
}
return srvName+"\r\n";
}
private static void spendTime(long preTime) {
System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
这段代码毫无疑问,打印输出:
花费:781 毫秒
改造一下,使用JDK8的CompletableFuture
,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:
public static void main(String[] args) throws Exception {
final long cc = System.currentTimeMillis();
CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
s1.thenCombine(s2, (i,j)->{
return i+j;
}).thenCombine(s3, (i,j)->{
System.out.println("使用JDK8的并行编程:");
System.out.println(i+j);
spendTime(cc);
return i+j;
});
}
以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:
花费:311 毫秒
那么以上的代码使用RxJava怎么来写呢?我们可以flatMap
将服务分拆到各自独立线程中去执行,代码如下:
private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
public static void main(String[] args) throws Exception {
Observable.range(0,3)
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer t) throws Exception {
return Observable.just(t)
.subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
.map(new Function<Integer, String>() {
@Override
public String apply(Integer t) throws Exception {
return service(ss[t]);
}
});
}
})
.reduce((s1,s2)->s1+s2)
.subscribe(s -> {
System.out.println("Observable:\r\n" + s);
spendTime(cc2);
});
}
花费:455 毫秒
RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多
第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和
package com.sumslack.rxjava;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
public class TestComputer {
private static final int MAX_I = 210000000;
private static void spendTime(long preTime) {
System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
private static void spendTime(long preTime,String str) {
System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
}
private static ExecutorService eService = Executors.newCachedThreadPool();
public static void main(String[] args) throws Exception{
int[] ss = new int[MAX_I];
for(int i=1;i<=MAX_I;i++) {
ss[i-1] = i;
}
long c = System.currentTimeMillis();
System.out.println(xx(0,MAX_I));
spendTime(c,"顺序执行");
final long cc5 = System.currentTimeMillis();
Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return Math.sqrt(t);
}
}).reduce((i,j)->i+j)
.subscribeOn(Schedulers.computation())
.subscribe(s -> {
spendTime(cc5,"Observable直接算");
});
final long cc = System.currentTimeMillis();
CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
return xx(0,MAX_I/2);
});
CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
return xx(MAX_I/2,MAX_I);
});
cf1.thenCombine(cf2, (i,j)->{
System.out.println(""+(i+j));
spendTime(cc,"CompletableFuture");
return i+j;
});
//也可以用:CompletableFuture.allOf(cf1,cf2).join();
c = System.currentTimeMillis();
Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd);
spendTime(cc,"stream");
c = System.currentTimeMillis();
Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
System.out.println(dd2);
spendTime(cc,"parallel stream");
final long cc2 = System.currentTimeMillis();
Observable.fromArray(0,1,2)
.flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
@Override
public ObservableSource<Double> apply(Integer t) throws Exception {
if(t%3==0) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(0,MAX_I/3);
}
});
}else if(t%3==1) {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I/3,MAX_I*2/3);
}
});
}else {
return Observable.just(t)
.subscribeOn(Schedulers.computation())
.map(new Function<Integer, Double>() {
@Override
public Double apply(Integer t) throws Exception {
return xx(MAX_I*2/3,MAX_I);
}
});
}
}
})
.reduce(new BiFunction<Double, Double, Double>() {
@Override
public Double apply(Double t1, Double t2) throws Exception {
return t1+t2;
}
})
.subscribe( s->{
System.out.println(s);
spendTime(cc2,"Observable");
});
Thread.sleep(100000);
}
private static double xx(int start,int end) {
double sum = 1;
for(int i=start;i<end;i++) {
sum += Math.sqrt(i+1);
}
return sum;
}
}
以下是费时结果:
[顺序执行] 花费:1086 毫秒
[CompletableFuture] 花费:537 毫秒
[stream] 花费:1028 毫秒
[parallel stream] 花费:1305 毫秒
[Observable] 花费:461 毫秒
[Observable直接算] 花费:4265 毫秒
这里使用 RxJava
进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!
关于RxJava
RxJava
是 Reactive Extensions
的Java
实现,通过使用Obserable
/Flowable
序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。
RxJava
基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。
RxJava
基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。
概念
-
Observable
:被订阅者,比如在安卓开发中,可能是某个数据源,数据源的变化要通知到UI,那么UI就是Observer,被订阅者有冷热之分,热Observable无论有没有订阅者订阅,事件流始终发送,而冷Observable则只有订阅者订阅事件流才开始发送数据,它们之间是可以通过API相互转化的,比如使用publish
可以冷->热,RefCount
可以热->冷; -
Observer
:订阅者;
RxJava编程
- 被订阅者:用的做多的是
Observable
,如果要支持背压则使用Flowable
,还可以使用Single
(只要OnSuccess和onError,没有onComplete),Completable
(创建后不发射任何数据,只有onComplete和onError)和Maybe
(只发送0或1个数据); - 生命周期监听:Observable创建后可使用doXXX监听你说需要的生命周期回调;
- 流的创建:create(使用一个函数从头创建),just(指定值创建,最多10个),fromXXX(基于X类创建),repeat(特定数据重复N次创建),defer(直到有订阅者订阅时才创建),interval(每隔一段时间创建一个数据发送),timer(延迟一段时间后发送数据);
- RxJava线程模型: 内置多个线程控制器,包括single(定长为1的线程池),newThread(启动新线程执行),computation(大小为CPU核数线程池,一般用于密集型计算),io(适用IO操作),trampoline(直接在当前线程运行)和Schedulers.from(自定义);
- 变化操作符:map(数据转型),flatMap(数据转某个Observable后合并发送),scan(每个数据应用一个函数,然后按顺序发送),groupBy(按Key分组拆分成多个Observable),buffer(打包发送),window,cast(强制转换类型);
- 过滤操作:filter(按条件过滤),takeLast(只发送最后N个数据),last(只发送最后一个数据),lastOrDefault(只发送最后一个数据,为Null发送默认值),takeLastBuffer(将最后N个数据当做单个数据发送),skip(跳过N个发送),skipLast(跳过最后N个),take(只发送开始的N个数据),first,takeFirst(只发送满足条件的第一个数据),elementAt(只发送第N个数据),timeout(指定事件内没发送数据,就发送异常),distinct(去重),ofType(只发送特定类型的数据),ignoreElements(丢失所有正常数据,只发送错误或完成通知),sample(一段时间内,只处理最后一个数据),throttleFirst(一段时间内,只处理第一个数据),debounce(发送一个数据,开始计时,到了规定时间没有再发送数据,则开始处理数据);
- 条件操作和布尔操作符:all(发送的数据是否都满足条件),contains(发送的数据是否包含某数据),amb(多个被订阅者数据发送只发送首次被订阅的那个数据流),defaultIfEmpty(如果原始被订阅者没有值,则发送一个默认值),sequenceEquals(判定两个数据流是否一样,返回true或false),skipUtil(直到符合条件才发送),skipWhile(直到条件不符合才开始发送),takeUntil(满足条件后不发送)和takeWhile(条件满足的一直发送);
- 合并和连接操作符:merge(将多个被订阅数据流合并),zip(将多个数据流结合发送,返回数据流的数据个数是最少的那个),combineLastest(类似zip,任意被订阅者开始发送数据时即发送,而zip要每个被订阅者开始发送数据才发送),join(两个被订阅者结合合并,总数据项是M*N项),startWith(在数据序列开头插入指定项),connect,灵活控制发送数据规则可使用push,refCount,replay(保证所有订阅者收到相同数据);
- 背压:被订阅者发送数据过快以至于订阅者来不及处理的情况;
总结
对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。
网友评论