- 下面介绍pandas_udf的三种函数使用
# 引入spark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('learnspark').getOrCreate()
sc = spark.sparkContext
Scalar UDFs函数
通常做一些简单计算使用该函数,输出长度与输入长度不变
# pandans udf 使用说明
# Scalar UDFs
# 使用PandasUDFType.SCALAR,当使用select和withColumn是使用该函数
# 输入pandas.Series输出pandas.Series,输入和输出长度相同
# 在函数内部,spark通过将列拆分来批量执行pandas udf,将每个批的函数作为数据的子集调用,然后合并
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import LongType,FloatType
from pyspark.sql.types import StructField, StructType
# 形式1
def multiply_func(a, b):
return a * b
multiply = pandas_udf(multiply_func, returnType=FloatType(),functionType=PandasUDFType.SCALAR) # 这里使用PandasUDFType.SCALAR作为functiontype
# 形式2
@pandas_udf(returnType=FloatType(), functionType=PandasUDFType.SCALAR)
def multiply2(a,b):
return a+b
# 现造一个数据集
df = spark.createDataFrame(
[(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
('id',"X", "Y","Z"))
df.show()
# 运行函数
df = (df
.withColumn('M', multiply(df.X,df.Y))
.withColumn('N', multiply2(df.X,df.Y))
)
df.show()
[out]:
+---+----+----+---+
| id| X| Y| Z|
+---+----+----+---+
| 1| 1.0| 1.0| 1|
| 1| 1.2| 2.0| 1|
| 1| 5.6| 3.0| 2|
| 1|24.4| 5.0| 2|
| 1|22.3|10.0| 2|
+---+----+----+---+
+---+----+----+---+-----+----+
| id| X| Y| Z| M| N|
+---+----+----+---+-----+----+
| 1| 1.0| 1.0| 1| 1.0| 2.0|
| 1| 1.2| 2.0| 1| 2.4| 3.2|
| 1| 5.6| 3.0| 2| 16.8| 8.6|
| 1|24.4| 5.0| 2|122.0|29.4|
| 1|22.3|10.0| 2|223.0|32.3|
+---+----+----+---+-----+----+
Grouped map UDFs函数
当需要对每行做统计计算但不改变其长度,比如减均值等操作
# Grouped map UDFs
# 这个函数适用于groupBy().apply() 实现“拆分应用组合”模式,它分为三个步骤
# 1.使用DataFrame.groupBy分割数据
# 2.对每个group使用函数,输入输出都是pandas.DataFrame,输入数据包含每个groupy的所有行和列
# 3.合并结果称为一个新的DataFrame
# 使用groupBy().apply()你必须做以下定义
# 1.定义每个group的计算的python函数
# 2.一个StructType对象或者string定义输出DataFrame的schema
### 在应用函数之前,组的所有数据都将加载到内存中。这可能导致内存不足异常,特别是不同group是倾斜的
# 这里将每行减去goup的均值
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# 形式1
schema = StructType([
StructField("id", LongType(),True),
StructField("v", FloatType(),True)
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
# 形式2
def _subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=v - v.mean())
subtract_mean2 = pandas_udf(_subtract_mean, returnType="id long, v double", functionType=PandasUDFType.GROUPED_MAP) # 这里使用PandasUDFType.GROUPED_MAP作为functiontype
df.groupby("id").apply(subtract_mean2).show()
Grouped aggregate UDFs函数
一些统计计算,返回与输入长度不同
# Grouped aggregate UDFs
# 使用groupBy().agg() 时使用,作为统计函数使用
from sklearn.metrics import silhouette_score # 计算轮廓系数
import pandas as pd
df = spark.createDataFrame(
[(1,1.0, 1.0, 1), (1,1.2, 2.0, 1), (1,5.6, 3.0, 2), (1,24.4, 5.0, 2), (1,22.3, 10.0, 2)],
('id',"X", "Y","Z"))
@pandas_udf(returnType='float',functionType=PandasUDFType.GROUPED_AGG)
def sc(call, Y, X):
merge_df = pd.concat([X, Y], axis=1)
try:
return silhouette_score(merge_df.values, call.values)
except:
return None
df.groupBy('id').agg(sc(df.Z,df.X,df.Y)).show()
[out]:
+---+-----------+
| id|sc(Z, X, Y)|
+---+-----------+
| 1| 0.4213251|
+---+-----------+
网友评论