需求
源kafka消息数据中有个字段是Map类型, 我希望读取该字段并且写入clickhouse中, 这是个Map<String, Object>类型的数据, 还有可能是嵌套结构。就像这样
{
"name":"hello",
"info": {
"age": 18,
"gender": "male",
"other": {
"car": "川A8888888",
"what": 100
}
}
}
问题
flinksql原生支持Map类型, 但是必须制定key和value的类型, 无法满足需求。所以打算以字符串的方式写入clickhouse, 查询的时候再解析, 于是我在sql定义中将map类型的数据类型写成String。就像这样
CREATE TABLE long_long_ago (
name STRING,
info STRING
) WITH (…)
可事情没有向预计的方向发展, 程序不报错, 可是写入到clickhouse中的info字段为空, 一脸懵逼, 开始进入正题。
方案
为什么将Map类型的数据定义成String后解析出来的数据是空呢?直接看代码, 不一会儿就定位到所在代码的位置了。
所在包:flink-json
所在类:org.apache.flink.formats.json.JsonRowDataDeserializationSchema
所在方法:convertToString
原因:
因为flink解析出来的info本质还是jsonNode, 即使我们在sql中定义其为String。
而jsonNode的asText方法是没实现的(也就是空)。
所以适当的修改一下代码就可以了, jsonNode实现了toString类。
修改如下
private StringData convertToString(JsonNode jsonNode) {
if (jsonNode.asText() == "") {
return StringData.fromString(jsonNode.toString());
} else {
return StringData.fromString(jsonNode.asText());
}
}
修改完上面的方法后, 打包flink-json包, 替换jar包。
好了, 现在flink程序就可以将Map对象转成String, 然后落地到clickhouse了
网友评论