场景描述:Flink在处理实时数据时,假如其中一条数据时脏数据,例如格式错误导致Json转换异常,字段缺少等等,这个时候该怎么处理呢?
解决办法:
这种问题在Spark Sql或者Flink Sql中,最常见的办法就是直接过滤掉。
在实际中,遇到的情况会非常多,则我们可以自定义一个UDF,这个UDF的作用就是用来处理null或者空字符串或者其他各种异常情况的。
官方案例:
public class HashCode extends ScalarFunction {
private int factor = 12;
public HashCode(int factor) {
this.factor = factor;
}
public int eval(String s) {
return s.hashCode() * factor;
}
}
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// register the function
tableEnv.registerFunction("hashCode", new HashCode(10));
// use the function in Java Table API
myTable.select("string, string.hashCode(), hashCode(string)");
// use the function in SQL API
tableEnv.sqlQuery("SELECT string, hashCode(string) FROM MyTable");
在实际工作中,在利用env.addSource方法对接Kafka数据源后,会利用map方法将对应json串转成对象,所以会try catch,即
this.source = this.env
.addSource(KafkaUtil.text("tgs-topic"))
.uid(Constants.UID_SOURCE_TGS)
.name("Kafka tgs-topic Source")
.map(json -> {
try {
return JSON.parseObject(json, Tgs.class);
} catch (Exception e) {
logger.error("Illegal JSON message: {}", json);
throw e;
}
})
.uid(Constants.UID_MAP_JSON_PARSE)
.name("Parse JSON to Tgs object");
这样在遇到脏数据时,也不会因为json转换出错导致任务失败。
网友评论