- When loading a JSON file, if a field in the supplied schema has a non-string type while the corresponding JSON value is a quoted string, every column of the parsed row becomes null (the reason is explained after the output below), e.g.
The JSON file contains:
{"reId": "1","ratingFlowId": "1001","workFlowId":"1"}
{"reId": "2","ratingFlowId": "1002","workFlowId":"2"}
{"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
{"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
{"reId": "1","ratingFlowId": "1005","workFlowId":"5"}
The code is as follows:
import org.apache.spark.sql.{Encoders, SparkSession}

/**
 * Reads streaming JSON files from a directory and prints each micro-batch to the console.
 *
 * @author Sunny
 * @version 1.0
 * @createDate 2018/06/27 9:29
 * @see com.spark.sunny.structuredstreaming
 */
case class CdrData(reId: Long, ratingFlowId: String, workFlowId: String)

object StructuredHdfsJson {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("StructuredHdfsJson")
      .master("local")
      .getOrCreate()

    // Derive the schema from the case class: reId is LongType here.
    val schema = Encoders.product[CdrData].schema

    val lines = spark.readStream
      .format("json")
      .schema(schema)
      .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")
      //.load("hdfs://iotsparkmaster:9000/json")

    val query = lines.writeStream
      .outputMode("update")
      .format("console")
      .start()

    query.awaitTermination()
  }
}
The result is displayed as follows:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|null| null| null|
|null| null| null|
|null| null| null|
|null| null| null|
|null| null| null|
+----+------------+----------+
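Why all the nulls: values such as "1" are JSON strings, while the schema derived from CdrData declares reId as LongType, so Spark's JSON parser treats each record as malformed; in the default PERMISSIVE parse mode, a malformed record is emitted as a row of nulls. To make the query fail loudly instead of silently producing nulls, the JSON source's mode option can be set to FAILFAST; a minimal sketch, reusing the schema and directory from the code above:
val strictLines = spark.readStream
  .format("json")
  .schema(schema)
  .option("mode", "FAILFAST") // raise an error on malformed records instead of emitting null rows
  .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")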
If the type of reId in case class CdrData is changed from Long to String, the rows display correctly, e.g.
case class CdrData(reId: String, ratingFlowId: String, workFlowId: String)
Printed result:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
| 1| 1001| 1|
| 2| 1002| 2|
| 3| 1003| 3|
| 4| 1004| 4|
| 1| 1005| 5|
+----+------------+----------+
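With the all-String schema, numeric semantics can still be recovered after parsing by casting the column explicitly; a minimal sketch (typedLines is an illustrative name, and lines is the streaming DataFrame from the code above, read with the all-String schema):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.LongType

// Parse every field as a string first, then cast reId to Long once parsing has succeeded.
val typedLines = lines.withColumn("reId", col("reId").cast(LongType))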
- If the keys in the JSON file are not quoted, the JSON also cannot be converted into columns correctly (a workaround is sketched after the output below), e.g.
The JSON file contains:
{reId: "1",ratingFlowId: "1001",workFlowId:"1"}
{reId: "2",ratingFlowId: "1002",workFlowId:"2"}
{reId: "3",ratingFlowId: "1003",workFlowId:"3"}
{reId: "4",ratingFlowId: "1004",workFlowId:"4"}
{reId: "1",ratingFlowId: "1005",workFlowId:"5"}
The printed result is as follows:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
|null| null| null|
|null| null| null|
|null| null| null|
|null| null| null|
|null| null| null|
+----+------------+----------+
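The JSON source has a parser option for exactly this case: allowUnquotedFieldNames (false by default) lets it accept keys without double quotes. A minimal sketch, reusing the schema and directory from the code above:
val unquotedLines = spark.readStream
  .format("json")
  .schema(schema)
  .option("allowUnquotedFieldNames", "true") // accept field names that are not double-quoted
  .load("C:\\Users\\yaj\\Desktop\\dashboard\\tmp")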
- The JSON file's extension does not have to be .json, and changing the position of properties within a record has no effect (a quick batch-read check is sketched after the output below), e.g.
{"reId": "1","workFlowId":"1","ratingFlowId": ""}
{"reId": "2","workFlowId":"2","ratingFlowId": "1002"}
{"reId": "3","ratingFlowId": "1003","workFlowId":"3"}
{"reId": "4","ratingFlowId": "1004","workFlowId":"4"}
{"reId": "1","ratingFlowId": "1005","workFlowId":"5"}
Printed result:
-------------------------------------------
Batch: 0
-------------------------------------------
+----+------------+----------+
|reId|ratingFlowId|workFlowId|
+----+------------+----------+
| 1| | 1|
| 2| 1002| 2|
| 3| 1003| 3|
| 4| 1004| 4|
| 1| 1005| 5|
+----+------------+----------+
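For a quick one-off verification of such files, the same schema also works with a plain batch read, which likewise picks up every file in the directory regardless of its suffix; a minimal sketch:
val batch = spark.read
  .schema(Encoders.product[CdrData].schema)
  .json("C:\\Users\\yaj\\Desktop\\dashboard\\tmp") // reads all files in the directory, whatever their extension
batch.show()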