lag / lead (shifting rows within a window)
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag, lead
import pyspark.sql.functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.appName('my_first_app').getOrCreate()
df = spark.createDataFrame([(1, 2, 3), (4, 2, 6), (7, 2, 9)],
                           ["a", "b", "c"])
df.show()
# previous value of 'a' within each 'b' partition, ordered by 'a'
df.withColumn('aa', lag('a').over(
    Window.partitionBy('b').orderBy('a')
)).show()
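lead is the mirror image, pulling the next row's value instead of the previous one; a quick sketch over the same window:
df.withColumn('aa', lead('a').over(
    Window.partitionBy('b').orderBy('a')
)).show()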
Loading an xxx.scala file in the Scala REPL
// the leading colon cannot be omitted
:load xxx.scala
join
// when both sides share a column name, qualify it with the parent DataFrame
// (Scala column equality uses ===, not ==)
df1.join(df2, df1("a") === df2("a")).select(df1("f")).show(2)
// or set table aliases and qualify columns through them
df1.alias("df1").join(df2.alias("df2"), "a").select($"a", $"df1.f" + $"df2.f").show(5)
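The two join lines above are Scala; a rough PySpark equivalent of the alias pattern, assuming PySpark DataFrames df1 and df2 that share a join key "a" and each carry a column "f":
df1.alias("d1").join(df2.alias("d2"), "a") \
    .select("a", (F.col("d1.f") + F.col("d2.f")).alias("f_sum")).show(5)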
Computing each column's missing-value ratio
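The test DataFrame is never defined in these notes; a hypothetical sketch with the columns the later examples use (siteId, operTime), including one null so the ratio is non-trivial:
import datetime
test = spark.createDataFrame(
    [(101, datetime.datetime(2023, 1, 2, 9, 0)),
     (102, None),
     (103, datetime.datetime(2023, 1, 4, 15, 30))],
    ['siteId', 'operTime'])  # operTime_missing comes out as 1/3 below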
# F.count(c) counts only non-null values while F.count('*') counts all rows,
# so each c + '_missing' column is that column's null fraction
test.agg(*[(1 - (F.count(c) / F.count('*'))).alias(c + '_missing') for c in test.columns]).show()
User-defined functions with F.udf
from pyspark.sql.types import IntegerType

def get_week(time):
    # Python's weekday(): Monday=0 .. Sunday=6; guard against null timestamps
    return time.weekday() if time is not None else None

toWeekUDF = F.udf(get_week, IntegerType())  # arg 1: the Python function, arg 2: the return type
test.select(toWeekUDF(test.operTime)).show(5)
# the built-in is usually preferable, but note it numbers days 1=Sunday .. 7=Saturday
test.select(F.dayofweek('operTime')).show(5)
# passing extra arguments to a UDF
def map_dict(x, col):
    return feature_dict_sc.value[col][x]

mapDictUDF = F.udf(map_dict, IntegerType())
# F.lit wraps the plain string 'siteId' so it can be passed as a column argument
test.select(mapDictUDF(test.siteId, F.lit('siteId')).alias('ss')).show(5)
# alternatively, close over the extra argument and return a fresh UDF
# (the return type defaults to StringType when omitted)
def mapDict(col):
    return F.udf(lambda x: feature_dict_sc.value[col][x])

test.withColumn('ss', mapDict('siteId')(test.siteId)).show(5)
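Both variants assume feature_dict_sc is a Spark broadcast variable wrapping a nested dict; a minimal hypothetical setup (the mapping itself is made up):
feature_dict = {'siteId': {101: 0, 102: 1, 103: 2}}
feature_dict_sc = spark.sparkContext.broadcast(feature_dict)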
The F function library (pyspark.sql.functions)
agg: pairs with whole-column (aggregate) functions, e.g. min, max
select: pairs with per-row functions, e.g. hour, dayofweek
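For example, with the hypothetical test DataFrame above:
test.agg(F.min('siteId'), F.max('siteId')).show()                 # one row of column-wide aggregates
test.select(F.hour('operTime'), F.dayofweek('operTime')).show(5)  # one value per row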