Flink (1.13) FlinkSQL Custom Functions

Author: 万事万物 | Published: 2021-08-25 21:45

    Function categories

    From the official documentation:
    Currently, Flink distinguishes between the following kinds of functions:

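    • Scalar functions: map scalar values to a new scalar value.
    • Table functions: map scalar values to new rows (one input value can be expanded into several rows).
    • Aggregate functions: map scalar values of multiple rows to a new scalar value.
    • Table aggregate functions: combine table functions and aggregate functions. They map scalar values of multiple rows to new rows.
    • Async table functions: special functions for table sources that perform a lookup.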
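    As a rough analogy using plain Java streams (not Flink's API), the first three categories differ in how many rows go in and how many values come out. The class and method names below are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy of the three basic UDF shapes; this is not Flink's API.
class FunctionShapes {

    // Scalar function: one input value -> one output value (per row).
    static List<String> scalar(List<String> rows) {
        return rows.stream()
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    // Table function: one input value -> zero or more output rows.
    static List<String> table(List<String> rows) {
        return rows.stream()
                .flatMap(r -> Arrays.stream(r.split(" ")))
                .collect(Collectors.toList());
    }

    // Aggregate function: many input rows -> one output value.
    static int aggregate(List<String> rows) {
        return rows.stream()
                .mapToInt(String::length)
                .sum();
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("hello world", "flink sql");
        System.out.println(scalar(rows));    // [HELLO WORLD, FLINK SQL]
        System.out.println(table(rows));     // [hello, world, flink, sql]
        System.out.println(aggregate(rows)); // 20
    }
}
```

    Table aggregate functions would sit between the last two shapes: many rows in, several rows out.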

    Custom Scalar functions

    • Prepare a class
    import org.apache.flink.table.functions.ScalarFunction;
    
    /**
     * Converts a string to uppercase
     * @author admin
     * @date 2021/8/21
     */
    public class MyScalarFunctionByUppercase extends ScalarFunction {
    
    }
    
    • Exception: the class must define an eval method.
    org.apache.flink.table.api.ValidationException: Function class 'com.admin.flink.demo12.function.MyScalarFunctionByUppercase' 
    does not implement a method named 'eval'.
    

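    The method must be public and named eval. Its parameters and return type are up to you, as long as Flink can derive their data types, and eval may be overloaded.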

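    import org.apache.flink.table.functions.ScalarFunction;

    public class MyScalarFunctionByUppercase extends ScalarFunction {
        // eval must be public; Flink derives the argument and result types from its signature
        public String eval(String words){
            // SQL values can be NULL, so guard against a NullPointerException
            return words == null ? null : words.toUpperCase();
        }
    }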
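    Flink locates the evaluation method by its name, which is why the empty class above fails validation. The sketch below imitates that check with plain Java reflection. It is a simplified, hypothetical stand-in: Flink's real validation does more, such as type extraction. `UppercaseOrRepeat` and `EvalLookup` are illustrative names, not Flink classes. The sketch also shows two overloaded eval methods.

```java
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;

// Hypothetical UDF-style class with two overloaded eval methods (no Flink dependency).
class UppercaseOrRepeat {
    public String eval(String s) {
        return s == null ? null : s.toUpperCase();
    }

    public String eval(String s, int times) {
        return s == null ? null : String.join("", Collections.nCopies(times, s.toUpperCase()));
    }
}

// Simplified stand-in for the validation step; not Flink's implementation.
class EvalLookup {
    // getMethods() returns only public methods, so non-public eval methods are not counted.
    static long countEvalMethods(Class<?> clazz) {
        return Arrays.stream(clazz.getMethods())
                .filter(m -> m.getName().equals("eval"))
                .count();
    }

    static void validate(Class<?> clazz) {
        if (countEvalMethods(clazz) == 0) {
            throw new IllegalStateException("Function class '" + clazz.getName()
                    + "' does not implement a method named 'eval'.");
        }
    }

    public static void main(String[] args) throws Exception {
        validate(UppercaseOrRepeat.class);
        System.out.println(countEvalMethods(UppercaseOrRepeat.class)); // 2

        // Pick an overload by its parameter types and invoke it reflectively.
        Method m = UppercaseOrRepeat.class.getMethod("eval", String.class, int.class);
        System.out.println(m.invoke(new UppercaseOrRepeat(), "ab", 2)); // ABAB
    }
}
```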
    
    • Usage
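    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.junit.Test; // JUnit 4 assumed; use org.junit.jupiter.api.Test for JUnit 5

    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;

        @Test
        public void test1(){
            // Set up the environments
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // Sample data
            DataStreamSource<String> source = env.fromElements("java", "google", "hello");

            // Name the single field "words"
            Table table = tableEnv.fromDataStream(source, $("words"));

            // Call the function inline by passing the class directly (no registration needed)
            table.select($("words"), call(MyScalarFunctionByUppercase.class, $("words")))
                    .execute()
                    .print();
        }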
    
    • Result
    +----+--------------------------------+--------------------------------+
    | op |                          words |                            _c1 |
    +----+--------------------------------+--------------------------------+
    | +I |                           java |                           JAVA |
    | +I |                         google |                         GOOGLE |
    | +I |                          hello |                          HELLO |
    +----+--------------------------------+--------------------------------+
    3 rows in set
    
    • Register first, then use
        // Register the function first, then call it by name
            tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);
    
            table.select($("words"),call("toUppercase",$("words")))
                    .execute()
                    .print();
    
    • Result
    +----+--------------------------------+--------------------------------+
    | op |                          words |                            _c1 |
    +----+--------------------------------+--------------------------------+
    | +I |                           java |                           JAVA |
    | +I |                         google |                         GOOGLE |
    | +I |                          hello |                          HELLO |
    +----+--------------------------------+--------------------------------+
    3 rows in set
    
    • To use the function in SQL, you must register it first
        // Name the field
        Table table = tableEnv.fromDataStream(source,$("words"));

        // Register the function first
        tableEnv.createFunction("toUppercase",MyScalarFunctionByUppercase.class);

        // Use it in SQL; concatenating a Table into the query string registers it under a generated name
        tableEnv.sqlQuery("select words,toUppercase(words) as upp_words from "+table)
                .execute()
                .print();
    
    • Result
    +----+--------------------------------+--------------------------------+
    | op |                          words |                      upp_words |
    +----+--------------------------------+--------------------------------+
    | +I |                           java |                           JAVA |
    | +I |                         google |                         GOOGLE |
    | +I |                          hello |                          HELLO |
    +----+--------------------------------+--------------------------------+
    3 rows in set
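    SQL text can refer to a function only by its name, so that name must be bound to an implementation before sqlQuery is called. The inline call(MyScalarFunctionByUppercase.class, ...) form avoids this because it passes the class itself. The toy registry below is a hypothetical plain-Java illustration of the ordering. It does not show how Flink's catalogs are implemented.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Toy function registry illustrating "register first, then call by name".
// Hypothetical: Flink resolves function names through its catalogs, not this class.
class ToyFunctionRegistry {
    private final Map<String, Function<String, String>> functions = new HashMap<>();

    void register(String name, Function<String, String> fn) {
        functions.put(name, fn);
    }

    // Resolve a function by name; an unknown name fails, like an unregistered SQL call.
    String call(String name, String arg) {
        Function<String, String> fn = functions.get(name);
        if (fn == null) {
            throw new IllegalArgumentException("No function registered under '" + name + "'");
        }
        return fn.apply(arg);
    }

    public static void main(String[] args) {
        ToyFunctionRegistry registry = new ToyFunctionRegistry();
        registry.register("toUppercase", s -> s == null ? null : s.toUpperCase());
        System.out.println(registry.call("toUppercase", "google")); // GOOGLE
    }
}
```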
    

    Custom Table functions

    • Define the function
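    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;

    import java.util.Arrays;

    /**
     * Splits a phrase into one row per word (one input row becomes several rows).
     * Type parameter: each emitted row has several columns, hence Row.
     * @FunctionHint declares the names and types of the output columns.
     * @author admin
     * @date 2021/8/21
     */
    @FunctionHint(output = @DataTypeHint("row<w string,len int>"))
    public class MyTableFunctionByRowToColumn extends TableFunction<Row> {

        public void eval(String phrase){
            // Emit nothing for NULL input instead of throwing a NullPointerException
            if (phrase == null) {
                return;
            }
            // Emit one row (word, word length) per space-separated word
            Arrays.stream(phrase.split(" ")).forEach(s -> collect(Row.of(s, s.length())));
        }

    }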
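    Because eval only calls collect, its core logic can be moved into a static helper and unit-tested without starting a Flink job. The sketch below shows one possible structure; `WordSplitter` and `WordLen` are hypothetical names. It reproduces the w/len pairs in the result table. String.length() counts UTF-16 code units, so each Chinese character in the sample data counts as 1. A character outside the Basic Multilingual Plane, such as many emoji, would count as 2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free version of the eval logic in MyTableFunctionByRowToColumn.
class WordSplitter {

    // One output row: the word and its length (mirrors row<w string, len int>).
    static final class WordLen {
        final String w;
        final int len;

        WordLen(String w, int len) {
            this.w = w;
            this.len = len;
        }

        @Override
        public String toString() {
            return w + "|" + len;
        }
    }

    // Same rule as the UDF: split on single spaces and emit (word, length).
    static List<WordLen> split(String phrase) {
        List<WordLen> out = new ArrayList<>();
        if (phrase == null) {
            return out; // this sketch emits nothing for NULL input
        }
        for (String s : phrase.split(" ")) {
            out.add(new WordLen(s, s.length()));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(split("hello world!")); // [hello|5, world!|6]
        // A sample phrase from the data, written with Unicode escapes to keep the source ASCII.
        System.out.println(split("\u660e\u5929 \u4f60\u597d!").get(1).len); // 3
    }
}
```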
    

    Result:

    +----+--------------------------------+--------------------------------+-------------+
    | op |                         phrase |                              w |         len |
    +----+--------------------------------+--------------------------------+-------------+
    | +I |                   hello world! |                          hello |           5 |
    | +I |                   hello world! |                         world! |           6 |
    | +I |                     明天 你好! |                           明天 |           2 |
    | +I |                     明天 你好! |                          你好! |           3 |
    | +I |                      数码 宝贝 |                           数码 |           2 |
    | +I |                      数码 宝贝 |                           宝贝 |           2 |
    | +I |                   名侦探 柯南! |                         名侦探 |           3 |
    | +I |                   名侦探 柯南! |                          柯南! |           3 |
    +----+--------------------------------+--------------------------------+-------------+
    8 rows in set
    
    • Usage with the Table API (inline)
        @Test
        public void test1(){
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Table table = tableEnv.fromDataStream(source,$("phrase"));
    
        // Goal: explode the source data into one row per word
            table.joinLateral(call(MyTableFunctionByRowToColumn.class,$("phrase")))
                    .execute().print();
    
        }
    
    • Usage with SQL
        @Test
        public void test2(){
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<String> source = env.fromElements("hello world!", "明天 你好!", "数码 宝贝", "名侦探 柯南!");
    
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
            Table table = tableEnv.fromDataStream(source,$("phrase"));
    
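        // Register the table as a temporary view named "t"
        tableEnv.createTemporaryView("t",table);

        // Goal: explode the source data into one row per word

        // Register the function under the name used in the SQL text
        tableEnv.createFunction("rowToColumn",MyTableFunctionByRowToColumn.class);

        // Query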
            tableEnv.sqlQuery("select phrase , w ,len from t join lateral table (rowToColumn(phrase)) on true")
                    .execute()
                    .print();
    
        }
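    JOIN LATERAL TABLE(...) ON TRUE, like joinLateral in the Table API, pairs every source row with every row the function emits. It behaves like an inner join: if the function emits nothing for a source row, that row disappears from the result. Flink also supports LEFT JOIN LATERAL TABLE(...) ON TRUE, or leftOuterJoinLateral in the Table API. This variant keeps such rows and fills the function's columns with NULL. The plain-Java simulation below contrasts the two. `explode` is a hypothetical splitter that, unlike MyTableFunctionByRowToColumn, emits nothing for blank input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java simulation of inner vs. left outer lateral joins; not Flink code.
class LateralJoinDemo {

    // Hypothetical table function: one row per word, no rows for blank input.
    static List<String> explode(String phrase) {
        List<String> words = new ArrayList<>();
        if (phrase == null || phrase.trim().isEmpty()) {
            return words;
        }
        words.addAll(Arrays.asList(phrase.trim().split(" +")));
        return words;
    }

    // Each result row is "phrase|word"; "null" marks a NULL-padded column.
    static List<String> lateralJoin(List<String> source, boolean leftOuter) {
        List<String> result = new ArrayList<>();
        for (String phrase : source) {
            List<String> emitted = explode(phrase);
            if (emitted.isEmpty() && leftOuter) {
                result.add(phrase + "|null"); // LEFT JOIN keeps the row and pads it with NULL
            }
            for (String w : emitted) {
                result.add(phrase + "|" + w); // one joined row per emitted row
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("hello world!", "");
        System.out.println(lateralJoin(source, false)); // [hello world!|hello, hello world!|world!]
        System.out.println(lateralJoin(source, true));  // [hello world!|hello, hello world!|world!, |null]
    }
}
```

    With the four two-word phrases used above, the inner variant yields 4 × 2 = 8 rows, which matches the "8 rows in set" shown earlier.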
    
    • Renaming columns: AS T(w1, len1) gives the lateral table the alias T and renames its columns to w1 and len1
        <!--alias the lateral table and rename its columns-->
        <sql id="tableFunction2">
            select
            phrase , w1 ,len1
            from #{tableName}
            join lateral table (rowToColumn(phrase))
            as T(w1,len1)
            on true
        </sql>
    

    Result:

    +----+--------------------------------+--------------------------------+-------------+
    | op |                         phrase |                             w1 |        len1 |
    +----+--------------------------------+--------------------------------+-------------+
    | +I |                   hello world! |                          hello |           5 |
    | +I |                   hello world! |                         world! |           6 |
    | +I |                     明天 你好! |                           明天 |           2 |
    | +I |                     明天 你好! |                          你好! |           3 |
    | +I |                      数码 宝贝 |                           数码 |           2 |
    | +I |                      数码 宝贝 |                           宝贝 |           2 |
    | +I |                   名侦探 柯南! |                         名侦探 |           3 |
    | +I |                   名侦探 柯南! |                          柯南! |           3 |
    +----+--------------------------------+--------------------------------+-------------+
    


    Article link: https://www.haomeiwen.com/subject/rhcqiltx.html