美文网首页@IT·互联网想法
利用java8 的 CompletableFuture 优化 F

利用java8 的 CompletableFuture 优化 F

作者: shengjk1 | 来源:发表于2024-05-23 16:42 被阅读0次

    你好,我是 shengjk1,多年大厂经验,努力构建 通俗易懂的、好玩的编程语言教程。 欢迎关注!你会有如下收益:

    1. 了解大厂经验
    2. 拥有和大厂相匹配的技术等

    希望看什么,评论或者私信告诉我!

    开头老规矩,先骂简书,直到图片可以正常展示为止:
    简书垃圾,快倒闭吧,要看图片的请转 https://blog.csdn.net/jsjsjs1789/article/details/139178436

    一、前言

    目前 Flink 利用 avatorscript 脚本语言,来做到规则的自动化更新。avatorscript将表达式直接翻译成对应的 java 字节码执行,所以在大数据量的情况下,自然而然这里就成为了瓶颈

    二、Flink 代码优化

    2.0 问题发现

    通过 Flink UI 发现 window 算子是瓶颈,而 window 算子的核心就是 avatorscript 表达式

    2.1 原有代码

    xxx
    AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
    xxx
    

    经过测试平均执行时间在1毫秒以内,但经不住数据量大,所以Flink QPS一直在 11w 左右

    2.2 CompletableFuture 优化

    xxx
    List<CompletableFuture> executeFutures=new ArrayList<>();
    
    CompletableFuture<Object> executeFuture = CompletableFuture.supplyAsync(() -> {
                    return AviatorEvaluator.execute(columnFunction, dataView.getProperties(), true);
                });
    executeFutures.add(executeFuture);
    
    for (int i = 0; i < executeFutures.size(); i++) {
        executeFutures.get(i).get()
        xxxx
    }
    

    修改完上线后,Flink QPS 有原来 11W 增加到 17W 左右

    三、avatorscript 使用的简单介绍

    为了让你更容易理解 avatorscript,这里我们也可以先简单的介绍一下:

    3.1 自定义函数

    class AddFunction extends AbstractFunction {
        @Override
        public AviatorObject call(Map<String, Object> env,
    
                                  AviatorObject arg1, AviatorObject arg2) {
            Number left = FunctionUtils.getNumberValue(arg1, env);
            Number right = FunctionUtils.getNumberValue(arg2, env);
            return new AviatorDouble(left.intValue() + right.intValue());
        }
    
        public String getName() {
    
            return "add" ;
        }
    }
    
    
    public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
        //注册函数
       AviatorEvaluator.addFunction(new AddFunction());
        System.out.println(AviatorEvaluator.execute( "add(2,1)" ));
    }
    

    3.2 从 Map 中取值

    public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
        //注册函数
         AviatorEvaluator.addFunction(new AddFunction());
     HashMap<String, Object> stringObjectHashMap = new HashMap<>();
        stringObjectHashMap.put( "testId1" , 1);
        stringObjectHashMap.put( "testId2" , 2);
        Object execute = AviatorEvaluator.execute( "add(testId1,testId2)" , stringObjectHashMap);
    

    3.3 使用 Java 的工具类

    public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
     HashMap<String, Object> stringObjectHashMap = new HashMap<>();
        stringObjectHashMap.put( "ip" , "a1111" );
        // stringObjectHashMap.put("result", "a&B&C&d");
     stringObjectHashMap.put( "voucher_endtime" , "2022.03.02 11:32" );
        stringObjectHashMap.put( "imei2" , "v1aaaaaa1" );
        stringObjectHashMap.put( "testId" , "v1ot_service_quality_1111" );
        stringObjectHashMap.put( "testId1" , "sku" );
        stringObjectHashMap.put( "a" , "123" );
        stringObjectHashMap.put( "a1" , "null" );
        stringObjectHashMap.put( "b1" , 123);
     
        AviatorEvaluator.addStaticFunctions( "doubleStatic" , Double.class);
        AviatorEvaluator.addInstanceFunctions( "doubleInstance" , Double.class)
    
     execute2 = AviatorEvaluator.execute( "(doubleStatic.valueOf(sys_net_bandwidth))" , stringObjectHashMap);
        System.out.println(execute2);
        execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(sys_net_bandwidth)) " , stringObjectHashMap);
        System.out.println( "###" + execute2);
        execute2 = AviatorEvaluator.execute( "doubleInstance.longValue(doubleStatic.valueOf(str(voucher)))" , stringObjectHashMap);
    

    3.4 AviatorScript 函数

    ## examples/function.av
    fn add(x, y) {
      return x + y;
    }
    p(add(1,2))
    
    public static void main(String[] args) throws IllegalAccessException, NoSuchMethodException {
        String function = "## examples/function.av\n" +
                "\n" +
                "fn add(x, y) {\n" +
                "  return x + y;\n" +
                "}" ;
        AviatorEvaluator.defineFunction( "add" , function);
        System.out.println( "defineFunction6666================+" + AviatorEvaluator.execute( "add(1,2)" , stringObjectHashMap));
    }
    

    四、总结

    本文主要介绍了 Flink 中使用 avatorscript 脚本语言的问题,以及如何通过 CompletableFuture 优化代码来提高 Flink QPS。同时,还介绍了 avatorscript 的使用方法,包括自定义函数、从 Map 中取值、使用 Java 工具类和 AviatorScript 函数。通过本文的介绍,读者可以更好地了解 Flink 中 avatorscript 的使用方法,以及如何优化代码来提高 Flink QPS。

    相关文章

      网友评论

        本文标题:利用java8 的 CompletableFuture 优化 F

        本文链接:https://www.haomeiwen.com/subject/xqbtqjtx.html