美文网首页
借助JDK8和RxJava如何让你的业务代码运行的更快

借助JDK8和RxJava如何让你的业务代码运行的更快

作者: 空山雪林 | 来源:发表于2018-12-07 16:36 被阅读0次

    背景

    微服务流行后,在我们项目开发过程中,一个服务经常会调用N个微服务,调用每个微服务可能需要几百毫秒,试想,一个复杂的业务如果要调用上百的微服务,如果各个服务同步执行,可能就需要花费好几秒,试想:这些服务为什么不能并行运行呢?

    一个复杂的计算任务,为什么不能分解成更小的任务单位,让他们并行运行呢?

    本文通过以上两个业务场景,比较各个实现方案的差异,在讲解之前,我们先来了解下本文提到的RxJava

    案例

    从一段最简单的服务开始:该服务需调用3个微服务,每个微服务费时250ms,三个微服务都获取数据后返回给前端(该微服务三个服务分别是商品详情,商品评论和推荐商品列表),如果按顺序执行,那么代码是这样的:

    public static void main(String[] args) throws Exception {
        long c = System.currentTimeMillis();
        System.out.println("顺序执行:");
        System.out.println(service("商品详情微服务")+service("商品评论微服务")+service("推荐商品微服务"));
        spendTime(c);
    }
    //模拟某个服务
    private static String service(String srvName){
        try {
            Thread.sleep(250);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return srvName+"\r\n";
    }
    private static void spendTime(long preTime) {
        System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
    }
    

    这段代码毫无疑问,打印输出:

    花费:781 毫秒

    改造一下,使用JDK8的CompletableFuture,3个微服务独立线程运行,都完成后通知主线程打印,代码如下:

    public static void main(String[] args) throws Exception {
            final long cc = System.currentTimeMillis(); 
        CompletableFuture<String> s1 = CompletableFuture.supplyAsync(() -> service("商品详情微服务"));
        CompletableFuture<String> s2 = CompletableFuture.supplyAsync(() -> service("商品评论微服务"));
        CompletableFuture<String> s3 = CompletableFuture.supplyAsync(() -> service("推荐商品微服务"));
        s1.thenCombine(s2, (i,j)->{
            return i+j;
        }).thenCombine(s3, (i,j)->{
            System.out.println("使用JDK8的并行编程:");
            System.out.println(i+j);
            spendTime(cc);
            return i+j;
        });
    }
    

    以上代码的执行结果取决于3个微服务中最长时间的那个服务,相比原先速度有明显提高:

    花费:311 毫秒

    那么以上的代码使用RxJava怎么来写呢?我们可以flatMap将服务分拆到各自独立线程中去执行,代码如下:

    private static String[] ss = {"商品详情微服务","商品评论微服务","推荐商品微服务"};
    public static void main(String[] args) throws Exception {
        Observable.range(0,3)
        .flatMap(new Function<Integer, ObservableSource<String>>() {
            @Override
            public ObservableSource<String> apply(Integer t) throws Exception {
                return Observable.just(t)
            .subscribeOn(Schedulers.from(Executors.newFixedThreadPool(1)))
                    .map(new Function<Integer, String>() {
                        @Override
                        public String apply(Integer t) throws Exception {
                            return service(ss[t]);
                        }
                    });
                }
            })
            .reduce((s1,s2)->s1+s2)
            .subscribe(s -> {
                System.out.println("Observable:\r\n" + s);
                spendTime(cc2);
            });
    }
    

    花费:455 毫秒

    RxJava模拟的针对每个数据项的并发操作调用时间上要比直接使用JDK8的API慢得多

    第二个业务场景是将复杂的计算进行拆分子计算任务,然后将每个任务计算合并成最终计算结果,以下直接给出所有源码,我们来看看几种计算方式在耗时上的不同,复杂计算任务是:对1到210000000开根号求总和

    package com.sumslack.rxjava;
    
    import java.util.Arrays;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.functions.BiFunction;
    import io.reactivex.functions.Function;
    import io.reactivex.schedulers.Schedulers;
    
    public class TestComputer {
        private static final int MAX_I = 210000000;
        
        private static void spendTime(long preTime) {
            System.out.println("花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
        }
        
        private static void spendTime(long preTime,String str) {
            System.out.println("[" + str + "] 花费:" + (System.currentTimeMillis() - preTime) + " 毫秒");
        }
        private static ExecutorService eService = Executors.newCachedThreadPool();
        public static void main(String[] args) throws Exception{
            
            int[] ss = new int[MAX_I];
            for(int i=1;i<=MAX_I;i++) {
                ss[i-1] = i;
            }
            
            
            long c = System.currentTimeMillis();
            System.out.println(xx(0,MAX_I));
            spendTime(c,"顺序执行");
    
            final long cc5 = System.currentTimeMillis();
            Observable.range(1, MAX_I).map(new Function<Integer, Double>() {
                @Override
                public Double apply(Integer t) throws Exception {
                    return Math.sqrt(t);
                }
            }).reduce((i,j)->i+j)
            .subscribeOn(Schedulers.computation())
            .subscribe(s -> {
                spendTime(cc5,"Observable直接算");
            });
            final long cc = System.currentTimeMillis();
            CompletableFuture<Double> cf1 = CompletableFuture.supplyAsync(() -> {
                return xx(0,MAX_I/2);
            });
            CompletableFuture<Double> cf2 = CompletableFuture.supplyAsync(() -> {
                return xx(MAX_I/2,MAX_I);
            });
            cf1.thenCombine(cf2,  (i,j)->{
                System.out.println(""+(i+j));
                spendTime(cc,"CompletableFuture");
                return i+j;
            });
                   
            //也可以用:CompletableFuture.allOf(cf1,cf2).join();
            c = System.currentTimeMillis();
            Double dd = Arrays.stream(ss).mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
            System.out.println(dd);
            spendTime(cc,"stream");
            
            c = System.currentTimeMillis();
            Double dd2 = Arrays.stream(ss).parallel().mapToDouble(d -> Math.sqrt(d)).reduce(0d,Double::sum);
            System.out.println(dd2);
            spendTime(cc,"parallel stream");
            
            final long cc2 = System.currentTimeMillis();
            Observable.fromArray(0,1,2)
            .flatMap(new io.reactivex.functions.Function<Integer,ObservableSource<Double>>(){
                @Override
                public ObservableSource<Double> apply(Integer t) throws Exception {
                    if(t%3==0) {
                        return Observable.just(t)
                            .subscribeOn(Schedulers.computation())
                            .map(new Function<Integer, Double>() {
                                @Override
                                public Double apply(Integer t) throws Exception {
                                    return xx(0,MAX_I/3);
                                }
                            });
                    }else if(t%3==1) {
                        return Observable.just(t)
                                .subscribeOn(Schedulers.computation())
                                .map(new Function<Integer, Double>() {
                                    @Override
                                    public Double apply(Integer t) throws Exception {
                                        return xx(MAX_I/3,MAX_I*2/3);
                                    }
                                });
                    }else {
                        return Observable.just(t)
                                .subscribeOn(Schedulers.computation())
                                .map(new Function<Integer, Double>() {
                                    @Override
                                    public Double apply(Integer t) throws Exception {
                                        return xx(MAX_I*2/3,MAX_I);
                                    }
                                });
                    }
                }
            })
            .reduce(new BiFunction<Double, Double, Double>() {
                @Override
                public Double apply(Double t1, Double t2) throws Exception {
                    return t1+t2;
                }
            })
            .subscribe( s->{
                System.out.println(s);
                spendTime(cc2,"Observable");
            });
            Thread.sleep(100000);
        }
        
        private static double xx(int start,int end) {
            double sum = 1;
            for(int i=start;i<end;i++) {
                sum += Math.sqrt(i+1);
            }
            return sum;
        }
    }
    

    以下是费时结果:

    [顺序执行] 花费:1086 毫秒
    [CompletableFuture] 花费:537 毫秒
    [stream] 花费:1028 毫秒
    [parallel stream] 花费:1305 毫秒
    [Observable] 花费:461 毫秒
    [Observable直接算] 花费:4265 毫秒

    这里使用 RxJava 进行计算任务分解求和是最快的,因为JDK8并发编程我们分解的是两个计算任务,而RxJava分解成3个所致!

    关于RxJava

    RxJavaReactive ExtensionsJava实现,通过使用Obserable/Flowable序列来构建异步和基于事件的程序的库,RxJava实现和扩展了观察者模式。

    RxJava基于响应式编程,是一种面向数据流和变化传播的编程范式。传统编程方式代码都是顺序执行的,而响应式编程是基于异步编程的,借助于CPU多核能力,提高运行效率,降低延迟和阻塞,基于数据流模型,如一个函数可作用与数据流中的每项,可变化传播。在响应式编程中,函数成为其第一等公民,同原型类型一样,函数可作用与参数,也可作为返回值。

    RxJava基于函数式编程,传统面向对象是通过抽象出对象关系来解决问题,函数式编程是通过函数的组合来解决问题。

    概念

    • Observable:被订阅者,比如在安卓开发中,可能是某个数据源,数据源的变化要通知到UI,那么UI就是Observer,被订阅者有冷热之分,热Observable无论有没有订阅者订阅,事件流始终发送,而冷Observable则只有订阅者订阅事件流才开始发送数据,它们之间是可以通过API相互转化的,比如使用publish可以冷->热,RefCount可以热->冷;
    • Observer:订阅者;

    RxJava编程

    • 被订阅者:用的做多的是Observable,如果要支持背压则使用Flowable,还可以使用Single(只要OnSuccess和onError,没有onComplete),Completable(创建后不发射任何数据,只有onComplete和onError)和Maybe(只发送0或1个数据);
    • 生命周期监听:Observable创建后可使用doXXX监听你说需要的生命周期回调;
    • 流的创建:create(使用一个函数从头创建),just(指定值创建,最多10个),fromXXX(基于X类创建),repeat(特定数据重复N次创建),defer(直到有订阅者订阅时才创建),interval(每隔一段时间创建一个数据发送),timer(延迟一段时间后发送数据);
    • RxJava线程模型: 内置多个线程控制器,包括single(定长为1的线程池),newThread(启动新线程执行),computation(大小为CPU核数线程池,一般用于密集型计算),io(适用IO操作),trampoline(直接在当前线程运行)和Schedulers.from(自定义);
    • 变化操作符:map(数据转型),flatMap(数据转某个Observable后合并发送),scan(每个数据应用一个函数,然后按顺序发送),groupBy(按Key分组拆分成多个Observable),buffer(打包发送),window,cast(强制转换类型);
    • 过滤操作:filter(按条件过滤),takeLast(只发送最后N个数据),last(只发送最后一个数据),lastOrDefault(只发送最后一个数据,为Null发送默认值),takeLastBuffer(将最后N个数据当做单个数据发送),skip(跳过N个发送),skipLast(跳过最后N个),take(只发送开始的N个数据),first,takeFirst(只发送满足条件的第一个数据),elementAt(只发送第N个数据),timeout(指定事件内没发送数据,就发送异常),distinct(去重),ofType(只发送特定类型的数据),ignoreElements(丢失所有正常数据,只发送错误或完成通知),sample(一段时间内,只处理最后一个数据),throttleFirst(一段时间内,只处理第一个数据),debounce(发送一个数据,开始计时,到了规定时间没有再发送数据,则开始处理数据);
    • 条件操作和布尔操作符:all(发送的数据是否都满足条件),contains(发送的数据是否包含某数据),amb(多个被订阅者数据发送只发送首次被订阅的那个数据流),defaultIfEmpty(如果原始被订阅者没有值,则发送一个默认值),sequenceEquals(判定两个数据流是否一样,返回true或false),skipUtil(直到符合条件才发送),skipWhile(直到条件不符合才开始发送),takeUntil(满足条件后不发送)和takeWhile(条件满足的一直发送);
    • 合并和连接操作符:merge(将多个被订阅数据流合并),zip(将多个数据流结合发送,返回数据流的数据个数是最少的那个),combineLastest(类似zip,任意被订阅者开始发送数据时即发送,而zip要每个被订阅者开始发送数据才发送),join(两个被订阅者结合合并,总数据项是M*N项),startWith(在数据序列开头插入指定项),connect,灵活控制发送数据规则可使用push,refCount,replay(保证所有订阅者收到相同数据);
    • 背压:被订阅者发送数据过快以至于订阅者来不及处理的情况;

    总结

    对于复杂计算,你可以将计算任务分解成N个子计算任务,交给多个线程处理并将结果合并后取得最终结果,对于服务业务的调用,你应该清楚,哪些子任务可以并行运行,哪些需要顺序执行,使用RxJava在代码上可能更加直观,也可以使用JDK8的CompletableFuture,其实JDK8的很多API参考了RxJava的实现,两者在写法上非常的类似,响应式编程相比传统代码的顺序执行在思路上有很大的不同,理解上也有一定的难度,希望通过本文让您全面了解函数式编程的实现思路。

    相关文章

      网友评论

          本文标题:借助JDK8和RxJava如何让你的业务代码运行的更快

          本文链接:https://www.haomeiwen.com/subject/idmwcqtx.html