from pyspark.sql import functions as F
from pyspark.sql.functions import lower, col  # lowercase
from pyspark.sql.functions import upper, col  # uppercase
from pyspark.sql.functions import lit  # add a constant column
from pyspark.sql.functions import when  # if/else logic
from pyspark.sql.functions import split, explode, concat, concat_ws  # split a column, explode one row into many, concatenate columns (concat_ws takes a separator)
from pyspark.sql.types import StringType  # data types
from pyspark.sql.functions import UserDefinedFunction  # user-defined functions (UDFs)
from pyspark.sql.functions import desc  # descending sort
from pyspark.sql.functions import trim  # strip leading/trailing whitespace
a.createOrReplaceTempView("a")
a = spark.sql("select * from a").cache()  # build a PySpark DataFrame from SQL
a.show(10)  # preview the first 10 rows (head)
df = df.dropDuplicates()  # dedup: drop duplicate rows
df.select('A_field').distinct().count()  # or just count the distinct values of one column
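A minimal contrast of the two approaches on toy data (column names assumed):
demo = spark.createDataFrame([(1, "a"), (1, "a"), (2, "b")], ["id", "A_field"])
demo.dropDuplicates().show()               # full-row dedup -> 2 rows
demo.dropDuplicates(["A_field"]).show()    # dedup on a subset of columns
demo.select("A_field").distinct().count()  # only count distinct values -> 2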
a.count()  # number of rows
a.columns  # list the column names
a.dtypes  # column types
a.printSchema()  # schema
a.withColumnRenamed("CUST_ID", 'ShipToNumber').withColumnRenamed("SKU", 'SKUNumber')  # rename columns
a.select('col').describe().show()  # summary statistics for one column
b1 = b.drop("col")  # drop a column (.show() returns None, so don't assign its result)
b1.show()
a.filter(a.col == 504943)  # filter rows matching a condition
a.filter(col('UPDT_DT') >= '2020-01-05') \
    .filter(col('INSTIT_NM') == 'Unknown').show()  # ANDed conditions (the line break needs the backslash)
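Equivalently, combine the conditions in one filter with &; each condition needs its own parentheses because & binds tighter than the comparisons:
a.filter((col('UPDT_DT') >= '2020-01-05') & (col('INSTIT_NM') == 'Unknown')).show()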
a1 = a.filter(lower(a.current_pagename).like('products:%'))  # filter records whose page name starts with 'products:'
Method 1: convert a timestamp to a date
a = a.withColumn('UPDT_DT', F.to_date(a.UPDT_DT))
a = a.withColumn('CRT_DT', F.to_date(a.CRT_DT))
Method 2: convert a timestamp to a date
a.select('UPDT_DT').withColumn("UPDT_DT_1", col("UPDT_DT").cast("date"))
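If the source column is a string in a non-default layout, to_date also accepts an explicit pattern; the format below is a hypothetical example, not taken from this data:
a.withColumn('UPDT_DT_1', F.to_date(col('UPDT_DT'), 'yyyy-MM-dd HH:mm:ss'))  # pattern is an assumption about the data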
from pyspark.sql.functions import lower, col  # convert a column to lowercase
WEB_USER = spark.table('WEB_USER').withColumn('CONTACT_ID_1', lower(col('CONTACT_ID')))  # lowercase, reading from a table
a.withColumn("USER_NM", upper(trim(col("USER_NM")))).show()  # uppercase + trim, on a DataFrame
# trim the ends and collapse internal runs of spaces to a single space
def single_space(c):
    return F.trim(F.regexp_replace(c, " +", " "))

# remove all whitespace, including internal
def remove_all_whitespace(c):
    return F.regexp_replace(c, "\\s+", "")

spark.table('a').withColumn('a1', lower(remove_all_whitespace(single_space(col("USER_NM"))))).show()  # applied to a table
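A quick sanity check of the two helpers on toy data (the sample value is made up):
demo = spark.createDataFrame([("  John   Smith ",)], ["USER_NM"])
demo.select(single_space(col("USER_NM")).alias("single"),
            remove_all_whitespace(col("USER_NM")).alias("none")).show()
# single -> "John Smith", none -> "JohnSmith"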
from pyspark.sql.types import StringType
from pyspark.sql.functions import UserDefinedFunction
to_none = UserDefinedFunction(lambda x: None, StringType())  # UDF that maps every value to NULL
a1 = a.withColumn('new_column', to_none(a['login']))
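UserDefinedFunction is the low-level class; the usual entry point is F.udf, which builds the same thing:
from pyspark.sql.functions import udf
to_none = udf(lambda x: None, StringType())
a1 = a.withColumn('new_column', to_none(a['login']))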
a.sort('CONTACT_ID_1', 'USER_NM_1', ascending=False).show()  # descending; default is ascending (one direction for all keys)
a.sort(a.CONTACT_ID_1.desc(), a.USER_NM_1.asc()).show()  # per-column sort directions
a.groupBy('CONTACT_ID_1').agg(F.count('CONTACT_ID_1').alias('count')).sort(desc('count')).show()  # group and count
a.groupBy("login").count().sort(desc("count")).show()
df.groupBy('level').agg(F.concat_ws(',', F.collect_list(df.name))).show()  # collect each group's names into one comma-separated string
from pyspark.sql.functions import when  # if/else logic
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))  # if/else
frame3_1 = WEB_USER_3.withColumn("name_length", F.length(WEB_USER_3.USER_NM_1))  # new column: character length of each value
ST_SKU_1.withColumn('Input', F.lit('Viewed')).show()  # new constant column
from pyspark.sql.functions import lit
new_df = df1.withColumn('newCol', lit(0))  # new column of zeros (.show() returns None, so call it separately)
new_df.show()
new_df = fy_cx_sessions_2.withColumn('new_column_1', lit(None).cast(StringType()))  # new column of NULLs
df = df1.join(df2, df1.name == df2.name, how='inner')  # join types: 'inner', 'outer', 'left', 'right'
df.show()
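A self-contained sketch of the join (toy data and column names assumed):
df1 = spark.createDataFrame([("Alice", 1), ("Bob", 2)], ["name", "dept"])
df2 = spark.createDataFrame([("Alice", "NY")], ["name", "city"])
df1.join(df2, on="name", how="left").show()  # passing on= by name avoids a duplicated join column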
from pyspark.sql.functions import split, explode, concat, concat_ws  # split a column's values
df_split = df.withColumn("s", split(df['score'], " "))  # split score on spaces into a new array column s
df_split.show()
ST_SKU_2.withColumn('STSKU', concat(ST_SKU_2['ShipToNumber'], ST_SKU_2['SKUNumber']))  # concatenate columns (no separator)
a.withColumn('STSKU', concat_ws("", a['ShipToNumber'], a['SKUNumber']))  # concatenate columns with a chosen separator
from pyspark.sql.functions import split, explode  # reshape wide to long (melt)
a1 = a.withColumn("SKU", explode(split(a['prod_list'], ",")))  # one output row per comma-separated value (like R's melt)
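What explode does, on one toy row: a single input row fans out to one row per list element:
demo = spark.createDataFrame([("S1", "A,B,C")], ["ShipToNumber", "prod_list"])
demo.withColumn("SKU", explode(split(demo["prod_list"], ","))).show()  # 1 row in, 3 rows out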
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("user string, PL string, Order_Number integer", PandasUDFType.GROUPED_MAP)
def data_partition(df):
    # df is a pandas DataFrame holding one group; return a pandas DataFrame matching the schema
    return df[["user", "PL", "Order_Number"]]

df.groupBy("user").apply(data_partition).show()  # run once per group
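On Spark 3.x this decorator form is deprecated in favor of groupBy(...).applyInPandas with a plain function; a sketch under the same assumed columns:
def data_partition_pd(pdf):  # pdf: pandas DataFrame for one group
    return pdf[["user", "PL", "Order_Number"]]

df.groupBy("user").applyInPandas(
    data_partition_pd, schema="user string, PL string, Order_Number integer").show()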
df.withColumn("datetime", col("datetime").cast("timestamp"))
.groupBy("userId", "memberId")
.agg(max_("datetime"))
# Caveats
1 filter (column names that clash with DataFrame methods)
test = a.groupBy('USER_NM').agg(F.count('USER_NM').alias('count')).sort(desc('count'))
test.filter(test.count > 1).show()  # raises: '>' not supported between instances of 'method' and 'int'
Fix: test.filter(test['count'] > 1).show()
Why it fails: count is a built-in DataFrame method, so test.count resolves to the method, not the column named 'count'.
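An equivalent workaround is F.col, which always refers to the column:
test.filter(F.col('count') > 1).show()
# or sidestep the clash when aggregating, e.g. .alias('cnt') instead of .alias('count')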