美文网首页
spark UDF UDAF

spark UDF UDAF

作者: 400476cab371 | 来源:发表于2017-12-27 15:28 被阅读141次

    spark中也可以自定义函数,而且非常简单就是一把梭

    UDF:user Defined Function用户自定义函数,通过输入值,对值进行操作

    * UDAF:User Defined Aggregation Function 用户自定义的聚合函数,函数本身作用于数据集合

    能够在聚合操作的基础上进行自定义操作 UDF和UDAF都要进行注册,udaf唯一要注意的是UserDefinedAggregateFunction的那8个方法的重写,就在这里你可以自定义你的算法)

    代码如下:

    val conf =new SparkConf().setMaster("local").setAppName("SparkUdf")

    val sc =new SparkContext(conf)

    val sqlContext =new SQLContext(sc)

    //模拟数据

    val bigdata =Array("spark","hadoop","flink","storm","hadoop","spark","flink","storm","hadoop","hadoop")

    //创建df

    val bigDataRdd = sc.parallelize(bigdata)

    val bigRDDRow = bigDataRdd.map(Row(_))

    val structType = StructType(Array(

    new StructField("word", StringType,true)

    ))

    val bigDataDF = sqlContext.createDataFrame(bigRDDRow, structType)

    //注册临时表

    bigDataDF.registerTempTable("bigdatatable")

    /**

    * 通过sqlcontext注册UDF

    */

    sqlContext.udf.register("computeLength", (key:String) => key.length)

    //通过上面即可使用这个函数了

    //sqlContext.sql("select word,computeLength(word) as length from bigdatatable").show

    sqlContext.udf.register("wordcount",new firstUDAF)

    sqlContext.sql("select word,wordcount(word) as count ,computeLength(word) as length " +

    "from bigdatatable GROUP BY word").show

    firstUDAF类如下:

    override def inputSchema: StructType = StructType(

    Array(StructField("input",StringType,true))

    )

    /**

    * 在进行聚合操作的时候,所要处理的数据的结果的类型

    */

    override def bufferSchema: StructType = StructType(

    Array(StructField("count",IntegerType,true))

    )

    /**

    * 指定UDAF计算后返回的结果类型

    *

    */

    override def dataType: DataType = IntegerType

    /**

    * 确保一致性,一般都用true

    */

    override def deterministic: Boolean =true

    /**

    * 在Aggregate之前魅族数据的初始化结果

    */

    override def initialize(buffer: MutableAggregationBuffer): Unit = {

    buffer(0) =0

    }

    /**

    * 在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算

    * 本地的聚合操作

    */

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {

    //getAs返回位置i的值

      buffer(0) = buffer.getAs[Int](0)+1

    }

    /**

    * 最后在分布式节点进行local reduce完成后需要进行全局级别的Merge操作

    */

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {

    buffer1(0) = buffer1.getAs[Int](0) + buffer2.getAs[Int](0)

    }

    /**

    * 返回最终结果

    */

    override def evaluate(buffer: Row): Any = {

    buffer.getAs[Int](0)

    }

    相关文章

      网友评论

          本文标题:spark UDF UDAF

          本文链接:https://www.haomeiwen.com/subject/udjugxtx.html