美文网首页Flink
Flink-sql自定义udtf 函数

Flink-sql自定义udtf 函数

作者: wudl | 来源:发表于2021-08-12 00:24 被阅读0次

    1. 在 Flink SQL 中使用自定义函数

    1.1 官网文档对此说得很详细：

    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html

    1.1.1 官网上面的例子:

    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    // Example: several eval overloads that share one globally declared output type.
    // The class-level hint fixes every emitted row to ROW<s STRING, i INT>,
    // so each overload must produce rows of exactly that shape.
    @FunctionHint(output = @DataTypeHint("ROW<s STRING, i INT>"))
    public static class OverloadedFunction extends TableFunction<Row> {
    
      /** Zero-argument overload: emits one sentinel row ("Empty args", -1). */
      public void eval() {
        final Row sentinel = Row.of("Empty args", -1);
        collect(sentinel);
      }
    
      /** Two-int overload: emits one row labelled "Sum" carrying a + b. */
      public void eval(int a, int b) {
        final int total = a + b;
        collect(Row.of("Sum", total));
      }
    }
    
    // Decouples type inference from the evaluation method: argument and result
    // types come entirely from the @FunctionHint annotations below, so a single
    // varargs eval(Object...) can serve every declared signature.
    @FunctionHint(
      input = {@DataTypeHint("INT"), @DataTypeHint("INT")},
      output = @DataTypeHint("INT")
    )
    @FunctionHint(
      input = {@DataTypeHint("BIGINT"), @DataTypeHint("BIGINT")},
      output = @DataTypeHint("BIGINT")
    )
    @FunctionHint(
      input = {},
      output = @DataTypeHint("BOOLEAN")
    )
    public static class OverloadedFunction extends TableFunction<Object> {
    
      // An implementer only has to make sure that some method exists that the
      // JVM can call; the varargs signature accepts every hinted argument list.
      public void eval(Object... o) {
        if (o.length == 0) {
          // Zero-argument signature (declared BOOLEAN output).
          collect(false);
          // Must stop here: falling through to o[0] would throw
          // ArrayIndexOutOfBoundsException for an empty argument list.
          return;
        }
        // (INT, INT) / (BIGINT, BIGINT) signatures: emit the first argument.
        collect(o[0]);
      }
    }
    

    2. 自己实现

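    需要注意的是：自己实现 UDTF 时，要通过 @FunctionHint 声明输出行的字段名和类型（下例为 ROW<s STRING>），SQL 中才能按字段名 s 引用输出列；eval 方法的参数则对应 SQL 调用时传入的参数（下例为 mySpilt(id)）。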

    package com.wudl.flink.sql;
    
    import com.wudl.flink.bean.WaterSensor;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.annotation.DataTypeHint;
    import org.apache.flink.table.annotation.FunctionHint;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.ScalarFunction;
    import org.apache.flink.table.functions.TableFunction;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;
    
    /**
     * Demo: register a user-defined table function (UDTF) and call it from Flink SQL.
     *
     * <p>Reads comma-separated lines from a socket, maps each to a {@code WaterSensor}
     * (String, long, int constructor arguments), exposes the stream as a dynamic table and
     * expands every {@code id} into one output row per {@code "_"}-separated token.
     *
     * <p>Optional program arguments: {@code args[0]} = socket host, {@code args[1]} = socket
     * port. When omitted, the original hard-coded endpoint is used.
     *
     * @Author :wudl
     * @Date: 2021-08-11 22:55
     */
    
    public class Flink_Sql_Function_UDTF {
    
        /** Socket host used when no host argument is given. */
        private static final String DEFAULT_HOST = "192.168.1.180";
        /** Socket port used when no port argument is given. */
        private static final int DEFAULT_PORT = 9999;
    
        public static void main(String[] args) throws Exception {
            String host = args.length > 0 ? args[0] : DEFAULT_HOST;
            int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;
    
            // 1. Execution environments (single parallelism for a simple demo).
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            // 2. Read socket lines and convert them into WaterSensor beans.
            SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream(host, port)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    });
    
            // 3. Convert the stream into a dynamic table.
            Table table = tableEnv.fromDataStream(waterSensorDs);
    
            // 4. Register the UDTF before use; queries refer to it by the name "mySpilt".
            tableEnv.createTemporarySystemFunction("mySpilt", mySpilt.class);
    
            // 5a. Table API alternative using the same registered function:
            //   table.joinLateral(call("mySpilt", $("id")))
            //        .select($("id"), $("s")).execute().print();
    
            // 5b. SQL: LATERAL TABLE pairs each input row with every row the UDTF emits for its id.
            // Concatenating `table` into the query string relies on Table#toString()
            // registering the table under a generated name.
            tableEnv.sqlQuery("select id, s from  " + table + ", lateral table(mySpilt(id))").execute().print();
    
            // No env.execute() here: Table#execute() already submits the query as its own job,
            // and print() keeps consuming the unbounded socket stream. A trailing env.execute()
            // is redundant at best and can fail or start a separate, sink-less job.
        }
    
        /**
         * Table function that splits a string on {@code "_"} and emits one row per token.
         *
         * <p>The class-level hint declares the output row as {@code ROW<s STRING>}, which is why
         * the SQL query can select the column {@code s}.
         */
        @FunctionHint(output = @DataTypeHint("ROW<s STRING>"))
        public static class mySpilt extends TableFunction<Row> {
            /**
             * Emits one single-column row for every {@code "_"}-separated token of {@code value}.
             *
             * @param value the string to split; a SQL NULL produces no rows instead of an NPE
             *              that would fail the whole job
             */
            public void eval(String value) {
                if (value == null) {
                    return;
                }
                for (String token : value.split("_")) {
                    collect(Row.of(token));
                }
            }
        }
    
    }
    
    
    Flink-udft 函数.png

    相关文章

      网友评论

        本文标题:Flink-sql自定义udtf 函数

        本文链接:https://www.haomeiwen.com/subject/tdekbltx.html