Structured Streaming: Converting JSON to Columns

Author: SunnyMore | Published 2018-06-27 18:10 | 122 reads
      1. When loading a JSON file, if the schema declares a non-string type for a field whose JSON value is quoted (i.e. a string), every column of the converted row comes out null: Spark treats such a record as malformed, and the default PERMISSIVE parse mode nulls out the whole row instead of failing. e.g.
        The JSON file contents:
    {"reId": "1","ratingFlowId": "1001","workFlowId":"1"}
    {"reId": "2","ratingFlowId": "1002","workFlowId":"2"}
    {"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
    {"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
    {"reId": "1","ratingFlowId": "1005","workFlowId":"5"}
    

    The code:

    import org.apache.spark.sql.{Encoders, SparkSession}
    
    /**
      * <Description> <br>
      *
      * @author Sunny<br>
      * @taskId: <br>
      * @version 1.0<br>
      * @createDate 2018/06/27 9:29 <br>
      * @see com.spark.sunny.structuredstreaming <br>
      */
    case class CdrData(reId: Long, ratingFlowId: String, workFlowId: String)
    
    object StructuredHdfsJson {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder()
          .appName("StructuredHdfsJson")
          .master("local")
          .getOrCreate()
    
        // Derive the schema from the CdrData case class
        val schema = Encoders.product[CdrData].schema
        val lines = spark.readStream
          .format("json")
          .schema(schema)
          .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")
          //.load("hdfs://iotsparkmaster:9000/json")
    
        val query = lines.writeStream
          .outputMode("update")
          .format("console")
          .start()
    
        query.awaitTermination()
      }
    
    }
    
    

    The output:

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    18/06/27 18:01:09 INFO CodeGenerator: Code generated in 42.711764 ms
    18/06/27 18:01:09 INFO CodeGenerator: Code generated in 17.850871 ms
    18/06/27 18:01:09 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6f9366bc committed.
    +----+------------+----------+
    |reId|ratingFlowId|workFlowId|
    +----+------------+----------+
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    +----+------------+----------+
    
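    These all-null rows are easy to miss in a long-running stream. If you would rather fail loudly than silently produce null rows, the JSON data source's `mode` option can be set to `FAILFAST`. A minimal sketch, reusing the reader setup from the code above (the option choice is a suggestion, not from the original article):

    ```scala
    // Same reader as above, but raise an error on malformed records
    // instead of silently emitting all-null rows ("mode" defaults to
    // PERMISSIVE for the JSON data source).
    val strictLines = spark.readStream
      .format("json")
      .schema(schema)
      .option("mode", "FAILFAST")
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")
    ```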

    If the reId field of case class CdrData is changed from Long to String, the output is correct, e.g.

    case class CdrData(reId: String, ratingFlowId: String, workFlowId: String)
    

    The output:

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +----+------------+----------+
    |reId|ratingFlowId|workFlowId|
    +----+------------+----------+
    |   1|        1001|         1|
    |   2|        1002|         2|
    |   3|        1003|         3|
    |   4|        1004|         4|
    |   1|        1005|         5|
    +----+------------+----------+
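    If reId really is needed as a Long downstream, one workaround (a sketch of a common pattern, not from the original article) is to keep the all-String schema so that parsing succeeds, and cast the column afterwards:

    ```scala
    import org.apache.spark.sql.functions.col

    // lines was parsed with the all-String CdrData schema above; now cast
    // reId to Long. A value like "1" casts cleanly; a non-numeric value
    // would become null in just that column, not null out the whole row.
    val casted = lines.withColumn("reId", col("reId").cast("long"))
    ```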
    
      2. If the keys in the JSON file are not quoted, the JSON likewise cannot be converted to columns. e.g.
        The JSON file contents:
    {reId: "1",ratingFlowId: "1001",workFlowId:"1"}
    {reId: "2",ratingFlowId: "1002",workFlowId:"2"}
    {reId: "3",ratingFlowId: "1003",workFlowId:"3"}
    {reId: "4",ratingFlowId: "1004",workFlowId:"4"}
    {reId: "1",ratingFlowId: "1005",workFlowId:"5"}
    

    The output:

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +----+------------+----------+
    |reId|ratingFlowId|workFlowId|
    +----+------------+----------+
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    |null|        null|      null|
    +----+------------+----------+
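    Spark's JSON data source does have an option for this case: `allowUnquotedFieldNames` (false by default) tells the parser to accept unquoted keys. A minimal sketch, reusing the reader setup from the code above:

    ```scala
    // Accept unquoted field names such as {reId: "1", ...}
    val unquoted = spark.readStream
      .format("json")
      .option("allowUnquotedFieldNames", "true")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")
    ```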
    
      3. The file's extension does not have to be .json, and the order of the fields within each JSON record does not matter. e.g.
    {"reId": "1","workFlowId":"1","ratingFlowId": ""}
    {"reId": "2","workFlowId":"2","ratingFlowId": "1002"}
    {"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
    {"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
    {"reId": "1","ratingFlowId": "1005","workFlowId":"5"}
    

    The output:

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    18/06/27 18:09:50 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@2c833e4b committed.
    +----+------------+----------+
    |reId|ratingFlowId|workFlowId|
    +----+------------+----------+
    |   1|            |         1|
    |   2|        1002|         2|
    |   3|        1003|         3|
    |   4|        1004|         4|
    |   1|        1005|         5|
    +----+------------+----------+
    
    


    Source: https://www.haomeiwen.com/subject/qnobyftx.html