Structured Streaming: converting JSON into colu


Author: SunnyMore | Published: 2018-06-27 18:10
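    1. When loading JSON files with an explicit schema, a field declared with a non-string type (here reId: Long) does not match a value that is written as a quoted string in the JSON ("reId": "1"). In that case every column of the affected row comes back as null, e.g.
      The JSON file contains: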
{"reId": "1","ratingFlowId": "1001","workFlowId":"1"}
{"reId": "2","ratingFlowId": "1002","workFlowId":"2"}
{"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
{"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
{"reId": "1","ratingFlowId": "1005","workFlowId":"5"}

The code is as follows:

import org.apache.spark.sql.{Encoders, SparkSession}

/**
  * <Description> <br>
  *
  * @author Sunny<br>
  * @taskId: <br>
  * @version 1.0<br>
  * @createDate 2018/06/27 9:29 <br>
  * @see com.spark.sunny.structuredstreaming <br>
  */
case class CdrData(reId: Long, ratingFlowId: String, workFlowId: String)

object StructuredHdfsJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsJson")
      .master("local")
      .getOrCreate()

    val schema = Encoders.product[CdrData].schema
    val lines = spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")
      //.load("hdfs://iotsparkmaster:9000/json")

    val query = lines.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }

}

The output is as follows:

-------------------------------------------
Batch: 0
-------------------------------------------
18/06/27 18:01:09 INFO CodeGenerator: Code generated in 42.711764 ms
18/06/27 18:01:09 INFO CodeGenerator: Code generated in 17.850871 ms
18/06/27 18:01:09 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@6f9366bc committed.
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
+----+------------+----------+
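
One way to see why the rows come back as null is to switch the JSON reader's `mode` option from its default (PERMISSIVE) to FAILFAST, so that a record that does not match the schema raises an error instead of being silently replaced by nulls. The sketch below is only an illustration under a few assumptions: it uses a batch read over an in-memory record rather than the streaming job above, and the object name `FailFastCheck` and the helper `parseStrict` are hypothetical.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import scala.util.Try

object FailFastCheck {
  // Same fields as CdrData in the article, with reId declared as a long.
  val schema: StructType = StructType(Seq(
    StructField("reId", LongType),
    StructField("ratingFlowId", StringType),
    StructField("workFlowId", StringType)))

  // Hypothetical helper: parse one record from the article in FAILFAST mode.
  def parseStrict(spark: SparkSession): Try[Array[Row]] = {
    import spark.implicits._
    // reId is a quoted string here, not a JSON number.
    val sample = Seq("""{"reId": "1","ratingFlowId": "1001","workFlowId":"1"}""").toDS()
    val strict = spark.read
      .schema(schema)
      .option("mode", "FAILFAST") // the default mode is PERMISSIVE
      .json(sample)
    // The read is lazy, so a parse error only surfaces when an action runs.
    Try(strict.collect())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("FailFastCheck")
      .master("local")
      .getOrCreate()
    val outcome = parseStrict(spark)
    println(s"parse succeeded: ${outcome.isSuccess}")
    outcome.failed.foreach(e => println(s"parse error: ${e.getMessage}"))
    spark.stop()
  }
}
```

FAILFAST is mainly useful while debugging: in a long-running streaming query a single bad record would stop the query.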

If the type of reId in case class CdrData is changed from Long to String, the rows are displayed correctly, e.g.

case class CdrData(reId: String, ratingFlowId: String, workFlowId: String)

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|   1|        1001|         1|
|   2|        1002|         2|
|   3|        1003|         3|
|   4|        1004|         4|
|   1|        1005|         5|
+----+------------+----------+
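
If reId is still needed as a number downstream, one possible approach is to keep it as a string in the schema, as above, and cast the column after the rows have been parsed. The following is a minimal sketch, not the author's method: it uses a batch read over in-memory records for brevity, and the names `CastAfterLoad`, `rawSchema` and `withNumericReId` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object CastAfterLoad {
  // Read every field as a string, matching the quoted values in the file.
  val rawSchema: StructType = StructType(Seq(
    StructField("reId", StringType),
    StructField("ratingFlowId", StringType),
    StructField("workFlowId", StringType)))

  // Hypothetical helper: convert reId to a long once the rows are parsed.
  def withNumericReId(raw: DataFrame): DataFrame =
    raw.withColumn("reId", col("reId").cast("long"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CastAfterLoad")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Two records copied from the article's input file.
    val sample = Seq(
      """{"reId": "1","ratingFlowId": "1001","workFlowId":"1"}""",
      """{"reId": "2","ratingFlowId": "1002","workFlowId":"2"}"""
    ).toDS()

    val typed = withNumericReId(spark.read.schema(rawSchema).json(sample))
    typed.printSchema()
    typed.show()
    spark.stop()
  }
}
```

In the streaming job, the same helper could be applied to the DataFrame returned by `readStream ... load(...)`, provided that reader uses the string-typed schema.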
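    2. If the keys in the JSON file are not quoted, the JSON cannot be converted into columns either (with the reader's default options), e.g.
      The JSON file contains: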
{reId: "1",ratingFlowId: "1001",workFlowId:"1"}
{reId: "2",ratingFlowId: "1002",workFlowId:"2"}
{reId: "3",ratingFlowId: "1003",workFlowId:"3"}
{reId: "4",ratingFlowId: "1004",workFlowId:"4"}
{reId: "1",ratingFlowId: "1005",workFlowId:"5"}

The output is as follows:

-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
|null|        null|      null|
+----+------------+----------+
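
Spark's JSON reader has an `allowUnquotedFieldNames` option (off by default) that may help when the input cannot be fixed at the source. The sketch below shows it on a batch read over in-memory records. It assumes an all-string schema, and the names `UnquotedKeys` and `readUnquoted` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object UnquotedKeys {
  val schema: StructType = StructType(Seq(
    StructField("reId", StringType),
    StructField("ratingFlowId", StringType),
    StructField("workFlowId", StringType)))

  // Hypothetical helper: parse records whose keys are not wrapped in quotes.
  def readUnquoted(spark: SparkSession, json: Dataset[String]): DataFrame =
    spark.read
      .schema(schema)
      .option("allowUnquotedFieldNames", "true") // default is false
      .json(json)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("UnquotedKeys")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    // Two records copied from the article's unquoted-key file.
    val sample = Seq(
      """{reId: "1",ratingFlowId: "1001",workFlowId:"1"}""",
      """{reId: "2",ratingFlowId: "1002",workFlowId:"2"}"""
    ).toDS()

    readUnquoted(spark, sample).show()
    spark.stop()
  }
}
```

For the streaming reader, the same `.option("allowUnquotedFieldNames", "true")` call would go before `.load(...)`.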
    3. The file does not need a .json extension, and the order of the keys within a JSON record does not affect the result, e.g.
{"reId": "1","workFlowId":"1","ratingFlowId": ""}
{"reId": "2","workFlowId":"2","ratingFlowId": "1002"}
{"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
{"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
{"reId": "1","ratingFlowId": "1005","workFlowId":"5"}

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
18/06/27 18:09:50 INFO WriteToDataSourceV2Exec: Data source writer org.apache.spark.sql.execution.streaming.sources.MicroBatchWriter@2c833e4b committed.
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|   1|            |         1|
|   2|        1002|         2|
|   3|        1003|         3|
|   4|        1004|         4|
|   1|        1005|         5|
+----+------------+----------+
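
Both observations can be checked with a small batch read. This is a rough sketch only: it writes two records with different key orders to a temporary file ending in .txt and reads that file with the JSON reader. The names `ExtensionAndKeyOrder` and `readFromTxt`, and the temporary file itself, are assumptions made for illustration.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object ExtensionAndKeyOrder {
  val schema: StructType = StructType(Seq(
    StructField("reId", StringType),
    StructField("ratingFlowId", StringType),
    StructField("workFlowId", StringType)))

  // Hypothetical helper: write JSON lines to a .txt file and read them back.
  def readFromTxt(spark: SparkSession): DataFrame = {
    val file = Files.createTempFile("cdr", ".txt")
    // The first record lists workFlowId before ratingFlowId.
    val content =
      """{"reId": "1","workFlowId":"1","ratingFlowId": ""}
        |{"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
        |""".stripMargin
    Files.write(file, content.getBytes(StandardCharsets.UTF_8))
    // The column order of the result follows the schema, not the file.
    spark.read.schema(schema).json(file.toString)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExtensionAndKeyOrder")
      .master("local")
      .getOrCreate()
    readFromTxt(spark).show()
    spark.stop()
  }
}
```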
