1. 在 Flink SQL 中使用自定义函数
1.1 官方文档对此有详细说明
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html
1.1.1 官方文档中的示例：
import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.*;
// define function logic
public static class SubstringFunction extends ScalarFunction {

    /**
     * Extracts a substring of the input.
     *
     * @param s     the source string
     * @param begin start index, inclusive
     * @param end   end index, exclusive
     * @return {@code s.substring(begin, end)}
     */
    public String eval(String s, Integer begin, Integer end) {
        final int from = begin;
        final int to = end;
        return s.substring(from, to);
    }
}
// Usage snippet from the official docs -- not a standalone program.
// "..." is a placeholder for the create() argument (presumably an EnvironmentSettings
// instance -- confirm against the Flink docs for your version).
TableEnvironment env = TableEnvironment.create(...);
// call function "inline" without registration in Table API
// (5 and 12 are passed as the begin/end arguments of SubstringFunction.eval)
env.from("MyTable").select(call(SubstringFunction.class, $("myField"), 5, 12));
// register function under the name "SubstringFunction"
env.createTemporarySystemFunction("SubstringFunction", SubstringFunction.class);
// call registered function in Table API, referring to it by its registered name
env.from("MyTable").select(call("SubstringFunction", $("myField"), 5, 12));
// call registered function in SQL, using the same registered name
env.sqlQuery("SELECT SubstringFunction(myField, 5, 12) FROM MyTable");
2. 自己实现
需要注意的是：下面的实现通过 `createTemporarySystemFunction` 将自定义函数注册为全局（系统级）临时函数，注册之后即可在 Table API 和 SQL 中按名称调用。
package com.wudl.flink.sql;
import com.google.inject.internal.cglib.proxy.$Callback;
import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
/**
 * @ClassName : Flink_Sql_Function_UDF
 * @Description : Demo of a custom Flink scalar UDF, registered as a global (system) temporary
 *                function and called from SQL.
 * @Author :wudl
 * @Date: 2021-08-11 22:55
 */
public class Flink_Sql_Function_UDF {

    /** Socket host used when no host argument is given. */
    private static final String DEFAULT_HOST = "192.168.1.180";

    /** Socket port used when no port argument is given. */
    private static final int DEFAULT_PORT = 9999;

    /**
     * Reads sensor lines from a text socket, exposes them as a table and queries them with the
     * {@code MyLength} UDF.
     *
     * @param args optional {@code [host [port]]} of the text socket to read; defaults to
     *             {@value #DEFAULT_HOST} and {@value #DEFAULT_PORT}
     * @throws Exception if the Flink job cannot be built, submitted or run
     */
    public static void main(String[] args) throws Exception {
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? Integer.parseInt(args[1]) : DEFAULT_PORT;

        // 1. Set up the execution environments.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // 2. Read comma-separated "<id>,<long>,<int>" lines from the socket and map them to
        //    WaterSensor beans.
        SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream(host, port)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. Convert the stream into a dynamic table.
        Table table = tableEnv.fromDataStream(waterSensorDs);

        // 4. Alternative: call the UDF inline, without registering it first:
        //    table.select(call(myFunction.class, $("id"))).execute().print();

        // 5. Register the UDF as a global (system) temporary function named "MyLength".
        tableEnv.createTemporarySystemFunction("MyLength", myFunction.class);
        // 5.1 Alternative: call the registered function by name from the Table API:
        //    table.select(call("MyLength", $("id"))).execute().print();
        // 5.2 Call the registered function from SQL. Concatenating `table` into the query
        //     relies on Table#toString() yielding a name the SQL parser can resolve.
        tableEnv.sqlQuery("select id, MyLength(id) from " + table).execute().print();

        // No env.execute() here: Table#execute() above already submits the job, and print()
        // blocks while the unbounded socket stream runs. A trailing env.execute() would at
        // best be unreachable and at worst submit a second job over the same DataStream.
    }

    /**
     * Scalar UDF returning the length of its string argument.
     *
     * <p>Returns a boxed {@link Integer} so that a SQL NULL argument produces NULL instead of
     * a NullPointerException that would fail the job.
     */
    public static class myFunction extends ScalarFunction {

        /**
         * @param value the input string, may be {@code null}
         * @return the length of {@code value}, or {@code null} if {@code value} is {@code null}
         */
        public Integer eval(String value) {
            return value == null ? null : value.length();
        }
    }
}
Flink-funcation.png
网友评论