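Two ways to register a UDF
A function registered with spark.udf.register() is visible by name only inside sql(); the DataFrame API cannot look it up by that name.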
register(self, name, f, returnType=None)
`returnType` can be optionally specified when `f` is a Python function but not
when `f` is a user-defined function. Please see below.
spark.udf.register('sjoin2', lambda s1,s2: True if s1[:1] == s2[:1] else False, BooleanType())
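The first way can be sketched as below. This is a minimal sketch, not the only layout: the names same_first_char and register_sjoin2 are hypothetical, and spark is assumed to be an existing SparkSession. Keeping the logic in a plain Python function means it can be checked without a Spark session. As far as I know, recent PySpark versions also return a UDF object from register(), which is why the helper returns it.

```python
def same_first_char(s1, s2):
    """Return True when both strings start with the same character."""
    return s1[:1] == s2[:1]


def register_sjoin2(spark):
    """Register same_first_char for SQL under the name 'sjoin2'.

    `spark` is assumed to be an existing SparkSession (hypothetical helper).
    """
    # Imported here so the plain function above stays usable without PySpark.
    from pyspark.sql.types import BooleanType

    # f is a plain Python function, so returnType may be passed to register().
    return spark.udf.register("sjoin2", same_first_char, BooleanType())
```

After calling register_sjoin2(spark), a query such as spark.sql('select sjoin2("1a", "1b")') should be able to resolve the name sjoin2.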
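A function defined with the @udf decorator is visible outside SQL: it can be called directly from the DataFrame API, and registering it afterwards also exposes it to sql().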
@udf(returnType=StringType())
def add_col(s1: str):
    s11 = s1[:1]
    return s11+"xx"
spark.udf.register('add_col', add_col)
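The second way ties back to the returnType rule quoted earlier: once f is already a UDF, the return type lives on the UDF itself and register() is called without one. A minimal sketch under those assumptions follows. The names first_char_plus_xx and register_add_col are hypothetical, and spark is assumed to be an existing SparkSession.

```python
def first_char_plus_xx(s1):
    """Return the first character of s1 followed by 'xx'."""
    return s1[:1] + "xx"


def register_add_col(spark):
    """Wrap first_char_plus_xx as a UDF and expose it to SQL as 'add_col'.

    `spark` is assumed to be an existing SparkSession (hypothetical helper).
    """
    # Imported here so the plain function above stays usable without PySpark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    # The return type is declared when the UDF is created ...
    add_col = udf(first_char_plus_xx, StringType())
    # ... so register() is called without returnType, because f is already a UDF.
    spark.udf.register("add_col", add_col)
    # The UDF object itself is what the DataFrame API uses.
    return add_col
```

The returned object can then be used as in df1.withColumn('_3', add_col(df1._1)), while the registered name add_col is what sql() sees.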
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, BooleanType
# spark (SparkSession) and sc (SparkContext) are assumed to exist already, as in the pyspark shell
df1 = sc.parallelize([['1a',1],['2b',2],['3c',3]]).toDF()
df2 = sc.parallelize([['1a',1],['1b',2],['1c',3]]).toDF()
@udf(returnType=StringType())
def add_col(s1: str):
    s11 = s1[:1]
    return s11+"xx"
df3 = df1.withColumn('_3',add_col(df1._1))
df3.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1a|  1|1xx|
| 2b|  2|2xx|
| 3c|  3|3xx|
+---+---+---+
df1.createOrReplaceTempView('t1')
df2.createOrReplaceTempView('t2')
spark.udf.register('sjoin2', lambda s1,s2: True if s1[:1] == s2[:1] else False, BooleanType())
spark.sql('select * from t1, t2 where sjoin2(t1._1,t2._1)').show()
+---+---+---+---+
| _1| _2| _1| _2|
+---+---+---+---+
| 1a|  1| 1a|  1|
| 1a|  1| 1b|  2|
| 1a|  1| 1c|  3|
+---+---+---+---+
@udf(returnType=BooleanType())
def sjoin1(s1: str, s2: str):
    if s1[:1] == s2[:1]:
        return True
    return False
df1.join(df2, sjoin1(df1._1,df2._1)).show()
+---+---+---+---+
| _1| _2| _1| _2|
+---+---+---+---+
| 1a|  1| 1a|  1|
| 1a|  1| 1b|  2|
| 1a|  1| 1c|  3|
+---+---+---+---+
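The join predicates above assume that both columns are non-null. PySpark hands a SQL NULL to a Python UDF as None, and slicing None raises a TypeError, so a null-tolerant variant may be worth having. The sketch below is one possible shape, not part of the original example: same_first_char_or_none and make_null_safe_sjoin are hypothetical names.

```python
def same_first_char_or_none(s1, s2):
    """Compare first characters; return None when either input is missing."""
    if s1 is None or s2 is None:
        # A None result becomes NULL in Spark, which a join condition
        # treats as "no match".
        return None
    return s1[:1] == s2[:1]


def make_null_safe_sjoin():
    """Wrap the predicate as a BooleanType UDF (hypothetical helper)."""
    # Imported here so the plain function above stays usable without PySpark.
    from pyspark.sql.functions import udf
    from pyspark.sql.types import BooleanType

    return udf(same_first_char_or_none, BooleanType())
```

Usage would mirror the join above, for example sjoin = make_null_safe_sjoin() followed by df1.join(df2, sjoin(df1._1, df2._1)).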