spark udfjoin

Author: hehehehe | Published: 2021-12-27 16:16

Two ways to register a UDF

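Registering a plain Python function (here a lambda) by name: the function can then be called by that name only from SQL, e.g. inside spark.sql(). It is not available as a Python function in the DataFrame API.
register(self, name, f, returnType=None)
    `returnType` can be optionally specified when `f` is a Python function but not
    when `f` is a user-defined function.
For a plain Python function the return type defaults to StringType, so a predicate like the one below should pass BooleanType() explicitly.
from pyspark.sql.types import BooleanType
spark.udf.register('sjoin2', lambda s1, s2: s1[:1] == s2[:1], BooleanType())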


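Defining the function with the @udf decorator: the decorated object can be used directly from the DataFrame API, and after spark.udf.register it can also be called by name in SQL. No returnType is passed to register here, because the UDF already carries its own.
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def add_col(s1: str):
    # first character of the input followed by the suffix "xx"
    return s1[:1] + "xx"

spark.udf.register('add_col', add_col)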

Full example, run in the pyspark shell (where sc and spark are already defined):
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, BooleanType

df1 = sc.parallelize([['1a',1],['2b',2],['3c',3]]).toDF()
df2 = sc.parallelize([['1a',1],['1b',2],['1c',3]]).toDF()
@udf(returnType=StringType())
def add_col(s1: str):
    s11 = s1[:1]
    return s11+"xx"
df3 = df1.withColumn('_3',add_col(df1._1))
df3.show()
+---+---+---+
| _1| _2| _3|
+---+---+---+
| 1a|  1|1xx|
| 2b|  2|2xx|
| 3c|  3|3xx|
+---+---+---+
df1.createOrReplaceTempView('t1')
df2.createOrReplaceTempView('t2')

spark.udf.register('sjoin2', lambda s1, s2: s1[:1] == s2[:1], BooleanType())

spark.sql('select * from t1, t2 where sjoin2(t1._1,t2._1)').show()
+---+---+---+---+
| _1| _2| _1| _2|
+---+---+---+---+
| 1a|  1| 1a|  1|
| 1a|  1| 1b|  2|
| 1a|  1| 1c|  3|
+---+---+---+---+
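Because the join condition is an arbitrary Python function, Spark cannot extract an equality key from it to hash or sort on, so every pair of rows has to be tested against the predicate. The plain-Python model below (not Spark code; the function names are hypothetical) mimics that nested-loop evaluation on the same t1 and t2 rows: with 3 × 3 rows the predicate runs 9 times to produce the 3 matches shown above.

```python
from itertools import product

# The article's t1 and t2 rows as (_1, _2) tuples.
t1 = [("1a", 1), ("2b", 2), ("3c", 3)]
t2 = [("1a", 1), ("1b", 2), ("1c", 3)]

def sjoin(s1, s2):
    """Same predicate as sjoin1/sjoin2: the first characters are equal."""
    return s1[:1] == s2[:1]

def nested_loop_join(left, right, pred):
    """Test pred on every (left, right) pair; return the matches and the call count."""
    out, calls = [], 0
    for l, r in product(left, right):
        calls += 1
        if pred(l[0], r[0]):
            out.append(l + r)  # concatenate the two rows, like t1.* and t2.*
    return out, calls

rows, calls = nested_loop_join(t1, t2, sjoin)
```

The work grows with len(left) * len(right), which is the main cost of joining on a UDF.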
@udf(returnType=BooleanType())
def sjoin1(s1: str, s2: str):
    # True when both strings start with the same character
    return s1[:1] == s2[:1]

df1.join(df2, sjoin1(df1._1,df2._1)).show()
+---+---+---+---+
| _1| _2| _1| _2|
+---+---+---+---+
| 1a|  1| 1a|  1|
| 1a|  1| 1b|  2|
| 1a|  1| 1c|  3|
+---+---+---+---+

Original link: https://www.haomeiwen.com/subject/kzpyqrtx.html