美文网首页
(8)FlinkSQL自定义UDF

(8)FlinkSQL自定义UDF

作者: NBI大数据可视化分析 | 来源:发表于2022-08-12 09:40 被阅读0次

    (1)定义一个UDF

    package com.udf;
    
    import org.apache.flink.table.functions.ScalarFunction;
    
    /**
     * Scalar user-defined function that appends the literal suffix {@code "_udf"}
     * to its string argument.
     *
     * <p>Created by lj on 2022-07-25.
     */
    public class TestUDF extends ScalarFunction {

        /** Text appended to every input value. */
        private static final String SUFFIX = "_udf";

        /**
         * Evaluates the function for a single input value.
         *
         * @param value the input string; a {@code null} reference is rendered as the
         *              text {@code "null"}, exactly as plain string concatenation would
         * @return {@code value} followed by {@code "_udf"}
         */
        public String eval(String value) {
            // String.valueOf keeps the null-to-"null" behaviour of the '+' operator.
            return String.valueOf(value).concat(SUFFIX);
        }
    }
    

    (2)使用UDF

    /**
     * Demo job: reads comma-separated lines from a socket on 127.0.0.1:9999, turns them
     * into {@code WaterSensor} records, exposes them as the view {@code EventTable}, and
     * runs a 10-second tumbling-window aggregation that calls {@code TestUDF} through its
     * registered SQL name {@code MyLength}. The result is printed as a retract stream.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception propagated from the Flink environment when the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Parses one input line of three comma-separated fields: a string, a long and an int.
        // Malformed lines are not handled here; the parse exceptions propagate.
        MapFunction<String, WaterSensor> lineParser = new MapFunction<String, WaterSensor>() {
            @Override
            public WaterSensor map(String line) throws Exception {
                String[] fields = line.split(",");
                return new WaterSensor(
                        fields[0],
                        Long.parseLong(fields[1]),
                        Integer.parseInt(fields[2]));
            }
        };

        DataStreamSource<String> socketLines = env.socketTextStream("127.0.0.1", 9999, "\n");
        SingleOutputStreamOperator<WaterSensor> sensors = socketLines.map(lineParser);

        // Convert the stream into a table; "pt" is added as a processing-time attribute.
        Table events = tableEnv.fromDataStream(
                sensors,
                $("id"),
                $("ts"),
                $("vc"),
                $("pt").proctime());
        tableEnv.createTemporaryView("EventTable", events);

        // Other ways to invoke a UDF, kept for reference (`myFunction` is not shown here):
        //   1. Call the function class directly:
        //        table.select(call(myFunction.class, $("id"))).execute().print();
        //   2. Register it first, then use it:
        //        tableEnv.createTemporarySystemFunction("MyLength", myFunction.class);
        //      2.1 Table API, using the registered name "MyLength":
        //        table.select(call("MyLength", $("id"))).execute().print();
        //      2.2 SQL, using the registered name:
        //        tableEnv.sqlQuery("select id, MyLength(id) from " + table).execute().print();

        // Register TestUDF under the SQL name used by the query below.
        tableEnv.createTemporarySystemFunction("MyLength", TestUDF.class);

        // Per id and 10-second tumbling window over "pt": count and sum of ts, plus the UDF
        // applied to the count cast to a string. window_start / window_end are grouped on
        // but not selected (they could be added to the SELECT list).
        String windowQuery =
                "SELECT id as componentname, "
                        + "COUNT(ts) as componentcount ,SUM(ts) as componentsum, "
                        + "MyLength(cast(COUNT(ts) as string)) as testudf "
                        + "FROM TABLE( TUMBLE( TABLE EventTable , DESCRIPTOR(pt), INTERVAL '10' SECOND)) "
                        + "GROUP BY id , window_start, window_end";
        Table aggregated = tableEnv.sqlQuery(windowQuery);

        // Retract mode.
        // NOTE(review): toRetractStream is reportedly deprecated in newer Flink releases in
        // favour of toChangelogStream; confirm against the Flink version in use.
        tableEnv.toRetractStream(aggregated, Row.class).print("toRetractStream");

        env.execute();
    }
    

    (3)应用效果


    1.png

    相关文章

      网友评论

          本文标题:(8)FlinkSQL自定义UDF

          本文链接:https://www.haomeiwen.com/subject/wkkwwrtx.html