美文网首页
Flink 嵌套json 解析成Table

Flink 嵌套json 解析成Table

作者: loukey_j | 来源:发表于2019-11-16 15:57 被阅读0次

    背景

    JSON作为常用的数据格式,在消息中间件中用json做为消息格式也很常见。在flink table中消息可以理解为表的一行记录。所以对于一个消息队列中的一个topic来说,可以根据json数据格式映射成一张表。flink自身是支持json格式的,但是对于复杂格式支持不是太友好。笔者也是在flink table的应用中遇到了各种json格式,发布出来给大家看看,或有其他好的解析方式可留言探讨。

    概述

    下面3图可以很直观看出我理想的解析思想,左边是源json格式右边是table 格式

    只有嵌套对象类json

    图1

    单一嵌套数组类json

    图2

    多嵌套数组类json

    图3

    针对多嵌套数组的笔者的最终实现和图3有差别。选择的方式为扁平化的实现方式,后面根据条件去选择tablename拆分成2张表

    图4

    实现 直接上源码。

    package com.paic.phflink.core.util

    import java.util

    import org.apache.flink.api.common.typeinfo.TypeInformation

    import org.apache.flink.api.java.typeutils.RowTypeInfo

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper

    import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, JsonNodeFactory, ObjectNode}

    import org.apache.flink.types.Row

    import scala.collection.JavaConverters._

    import scala.collection.mutable

    /**

      * 给定JSON串,和目标schema描述,生成对于的 Row

    */

    object JsonNodeUtil {

    def main(args: Array[String]):Unit = {

    //    objects_test()

    //    oneList_test()

        twoList_test()

    //    oneList_no_flatMap_test()

    //    twoList_no_flatmap_test()

      }

    val objects =

    """

    {"a":"1","b":{"c":"2","d":"3"},"e":{"f":"4","g":"5"}}

        """.stripMargin

    .getBytes()

    val oneList =

    """

    {"a":"1","b":[{"c":"2","d":"3"},{"c":"4","d":"5"}],"e":"6","f":{"g":"7","h":"8"}}

        """.stripMargin

    .getBytes()

    val twoList =

    """

    {"a":"1","b":[{"c":"2","d":"3"},{"c":"4","d":"5"}],"e":"6","f":[{"g":"7","h":"8"},{"g":"9","h":"10"}],"i":{"j":"11","k":"12"}}

        """.stripMargin

    .getBytes()

    /**

        * 嵌套对象打平 和嵌套list一个不打平一个打平

        */

      def twoList_no_flatmap_test() = {

    val objectMapper =new ObjectMapper()

    val json =twoList

        val map =new util.LinkedHashMap[String, String]()

    map.put("a", TableSchemaUtil.STRING)

    map.put("b", TableSchemaUtil.OBJECT_ARRAY)

    map.put("b_c", TableSchemaUtil.STRING)

    map.put("b_d", TableSchemaUtil.STRING)

    map.put("e", TableSchemaUtil.STRING)

    map.put("f", TableSchemaUtil.OBJECT_ARRAY)

    map.put("f_g", TableSchemaUtil.STRING)

    map.put("f_h", TableSchemaUtil.STRING)

    map.put("i_j", TableSchemaUtil.STRING)

    map.put("i_k", TableSchemaUtil.STRING)

    map.put("tableName", TableSchemaUtil.STRING)

    val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

    rows

    }

    /**

        * 嵌套对象打平 和嵌套list不打平

        */

      def oneList_no_flatMap_test() = {

    val objectMapper =new ObjectMapper

    val json =oneList

        val map =new util.LinkedHashMap[String, String]()

    map.put("a", TableSchemaUtil.STRING)

    map.put("b", TableSchemaUtil.OBJECT_ARRAY)

    map.put("b_c", TableSchemaUtil.STRING)

    map.put("b_d", TableSchemaUtil.STRING)

    map.put("e", TableSchemaUtil.STRING)

    map.put("f_g", TableSchemaUtil.STRING)

    map.put("f_h", TableSchemaUtil.STRING)

    val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

    rows

    }

    /**

        * 嵌套对象 和嵌套多个list  打平

        * 多个数组的一定要指定 一个 tableName 列

        * map.put("tableName", TableSchemaUtil.STRING)

        * 后续根据这个tableName 进行查询分表

        * @return

        */

      def twoList_test() = {

    val objectMapper =new ObjectMapper

    val json =twoList

        val map =new util.LinkedHashMap[String, String]()

    map.put("a", TableSchemaUtil.STRING)

    map.put("b_c", TableSchemaUtil.STRING)

    map.put("b_d", TableSchemaUtil.STRING)

    map.put("e", TableSchemaUtil.STRING)

    map.put("f_g", TableSchemaUtil.STRING)

    map.put("f_h", TableSchemaUtil.STRING)

    map.put("i_j", TableSchemaUtil.STRING)

    map.put("i_k", TableSchemaUtil.STRING)

    map.put("tableName", TableSchemaUtil.STRING)

    val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

    rows

    }

    /**

        * 嵌套对象 和嵌套list  打平

        * @return

        */

      def oneList_test() = {

    val objectMapper =new ObjectMapper

    val json =oneList

        val map =new util.LinkedHashMap[String, String]()

    map.put("a", TableSchemaUtil.STRING)

    map.put("b_c", TableSchemaUtil.STRING)

    map.put("b_d", TableSchemaUtil.STRING)

    map.put("e", TableSchemaUtil.STRING)

    map.put("f_g", TableSchemaUtil.STRING)

    map.put("f_h", TableSchemaUtil.STRING)

    val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

    rows

    }

    /**

        * 嵌套对象打平

        * @return

        */

      def objects_test() = {

    val objectMapper =new ObjectMapper

    val json =objects

        val map =new util.LinkedHashMap[String, String]()

    map.put("a", TableSchemaUtil.STRING)

    map.put("b_c", TableSchemaUtil.STRING)

    map.put("b_d", TableSchemaUtil.STRING)

    map.put("e_f", TableSchemaUtil.STRING)

    map.put("e_g", TableSchemaUtil.STRING)

    val rows:util.ArrayList[Row] =getRows(objectMapper,json,map)

    rows

    }

    def getRows(objectMapper: ObjectMapper,json:Array[Byte],map:util.LinkedHashMap[String, String]) ={

    val objectNode:ObjectNode = objectMapper.readValue(json,classOf[ObjectNode])

    //最外层的数据

        val rootColums =new ObjectNode(JsonNodeFactory.instance)

    //嵌套的数组

        val tables = mutable.LinkedHashMap[String, ArrayNode]()

    val list =new util.ArrayList[Row]()

    //解析json

        parse(objectNode,rootColums,tables,"")

    val saclaLinkMap = mutable.LinkedHashMap[String, String]()

    map.asScala.foreach{case (k:String,v:String) => saclaLinkMap += (k -> v)}

    //对json转化成 row的时候选择打平跳过

        val feidMapScala = TableSchemaUtil.toFlinkType(saclaLinkMap)

    val types: Array[TypeInformation[_]] = feidMapScala.map(_._2).toArray

    val fieldNames: Array[String] = feidMapScala.map(_._1).toArray

    val info =new RowTypeInfo(types,fieldNames)

    val tableSize = tables.size

    if(tableSize ==0 && rootColums.size() >0){

    val row = JsonToRowUtil.convertRow(rootColums,info)

    list.add(row)

    }

    //循环每个嵌套的数组 每个数组理解为一个表

        for( (table,value)<- tables) {//循环每个表

          //判断表是否需要打平

          if(map.containsKey(table) && map.get(table).startsWith(TableSchemaUtil.OBJECT_ARRAY)){

    //不需要打平就把list数据弄成一个row 数组

            val tableLine = rootColums.deepCopy()//每一行初始化的ObjectNode

            if(tableSize >1){//如果有多个table 就要加一列table 名字做区分

              tableLine.put("tableName",table)//把每行数据都加一个table名字

            }

    tableLine.put(table,value)

    val row = JsonToRowUtil.convertRow(tableLine,info)

    list.add(row)

    }else{

    val tableLines = value.elements()//表中的所有行

            while (tableLines.hasNext) {//循环每一行

              val line = tableLines.next()//获取每一行

              val child = line.fields()//每一行的所有列

              val tableLine = rootColums.deepCopy()//每一行初始化的ObjectNode

              if(tableSize >1){//如果有多个table 就要加一列table 名字做区分

                tableLine.put("tableName",table)//把每行数据都加一个table名字

              }

    while (child.hasNext) {//循环每一列

                val colum = child.next()//获取每一列

                val columName = colum.getKey//列名

                val filedFullName = table +"_" + columName//组合列名

                val columValue = colum.getValue.asText()//列对应的值

                //          println(filedFullName, columValue)

                val dataValue = colum.getValue.asText()

    tableLine.put(filedFullName,dataValue)

    }

    val row = JsonToRowUtil.convertRow(tableLine,info)

    list.add(row)

    }

    }

    }

    //    returnJson.put("root",json)

        list

    }

    def parse(objectNode: ObjectNode,

                objectSchema:ObjectNode,

                listSchema:mutable.LinkedHashMap[String, ArrayNode],

                parentName:String =""

              ):Unit ={

    val fieldNames = objectNode.fieldNames()

    //得到第一层

        while(fieldNames.hasNext){

    val field = fieldNames.next()

    var node = objectNode.get(field)

    val filedFullName =if(parentName.length >0){

    parentName+"_"+field

    }else{

    field

    }

    if(node.isObject){

    parse(objectNode.`with`(field),objectSchema,listSchema,filedFullName)

    }else if(node.isArray){

    val list:ArrayNode = node.asInstanceOf[ArrayNode]

    if(list.size() >0){

    //获取第0个解析一下

              //判断list里面是否还有嵌套,如果没有则直接去

              if(false){

    //            node = list.get(0)

    //            parse(node,objectSchema,listSchema,filedFullName)

              }else{

    listSchema += (filedFullName -> list)

    }

    }else{//不为空则填充默认值

            }

    }else{

    val dataValue = node.asText()

    //        println(filedFullName,dataValue)

            objectSchema.put(filedFullName,dataValue.toString)

    //        val dataType = schema.get(filedFullName)

          }

    }

    }

    }

    相关文章

      网友评论

          本文标题:Flink 嵌套json 解析成Table

          本文链接:https://www.haomeiwen.com/subject/hswdpctx.html