1、sparkSQL中的UDF
- UDF(User Defined Function):通常sparkSQL的内置函数满足大部分需求,面对特定需求,往往需要用户自定义函数。
- UDF的开发
package com.bw.spark.udf
import org.apache.commons.lang3.StringUtils
object UDF {
# 获取主要字段拼接后的hashCode
def getMainColsHash(a: String, b: String): String = {
val cols = checkBlank(a) + checkBlank(b)
cols.hashCode.toString
}
private def checkBlank(str: String): String = {
if (StringUtils.isBlank(str)){
""
}else{
str
}
}
}
- UDF的使用
import com.bw.spark.udf.UDF
spark.udf.register("getMainColsHash",UDF.getMainColsHash _)
spark.sql(
"""
|select
|a,
|b,
|getMainColsHash(a, b) as zyzdhx
|from test
""".stripMargin).show(3)
2、sparkSQL中的UDF参数的动态拓展
- UDF参数的拓展:如上udf,如果开发好了,但有需要求3个字段拼接后的hashCode,就需要代码改造,如果开发成动态拓展参数,就可以避免因参数数量变化带来的代码改动。
- UDF参数拓展的开发:
package com.bw.spark.udf
import org.apache.commons.lang3.StringUtils
object UDF {
# 获取更多字段拼接后的hashCode
def getMoreColsHash(a: String, b: String, args: String*): String = {
val cols = checkBlank(a) + checkBlank(b)
val colAdd = if (args.isEmpty){
""
}else{
val argArray = new Array[String](args.length)
for (i <- 0 until args.length){
argArray(i) = checkBlank(args(i))
}
argArray.mkString("")
}
val colStr = cols + colAdd
colStr.hashCode.toString
}
private def checkBlank(str: String): String = {
if (StringUtils.isBlank(str)){
""
}else{
str
}
}
}
- UDF的使用
import com.bw.spark.udf.UDF
spark.udf.register("getMoreColsHash", UDF.getMoreColsHash _)
spark.sql(
"""
|select
|a,
|b,
|c,
|d,
|getMoreColsHash(a, b, array(c, d)) as gdzdhx
|from test
""".stripMargin).show(3)
注意:拓展字段以Array的形式放在参数最后的位置,Array里元素的类型与参数定义类型保持一致。
网友评论