UDF
为了满足用户的不同的分析需求,Spark允许使用者自己定义函数,供用户在Spark SQL中使用。例如数据科学家可以将一个机器学习模型封装在一个函数内,提供给数据分析师在无需知道模型内部复杂的知识下,直接使用。
例子:创建一个返回立方的函数
# in Python
from pyspark.sql.types import LongType
# create function
def cubed(num):
return num ** 3
# register UDF
spark.udf.register('cubed', cubed, LongType())
# generate a temp view
spark.range(1,9).createOrReplaceTempView('udf_test')
# query
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
| 1| 1|
| 2| 8|
| 3| 27|
| 4| 64|
| 5| 125|
| 6| 216|
| 7| 343|
| 8| 512|
+---+--------+
Pandas-UDF
为了提升UDF的计算效率,可以使用Python中的Pandas包来创建Pandas UDF(或者叫向量化(Vectorized)UDF)。
关于向量化函数,在Pandas包以及R中的dply族函数,都是很好的例子。
# In Python
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
def cubed(a: pd.Series) -> pd.Series:
return a ** 3
cubed_udf = pandas_udf(cubed, returnType = LongType())
spark.range(1,9).createOrReplaceTempView('udf_test')
spark.sql('SELECT id, cubed(id) AS id_cubed FROM udf_test').show()
+---+--------+
| id|id_cubed|
+---+--------+
| 1| 1|
| 2| 8|
| 3| 27|
| 4| 64|
| 5| 125|
| 6| 216|
| 7| 343|
| 8| 512|
+---+--------+
网友评论