摘要:Spark SQL
,Scala
由于Scala限制Tuple最大元素为22个导致的血案
问题复现
实际的业务场景是使用Spark SQL加工数仓的表,由于逻辑比较复杂如果直接Spark SQL自带的算子实现需要多好几次groupBy和join,因此直接使用DataFrame.map
算子,转为RDD操作再转为Dataframe一次搞定。固定格式如下
DataFrame
.map {
row => {
// TODO业务逻辑
// 列字段输出
(element1, element2, element3...)
}
}.toDF(columnName1, columnName2, columnName3...)
最后输出在一个Tuple中使用toDF即可完成要求,但是Scala的Tuple要求元素最大22个
scala> (1, 2, 3, 4, 5, 6, 7, 8, 9, 10,1,1,1,1,1,1,1,1,1,1,1,1,1,1,1)
<console>:1: error: too many elements for tuple: 25, allowed: 22
Spark SQL要加工的业务字段为25个,显然得想其他办法去实现
初步解决方案
百度使用Array的方式解决,将所有字段包装在一个Array中,在获得Dataframe之后使用select($array字段名(索引))
来完成,这个方法是正确,直接绕过了Tuple,以一个例子代码试一下
val a = Seq(("a", 1), ("b", 2)).toDF("a", "b")
.map {
row => {
val one: String = row.getAs[String]("a")
val two: Int = 1
(one, Array(one, two))
}
}.toDF("a", "b").select($"a", $"b"(0).as("two"), $"b"(1).as("three"))
以上例子假设one和tow分别是业务要求加工的两个字段,执行报错
java.lang.ClassNotFoundException: scala.Any
直接报错在map那一行,第二个bug出现了,这个问题的原因是Array中元素类型不一致,Scala直接推断为Any,而Array[Any]
不能toDF
scala> Seq(("a", Array(1, "a")), ("b", Array(2, "b"))).toDF("a", "b")
java.lang.ClassNotFoundException: scala.Any
继续修改将所有Array中元素改为String类型,在最后select的时候再将各别字段使用cast
转回来,新代码如下
val a = Seq(("a", 1), ("b", 2)).toDF("a", "b")
.map {
row => {
val one: String = row.getAs[String]("a")
val two: Int = 1
(one, Array(one, two.toString))
}
}.toDF("a", "b").select($"a", $"b"(0).as("two"), $"b"(1).as("three").cast("int"))
问题似乎解决了,但是实际业务代码继续报错,第三个bug
java.lang.ClassNotFoundException: <refinement>
深入检查时候发现Array中有Option[String]
对象,Array中存在String和Option[String]两种类型。因为某些业务指标有空值,所有使用了Some和None类型,因此改成了最终方案如下
最终解决方案
解决方案是引入多个Array,将同一数据类型的字段放在一个Array中,代码如下
val a = Seq(("a", 1), ("b", 2)).toDF("a", "b")
.map {
row => {
val one = row.getAs[String]("a")
(one, Array(1, 2), Array(Some(one), None))
}
}.toDF("a", "b", "c").select($"a", $"b"(0).as("two"), $"c"(0).as("three"))
这样直接避免了数据类型不一致的问题,并且在最后也不需要在转化类型了
网友评论