美文网首页
sparkSQL的UDF及其参数的动态拓展

sparkSQL的UDF及其参数的动态拓展

作者: 不像狮子座 | 来源:发表于2019-12-17 14:37 被阅读0次

    1、sparkSQL中的UDF

    • UDF(User Defined Function):通常sparkSQL的内置函数满足大部分需求,面对特定需求,往往需要用户自定义函数。
    • UDF的开发
    package com.bw.spark.udf
    
    import org.apache.commons.lang3.StringUtils
    
    object UDF {
        
        # 获取主要字段拼接后的hashCode
        def getMainColsHash(a: String, b: String): String = {
          
            val cols = checkBlank(a) + checkBlank(b)
          
            cols.hashCode.toString
        }
        
        private def checkBlank(str: String): String = {
          if (StringUtils.isBlank(str)){
              ""
          }else{
              str
          }
        }
    }
    
    • UDF的使用
    import com.bw.spark.udf.UDF
    spark.udf.register("getMainColsHash",UDF.getMainColsHash _)
    spark.sql(
            """
              |select
              |a,
              |b,
              |getMainColsHash(a, b) as zyzdhx
              |from test
            """.stripMargin).show(3)
    
    

    2、sparkSQL中的UDF参数的动态拓展

    • UDF参数的拓展:如上udf,如果开发好了,但有需要求3个字段拼接后的hashCode,就需要代码改造,如果开发成动态拓展参数,就可以避免因参数数量变化带来的代码改动。
    • UDF参数拓展的开发:
    package com.bw.spark.udf
    
    import org.apache.commons.lang3.StringUtils
    
    object UDF {
        # 获取更多字段拼接后的hashCode
        def getMoreColsHash(a: String, b: String, args: String*): String = {
      
            val cols = checkBlank(a) + checkBlank(b)
            val colAdd = if (args.isEmpty){
                ""
            }else{
                val argArray = new Array[String](args.length)
                for (i <- 0 until args.length){
                argArray(i) = checkBlank(args(i))
                }
                argArray.mkString("")
            }
            val colStr = cols + colAdd
            
            colStr.hashCode.toString
        }
        
        private def checkBlank(str: String): String = {
          if (StringUtils.isBlank(str)){
              ""
          }else{
              str
          }
        }
    }
    
    • UDF的使用
    import com.bw.spark.udf.UDF
    spark.udf.register("getMoreColsHash", UDF.getMoreColsHash _)
    spark.sql(
                """
                  |select
                  |a,
                  |b,
                  |c,
                  |d,
                  |getMoreColsHash(a, b, array(c, d)) as gdzdhx
                  |from test
                """.stripMargin).show(3)
    

    注意:拓展字段以Array的形式放在参数最后的位置,Array里元素的类型与参数定义类型保持一致。

    相关文章

      网友评论

          本文标题:sparkSQL的UDF及其参数的动态拓展

          本文链接:https://www.haomeiwen.com/subject/qovsnctx.html