什么是UDAF?
UDAF(User Defined Aggregate Function),即用户定义的聚合函数,聚合函数和普通函数的区别是什么呢,普通函数是接受一行输入产生一个输出,聚合函数是接受一组(一般是多行)输入然后产生一个输出,即将一组的值想办法聚合一下。类似于sum操作,spark的udf使用看这里
直接看下面的demo,计算1-10的平均值,代码也比较简单
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, IntegerType, StructField, StructType}
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import java.lang
/**
* spark的UDAF使用
*/
object UDAF {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("UDAFDemo")
.master("local[1]")
.getOrCreate()
val ds: Dataset[lang.Long] = spark.range(1,10)
ds.createTempView("test")
spark.udf.register("jason",new MyUdaf)
spark.sql("select jason(id) as jason from test").show()
}
}
class MyUdaf extends UserDefinedAggregateFunction {
// 定义聚合函数的输入结构类型
override def inputSchema: StructType = StructType(Array(StructField("age",IntegerType)))
// 聚合缓冲区中值的数据类型
override def bufferSchema: StructType = StructType(Array(StructField("count",IntegerType),StructField("ages",IntegerType)))
// 聚合函数返回的数据类型
override def dataType: DataType = IntegerType
// 聚合函数是否是幂等的,即相同输入是否总是能得到相同输出
override def deterministic: Boolean = true
// 初始化缓冲区
override def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0; buffer(1) = 0
}
// 写入新数据后更新缓冲区的值
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (input.isNullAt(0)) return
buffer(0) = buffer.getInt(0) + 1
buffer(1) = buffer.getInt(1) + input.getInt(0)
}
// 合并聚合函数缓冲区
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getInt(0)+buffer2.getInt(0)
buffer1(1) = buffer1.getInt(1)+buffer2.getInt(1)
}
// 计算并返回结果
override def evaluate(buffer: Row): Any = buffer.getInt(1)/buffer.getInt(0)
}
熟悉Flink的朋友会发现,这个用法跟Flink的agg函数特别的像,用法几乎一模一样.有兴趣的可以去研究一下.
运行打印结果:
+-----+
|jason|
+-----+
| 5|
+-----+
如果有写的不对的地方,欢迎大家指正,如果有什么疑问,可以加QQ群:340297350,更多的Flink和spark的干货可以加入下面的星球
![image.png](https://img.haomeiwen.com/i13697815/b7d29652a77a84e3.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
网友评论