目的:
从复杂的 JSON 中提取关心的字段数据。利用 Flink SQL 的 ROW 类型，可以把嵌套的 JSON 映射为可操作的 schema，
然后通过计算列 `field AS xx.xx.xx` 的方式引用其中的嵌套字段。
Flink 版本: 1.13.0
原始 JSON 示例:
{
"sln":"itn",
"pl":"js",
"sdk":"zg-js",
"sdkv":"2.0",
"owner":"zg",
"ut":"2021-7-15 16:07:01",
"tz":28800000,
"debug":0,
"ak":"xxxx",
"usr":{
"did":"171ebd0070d8a-02a71a3974197b-11096a4c-46500-171ebd0070e57",
"$zg_did":12590353
},
"data":[
{
"dt":"evt",
"pr":{
"$ct":1626336421471,
"$tz":28800000,
"$sid":1626336206600,
"$url":"https://a.xxx.com.cn/2656/m46991/#zoneclick=102782",
"$ref":"https://a.xxx.com.cn/2656/",
"$referrer_domain":"",
"$eid":"viehcle_detail_page_view",
"_platform_type":"M",
"_project_name":"XCAR",
"_login_status":0,
"$zg_did":12590353,
"$zg_sid":1626336206600,
"$uuid":"828c1425345a42379ff4de1e6d0f6567",
"$zg_zgid":12612355,
"$zg_eid":115,
"$zg_epid#_level":1498,
"$zg_eptp#_level":"string",
"$zg_epid#_page_name":2255,
"$zg_eptp#_page_name":"string",
"$zg_epid#_model_id":2613,
"$zg_eptp#_model_id":"string",
"$zg_epid#_sub_level":1502,
"$zg_eptp#_sub_level":"string",
"$zg_epid#_series_id":400,
"$zg_eptp#_series_id":"string",
"$zg_epid#_first_category":398,
"$zg_eptp#_first_category":"string",
"$zg_epid#_series_or_model":1503,
"$zg_eptp#_series_or_model":"string",
"$zg_epid#_brand":1501,
"$zg_eptp#_brand":"string",
"$zg_epid#_model":2614,
"$zg_eptp#_model":"string",
"$zg_epid#_platform_type":1490,
"$zg_eptp#_platform_type":"string",
"$zg_epid#_sub_brand":1497,
"$zg_eptp#_sub_brand":"string",
"$zg_epid#_login_status":1488,
"$zg_eptp#_login_status":"number",
"$zg_epid#_sub_brand_id":2611,
"$zg_eptp#_sub_brand_id":"string",
"$zg_epid#_energy":2612,
"$zg_eptp#_energy":"string",
"$zg_epid#_year":1499,
"$zg_eptp#_year":"string",
"$zg_epid#_structure":1500,
"$zg_eptp#_structure":"string",
"$zg_epid#_price_range":1506,
"$zg_eptp#_price_range":"string",
"$zg_epid#_brand_id":1507,
"$zg_eptp#_brand_id":"string",
"$zg_epid#_status":2677,
"$zg_eptp#_status":"string",
"$zg_epid#_project_name":1489,
"$zg_eptp#_project_name":"string",
"$zg_epid#_series":1505,
"$zg_eptp#_series":"string"
}
},
{
"dt":"zgid",
"pr":{
"$ct":1626336421471,
"$zg_did":12590353,
"$tz":28800000,
"$zg_zgid":12612355
}
},
{
"dt":"pl",
"pr":{
"$dv":"x'x.io",
"$zg_did":12590353
}
}
],
"ip":"39.144.27.100",
"st":1626336451683,
"ua":"Mozilla/5.0 (Linux; Android 10; MED-AL00; HMSCore 6.0.0.305) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.93 HuaweiBrowser/11.1.2.301 Mobile Safari/537.36",
"plat":3,
"app_id":6,
"@version":"1",
"@timestamp":"2021-07-15T08:07:12.661Z"
}
-- Source table: consumes the raw tracking JSON from the Kafka topic zg_log.
-- Nested JSON objects are declared as ROW types and the JSON array as ARRAY<ROW<...>>,
-- which lets the computed columns below address nested values with dot and index paths.
CREATE TABLE kafka_test (
    -- JSON object "usr". NOTE: $zg_did is a JSON number in the sample but is read as STRING here.
    usr      ROW<did STRING, `$zg_did` STRING>,
    -- JSON array "data": each element carries a type tag dt and a property object pr.
    -- $cuid and _jpush_id do not appear in the sample record, so they read as NULL for it
    -- (json.fail-on-missing-field is false below).
    data     ARRAY<ROW<dt STRING, pr ROW<`$zg_zgid` STRING, `$cuid` STRING, `_jpush_id` STRING>>>,

    -- Computed columns that flatten the nested values.
    -- Flink SQL array indices start at 1, so data[1] is the first element of the data array.
    deviceId AS usr.`$zg_did`,
    zg_id    AS data[1].pr.`$zg_zgid`,
    uid      AS data[1].pr.`$cuid`,
    jpush_id AS data[1].pr.`_jpush_id`
)
WITH (
    'connector'                    = 'kafka',
    'topic'                        = 'zg_log',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id'          = 'test_group_zg_log',
    'format'                       = 'json',
    -- begin reading at the latest offset of each partition
    'scan.startup.mode'            = 'latest-offset',
    -- tolerate absent fields (they become NULL) but still fail the job on malformed JSON
    'json.fail-on-missing-field'   = 'false',
    'json.ignore-parse-errors'     = 'false'
);
-- Sink table: the print connector prints every row it receives,
-- which makes it easy to eyeball the flattened columns while debugging.
-- Column names and order mirror the computed columns of kafka_test.
CREATE TABLE print_table (
    deviceId STRING,
    zg_id    STRING,
    uid      STRING,
    jpush_id STRING
)
WITH ('connector' = 'print');
-- Streaming job: continuously copy the flattened columns from the Kafka source into the print sink.
INSERT INTO print_table
SELECT
    deviceId,
    zg_id,
    uid,
    jpush_id
FROM kafka_test;
参考
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/json/
https://blog.csdn.net/xianpanjia4616/article/details/112690791
https://blog.csdn.net/YouLoveItY/article/details/108276799
网友评论