摘要:Spark SQL
问题复现
需要对Spark SQL的DataFrame的一列做groupBy聚合其他所有特征,处理方式是将其他所有特征使用function.array
配合function.collect_list
聚合为数组,代码如下
val joinData = data.join(announCountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
.agg(collect_list(array("publish_date", "target_amt", "case_position", "case_reason", "plaintiff")).as("items"))
.map {
row => {
// 初始化
var involvedCount: Int = 0
var involvedCountLast360: Int = 0
var involvedDefendantCount: Int = 0
var involvedDefendantCountLast360: Int = 0
var lzjfCount: Int = 0
var lzjfCountLast360: Int = 0
var msqqCount: Int = 0
var msqqCountLast360: Int = 0
// 解析
val entName: String = row.getAs[String]("ent_name")
val items: Seq[Seq[String]] = row.getAs[Seq[Seq[String]]]("items")
for (item: Seq[String] <- items) {
if (item.head != null) {
val createCaseDate: String = item.head.split(" ")(0)
val casePosition: String = item(1)
val caseReason: String = item(2)
val plaintiff: String = item(3)
// TODO 业务统计逻辑
}
}
// 统计结果输出
(entName, involvedCount, involvedCountInc, involvedDefendantCount, involvedDefendantCountInc, lzjfCount, lzjfCountInc,
msqqCount, msqqCountInc)
}
}.toDF("ent_name", "involved_count", "involved_count_inc", "involved_defendant_count", "involved_defendant_count_inc",
"lzjf_count", "lzjf_count_inc", "msqq_count", "msqq_count_inc")
执行会报错
org.apache.spark.sql.AnalysisException: ...
due to data type mismatch: input to function array should all be the same type,
but it's [timestamp, double, string, string, string];;
报错说的很清楚,array函数内的列数据类型不一致,看下原始数据的数据类型
scala> announAmountData.printSchema
|-- ent_name: string (nullable = true)
|-- publish_date: timestamp (nullable = true)
|-- target_amt: double (nullable = true)
|-- case_position: string (nullable = true)
|-- case_reason: string (nullable = true)
|-- plaintiff: string (nullable = true)
里面包含string,double,timestamp三种类型,因此报错可以理解了,但是我发现这个问题的出现不是绝对的,因为类似这样的代码写了好多回,也有各种类型的数据类型,没有出过错(我怀疑array会自定将非string列改为string),理论上应该一起报错才对,下面开始测试一下
function.array测试
下面分别测试一下string,double,timestamp在使用array的各种场景下哪些会报错类型不一致
scala> val a = Seq(("a", "2021-01-1", 3.3, "1"),("b", "2022-01-01", 4.4, "2")).toDF("a", "b", "c", "d")
a: org.apache.spark.sql.DataFrame = [a: string, b: string ... 2 more fields]
scala> val b = a.select($"a", $"b".cast("timestamp"), $"c", $"d")
b: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
scala> b.printSchema
root
|-- a: string (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: double (nullable = false)
|-- d: string (nullable = true)
(1)array(timestamp, string)和array(string, timestamp)
scala> b.withColumn("e", array("b", "d"))
res51: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
scala> b.withColumn("e", array("d", "b"))
res52: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 2 more fields]
scala> b.withColumn("e", array("d", "b")).show(false)
+---+-------------------+---+---+------------------------+
|a |b |c |d |e |
+---+-------------------+---+---+------------------------+
|a |2021-01-01 00:00:00|3.3|1 |[1, 2021-01-01 00:00:00]|
|b |2022-01-01 00:00:00|4.4|2 |[2, 2022-01-01 00:00:00]|
+---+-------------------+---+---+------------------------+
scala> b.withColumn("e", array("d", "b")).printSchema
root
|-- a: string (nullable = true)
|-- b: timestamp (nullable = true)
|-- c: double (nullable = false)
|-- d: string (nullable = true)
|-- e: array (nullable = false)
| |-- element: string (containsNull = true)
这两个都是可以的,可见不需要类型一致,最终的array里面都是string,Spark SQL会自动将所有非string列转化为string
(2)array(double, string)和array(string, double)
scala> b.withColumn("e", array("c", "d"))
res58: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
scala> b.withColumn("e", array("d", "c"))
res59: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
这种也是可以的不报错,double和string可以组合array
(3)array(double, timestamp)和array(timestamp, double)
scala> b.withColumn("e", array("c", "b"))
org.apache.spark.sql.AnalysisException: cannot resolve 'array(`c`, `b`)' due to data type mismatch: input to function array should all be the same type, but it's [double, timestamp];;
'Project [a#271, b#279, c#273, d#274, array(c#273, b#279) AS e#478]
+- Project [a#271, cast(b#272 as timestamp) AS b#279, c#273, d#274]
+- Project [_1#266 AS a#271, _2#267 AS b#272, _3#268 AS c#273, _4#269 AS d#274]
+- LocalRelation [_1#266, _2#267, _3#268, _4#269]
直接报错,不论是array(timestamp, double)还是array(double, timestamp)都直接报错类型不一致,初步结论是array里面没有string列,因为只要将其中任一一列转化为string就可以执行
scala> b.withColumn("e", array($"c".cast("string"), $"b"))
res77: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
scala> b.withColumn("e", array($"c", $"b".cast("string")))
res78: org.apache.spark.sql.DataFrame = [a: string, b: timestamp ... 3 more fields]
(4)timestamp,double,string组合出现的情况
这种也是有时报错有时不报错,看顺序,直接Kian测试结果
组合 | 报错 |
---|---|
array(double,timestamp,string) | × |
array(timestamp,double,string) | × |
array(string,timestamp,double) | √ |
array(string,double,timestamp) | √ |
array(double,string,timestamp) | √ |
array(timestamp,string,double) | √ |
初步猜测array的书写顺序需要满足:在所有的非string类前面,一定要有至少一个string列
解决方案
解决方案就是手动将所有的非string列先转化为string即可,也就不需要关注书写顺序的问题,改写成如下代码即可
val joinData = data.join(announAmountData, Seq("ent_name"), "left_outer").groupBy($"ent_name")
.agg(collect_list(array($"publish_date".cast("string"), $"target_amt".cast("string"), $"case_position", $"case_reason", $"plaintiff")).as("items"))
.map {
row => {
()
}
}.toDF()
网友评论