美文网首页
SparkSQL之自定义UDF

SparkSQL之自定义UDF

作者: 阿坤的博客 | 来源:发表于2018-11-01 10:42 被阅读20次

    当SparkSQL里内置的函数无法满足我们业务需求时,我们可以通过自定义UDF来实现。

    1、自定义ConcatLongStringUDF

    /**
      * 将两个字段拼接起来(使用指定的分隔符)
      */
    class ConcatLongStringUDF extends UDF3[Long, String, String, String] {
      override def call(v1: Long, v2: String, spilt: String): String = {
        v1.toString + spilt + v2
      }
    }
    

    这里自定义UDF来使用指定的分隔符来拼接Long和String

    2、使用
    首先注册自定义UDF并指定返回类型

    spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
    

    使用自定义UDF

    spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
    

    完成代码UDFTest

    object UDFTest {
      def main(args: Array[String]): Unit = {
        //权限问题
        System.setProperty("HADOOP_USER_NAME", "hadoop")
        val spark =
          SparkSession.builder()
            .appName("UDFTest")
            .master("local[1]")
            .getOrCreate()
    
        spark.udf.register("concat_long_str", new ConcatLongStringUDF(), DataTypes.StringType)
    
        val tempSchema: StructType = StructType(Seq(
          StructField("session_id", LongType, false),
          StructField("page_id", StringType, false)
        ))
    
        val rowRDD =
          spark.sparkContext.parallelize(Array("1,page1", "2,page2"))
            .map(line => {
              val fields = line.split(",")
              Row(fields(0).toLong, fields(1))
            })
    
        val tempDF = spark.createDataFrame(rowRDD, tempSchema)
        tempDF.createOrReplaceTempView("temp_session_page")
    
        spark.sql("select * from temp_session_page").show()
        spark.sql("select concat_long_str(session_id,page_id,':') from temp_session_page").show()
      }
    }
    

    打印结果

    +----------+-------+
    |session_id|page_id|
    +----------+-------+
    |         1|  page1|
    |         2|  page2|
    +----------+-------+
    
    +-------------------------------------------+
    |UDF:concat_long_str(session_id, page_id, :)|
    +-------------------------------------------+
    |                                    1:page1|
    |                                    2:page2|
    +-------------------------------------------+
    

    可以根据自己业务的需求定制更多UDF

    相关文章

      网友评论

          本文标题:SparkSQL之自定义UDF

          本文链接:https://www.haomeiwen.com/subject/lmptxqtx.html