Flink SQL + Kafka: parsing complex JSON

Author: 邵红晓 | Published 2021-07-16 09:19

Purpose:

Extract the fields we care about from a complex JSON payload. By declaring the nested structure with ROW types, the complex JSON becomes an addressable schema, and the individual fields can then be exposed as computed columns of the form field AS xx.xx.xx.

Flink version: 1.13.0
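
As a minimal sketch of the pattern (the table name, connector choice and field names here are made up purely for illustration; the real schema for this log format follows below):

CREATE TABLE demo (
    -- maps a nested object such as {"a": {"b": {"c": "..."}}}
    a ROW<b ROW<c STRING>>,
    -- computed column that lifts the nested field to a top-level column
    c_value AS a.b.c
) WITH (
    -- any connector works here; datagen just keeps the sketch self-contained
    'connector' = 'datagen'
);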

The original JSON:

{
    "sln":"itn",
    "pl":"js",
    "sdk":"zg-js",
    "sdkv":"2.0",
    "owner":"zg",
    "ut":"2021-7-15 16:07:01",
    "tz":28800000,
    "debug":0,
    "ak":"xxxx",
    "usr":{
        "did":"171ebd0070d8a-02a71a3974197b-11096a4c-46500-171ebd0070e57",
        "$zg_did":12590353
    },
    "data":[
        {
            "dt":"evt",
            "pr":{
                "$ct":1626336421471,
                "$tz":28800000,
                "$sid":1626336206600,
                "$url":"https://a.xxx.com.cn/2656/m46991/#zoneclick=102782",
                "$ref":"https://a.xxx.com.cn/2656/",
                "$referrer_domain":"",
                "$eid":"viehcle_detail_page_view",
                "_platform_type":"M",
                "_project_name":"XCAR",
                "_login_status":0,
                "$zg_did":12590353,
                "$zg_sid":1626336206600,
                "$uuid":"828c1425345a42379ff4de1e6d0f6567",
                "$zg_zgid":12612355,
                "$zg_eid":115,
                "$zg_epid#_level":1498,
                "$zg_eptp#_level":"string",
                "$zg_epid#_page_name":2255,
                "$zg_eptp#_page_name":"string",
                "$zg_epid#_model_id":2613,
                "$zg_eptp#_model_id":"string",
                "$zg_epid#_sub_level":1502,
                "$zg_eptp#_sub_level":"string",
                "$zg_epid#_series_id":400,
                "$zg_eptp#_series_id":"string",
                "$zg_epid#_first_category":398,
                "$zg_eptp#_first_category":"string",
                "$zg_epid#_series_or_model":1503,
                "$zg_eptp#_series_or_model":"string",
                "$zg_epid#_brand":1501,
                "$zg_eptp#_brand":"string",
                "$zg_epid#_model":2614,
                "$zg_eptp#_model":"string",
                "$zg_epid#_platform_type":1490,
                "$zg_eptp#_platform_type":"string",
                "$zg_epid#_sub_brand":1497,
                "$zg_eptp#_sub_brand":"string",
                "$zg_epid#_login_status":1488,
                "$zg_eptp#_login_status":"number",
                "$zg_epid#_sub_brand_id":2611,
                "$zg_eptp#_sub_brand_id":"string",
                "$zg_epid#_energy":2612,
                "$zg_eptp#_energy":"string",
                "$zg_epid#_year":1499,
                "$zg_eptp#_year":"string",
                "$zg_epid#_structure":1500,
                "$zg_eptp#_structure":"string",
                "$zg_epid#_price_range":1506,
                "$zg_eptp#_price_range":"string",
                "$zg_epid#_brand_id":1507,
                "$zg_eptp#_brand_id":"string",
                "$zg_epid#_status":2677,
                "$zg_eptp#_status":"string",
                "$zg_epid#_project_name":1489,
                "$zg_eptp#_project_name":"string",
                "$zg_epid#_series":1505,
                "$zg_eptp#_series":"string"
            }
        },
        {
            "dt":"zgid",
            "pr":{
                "$ct":1626336421471,
                "$zg_did":12590353,
                "$tz":28800000,
                "$zg_zgid":12612355
            }
        },
        {
            "dt":"pl",
            "pr":{
                "$dv":"x'x.io",
                "$zg_did":12590353
            }
        }
    ],
    "ip":"39.144.27.100",
    "st":1626336451683,
    "ua":"Mozilla/5.0 (Linux; Android 10; MED-AL00; HMSCore 6.0.0.305) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.93 HuaweiBrowser/11.1.2.301 Mobile Safari/537.36",
    "plat":3,
    "app_id":6,
    "@version":"1",
    "@timestamp":"2021-07-15T08:07:12.661Z"
}
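
The Flink SQL DDL below declares only the nested fields we need, using ROW and ARRAY types, and exposes them as computed columns:
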
CREATE TABLE kafka_test (
    -- declare only the parts of the nested JSON that we need
    usr ROW<did STRING, `$zg_did` STRING>,
    data ARRAY<ROW<dt STRING, pr ROW<`$zg_zgid` STRING, `$cuid` STRING, `_jpush_id` STRING>>>,
    -- computed columns lift the nested fields to top-level columns;
    -- Flink SQL array subscripts start at 1, so data[1] is the first element
    deviceId AS usr.`$zg_did`,
    zg_id AS data[1].pr.`$zg_zgid`,
    uid AS data[1].pr.`$cuid`,
    jpush_id AS data[1].pr.`_jpush_id`
)
WITH (
    'connector' = 'kafka',
    'topic' = 'zg_log',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'test_group_zg_log',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset',
    'json.fail-on-missing-field' = 'false',
    'json.ignore-parse-errors' = 'false'
);
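
Two of the format options are worth calling out: 'json.fail-on-missing-field' = 'false' means that fields declared in the schema but absent from a record (for example $cuid and _jpush_id in the sample payload above) are set to NULL instead of failing the job, while 'json.ignore-parse-errors' = 'false' means a record that is not valid JSON will still cause the job to fail rather than being silently skipped.
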
CREATE TABLE print_table (
    deviceId STRING,
    zg_id STRING,
    uid STRING,
    jpush_id STRING
) WITH (
    'connector' = 'print'
);
INSERT INTO print_table SELECT deviceId, zg_id, uid, jpush_id FROM kafka_test;
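
With the sample record above, the print sink would emit something along these lines (the exact subtask prefix and formatting vary by setup):

+I[12590353, 12612355, null, null]

uid and jpush_id come out as NULL because $cuid and _jpush_id do not appear in that record.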

References
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/formats/json/
https://blog.csdn.net/xianpanjia4616/article/details/112690791
https://blog.csdn.net/YouLoveItY/article/details/108276799
