Spark UDF使用详解及代码示例

作者: 董可伦 | 来源:发表于2018-08-09 14:50 被阅读1次

    我的原创地址:https://dongkelun.com/2018/08/02/sparkUDF/

    前言

    本文介绍如何在Spark Sql和DataFrame中使用UDF,如何利用UDF给一个表或者一个DataFrame根据需求添加几列,并给出了旧版(Spark1.x)和新版(Spark2.x)完整的代码示例。

    • 关于UDF:UDF:User Defined Function,用户自定义函数。

    1、创建测试用DataFrame

    下面以Spark2.x为例给出代码,关于Spark1.x创建DataFrame可在最后的完整代码里查看。

    // 构造测试数据,有两个字段、名字和年龄
    val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    
    //创建测试df
    val userDF = spark.createDataFrame(userData).toDF("name", "age")
    userDF.show
    
    +-----+---+
    | name|age|
    +-----+---+
    |  Leo| 16|
    |Marry| 21|
    | Jack| 14|
    |  Tom| 18|
    +-----+---+
    
    // 注册一张user表
    userDF.createOrReplaceTempView("user")
    

    2、Spark Sql用法

    2.1 通过匿名函数注册UDF

    下面的UDF的功能是计算某列的长度,该列的类型为String

    2.1.1 注册

    • Spark2.x:
    spark.udf.register("strLen", (str: String) => str.length())
    
    • Spark1.x:
    sqlContext.udf.register("strLen", (str: String) => str.length())
    

    2.2.2 使用

    仅以Spark2.x为例

    spark.sql("select name,strLen(name) as name_len from user").show
    
    +-----+--------+
    | name|name_len|
    +-----+--------+
    |  Leo|       3|
    |Marry|       5|
    | Jack|       4|
    |  Tom|       3|
    +-----+--------+
    

    2.2 通过实名函数注册UDF

    实名函数的注册有点不同,要在后面加 _(注意前面有个空格)
    定义一个实名函数

    /**
     * 根据年龄大小返回是否成年 成年:true,未成年:false
    */
    def isAdult(age: Int) = {
      if (age < 18) {
        false
      } else {
        true
      }
    
    }
    

    注册(仅以Spark2.x为例)

    spark.udf.register("isAdult", isAdult _)
    

    至于使用都是一样的

    2.3 关于spark.udf和sqlContext.udf

    在Spark2.x里,两者实际最终都是调用的spark.udf
    sqlContext.udf源码

    def udf: UDFRegistration = sparkSession.udf
    

    可以看到调用的是sparkSession的udf,即spark.udf

    3、DataFrame用法

    DataFrame的udf方法虽然和Spark Sql的名字一样,但是属于不同的类,它在org.apache.spark.sql.functions里,下面是它的用法

    3.1注册

    import org.apache.spark.sql.functions._
    //注册自定义函数(通过匿名函数)
    val strLen = udf((str: String) => str.length())
    //注册自定义函数(通过实名函数)
    val udf_isAdult = udf(isAdult _)
    

    3.2 使用

    可通过withColumn和select使用,下面的代码已经实现了给user表添加两列的功能

    • 通过看源码,下面的withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
    //通过withColumn添加列
    userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
    //通过select添加列
    userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
    

    结果均为

    +-----+---+--------+-------+
    | name|age|name_len|isAdult|
    +-----+---+--------+-------+
    |  Leo| 16|       3|  false|
    |Marry| 21|       5|   true|
    | Jack| 14|       4|  false|
    |  Tom| 18|       3|   true|
    +-----+---+--------+-------+
    

    3.3 withColumn和select的区别

    可通过withColumn的源码看出withColumn的功能是实现增加一列,或者替换一个已存在的列,他会先判断DataFrame里有没有这个列名,如果有的话就会替换掉原来的列,没有的话就用调用select方法增加一列,所以如果我们的需求是增加一列的话,两者实现的功能一样,且最终都是调用select方法,但是withColumn会提前做一些判断处理,所以withColumn的性能不如select好。

    • 注:select方法和sql 里的select一样,如果新增的列名在表里已经存在,那么结果里允许出现两列列名相同但数据不一样,大家可以自己试一下。
    /**
     * Returns a new Dataset by adding a column or replacing the existing column that has
     * the same name.
     *
     * @group untypedrel
     * @since 2.0.0
    */
    def withColumn(colName: String, col: Column): DataFrame = {
      val resolver = sparkSession.sessionState.analyzer.resolver
      val output = queryExecution.analyzed.output
      val shouldReplace = output.exists(f => resolver(f.name, colName))
      if (shouldReplace) {
        val columns = output.map { field =>
          if (resolver(field.name, colName)) {
            col.as(colName)
          } else {
            Column(field)
          }
        }
        select(columns : _*)
      } else {
        select(Column("*"), col.as(colName))
      }
    }
    

    4、完整代码

    下面的代码的功能是使用UDF给user表添加两列:name_len、isAdult,每个输出结果都是一样的

    +-----+---+--------+-------+
    | name|age|name_len|isAdult|
    +-----+---+--------+-------+
    |  Leo| 16|       3|  false|
    |Marry| 21|       5|   true|
    | Jack| 14|       4|  false|
    |  Tom| 18|       3|   true|
    +-----+---+--------+-------+
    

    代码:

    package com.dkl.leanring.spark.sql
    
    import org.apache.spark.SparkContext
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    /**
     * Spark Sql 用户自定义函数示例
     */
    object UdfDemo {
    
      def main(args: Array[String]): Unit = {
        oldUdf
        newUdf
        newDfUdf
        oldDfUdf
      }
    
      /**
       * 根据年龄大小返回是否成年 成年:true,未成年:false
       */
      def isAdult(age: Int) = {
        if (age < 18) {
          false
        } else {
          true
        }
    
      }
    
      /**
       * 旧版本(Spark1.x)Spark Sql udf示例
       */
      def oldUdf() {
    
        //spark 初始化
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("oldUdf")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // 构造测试数据,有两个字段、名字和年龄
        val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
        //创建测试df
        val userDF = sc.parallelize(userData).toDF("name", "age")
        // 注册一张user表
        userDF.registerTempTable("user")
    
        // 注册自定义函数(通过匿名函数)
        sqlContext.udf.register("strLen", (str: String) => str.length())
    
        sqlContext.udf.register("isAdult", isAdult _)
        // 使用自定义函数
        sqlContext.sql("select *,strLen(name)as name_len,isAdult(age) as isAdult from user").show
        //关闭
        sc.stop()
    
      }
    
      /**
       * 新版本(Spark2.x)Spark Sql udf示例
       */
      def newUdf() {
        //spark初始化
        val spark = SparkSession.builder().appName("newUdf").master("local").getOrCreate()
    
        // 构造测试数据,有两个字段、名字和年龄
        val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    
        //创建测试df
        val userDF = spark.createDataFrame(userData).toDF("name", "age")
    
        // 注册一张user表
        userDF.createOrReplaceTempView("user")
    
        //注册自定义函数(通过匿名函数)
        spark.udf.register("strLen", (str: String) => str.length())
        //注册自定义函数(通过实名函数)
        spark.udf.register("isAdult", isAdult _)
        spark.sql("select *,strLen(name) as name_len,isAdult(age) as isAdult from user").show
    
        //关闭
        spark.stop()
    
      }
    
      /**
       * 新版本(Spark2.x)DataFrame udf示例
       */
      def newDfUdf() {
        val spark = SparkSession.builder().appName("newDfUdf").master("local").getOrCreate()
    
        // 构造测试数据,有两个字段、名字和年龄
        val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
    
        //创建测试df
        val userDF = spark.createDataFrame(userData).toDF("name", "age")
        import org.apache.spark.sql.functions._
        //注册自定义函数(通过匿名函数)
        val strLen = udf((str: String) => str.length())
        //注册自定义函数(通过实名函数)
        val udf_isAdult = udf(isAdult _)
    
        //通过withColumn添加列
        userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
        //通过select添加列
        userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
    
        //关闭
        spark.stop()
      }
      /**
       * 旧版本(Spark1.x)DataFrame udf示例
       * 注意,这里只是用的Spark1.x创建sc的和df的语法,其中注册udf在Spark1.x也是可以使用的的
       * 但是withColumn和select方法Spark2.0.0之后才有的,关于spark1.xDataFrame怎么使用注册好的UDF没有研究
       */
      def oldDfUdf() {
        //spark 初始化
        val conf = new SparkConf()
          .setMaster("local")
          .setAppName("oldDfUdf")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
    
        // 构造测试数据,有两个字段、名字和年龄
        val userData = Array(("Leo", 16), ("Marry", 21), ("Jack", 14), ("Tom", 18))
        //创建测试df
        val userDF = sc.parallelize(userData).toDF("name", "age")
        import org.apache.spark.sql.functions._
        //注册自定义函数(通过匿名函数)
        val strLen = udf((str: String) => str.length())
        //注册自定义函数(通过实名函数)
        val udf_isAdult = udf(isAdult _)
    
        //通过withColumn添加列
        userDF.withColumn("name_len", strLen(col("name"))).withColumn("isAdult", udf_isAdult(col("age"))).show
        //通过select添加列
        userDF.select(col("*"), strLen(col("name")) as "name_len", udf_isAdult(col("age")) as "isAdult").show
    
        //关闭
        sc.stop()
      }
    
    }
    
    
    image

    相关文章

      网友评论

        本文标题:Spark UDF使用详解及代码示例

        本文链接:https://www.haomeiwen.com/subject/brtcbftx.html