0. Create a DataFrame
data=[(1,'jay'),(2,'tom')]
schema=['id', 'name']
df = spark.createDataFrame(data=data,schema=schema)
df.show()
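The schema can also be passed as a DDL-style string so each column gets an explicit type; a minimal sketch (the INT/STRING types simply match the sample data above):
df = spark.createDataFrame(data=data, schema='id INT, name STRING')
df.printSchema()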
1. Read/write CSV
- Read a single CSV
df = spark.read.csv(path='dbfs://Filestore/data/employee1.csv', header=True)
df.display()
df.printSchema()
- Read multiple CSVs
df = spark.read.csv(path=['dbfs://Filestore/data/employee1.csv','dbfs://Filestore/data/employee2.csv'], header=True)
- Read every CSV in a folder
df = spark.read.csv(path='dbfs://Filestore/data/',header = True)
- Read all files with an explicit schema
from pyspark.sql.types import *
schema = (StructType()
          .add(field='id', data_type=IntegerType())
          .add(field='name', data_type=StringType())
          .add(field='gender', data_type=StringType())
          .add(field='salary', data_type=StringType()))
df = spark.read.csv(path='dbfs://Filestore/data/',schema = schema,header = True)
df.printSchema()
- Write data
df.write.csv(path='dbfs://Filestore/data/',header = True,mode='ignore')
- Note on modes: overwrite replaces whatever already exists at the path, deleting the previous data; append adds the new data to the existing data if the path already exists; ignore does nothing if the path already exists, and writes normally if it does not; error (the default) raises an exception and blocks the write if the path already exists.
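A quick sketch of the four modes against the same output folder (the path is hypothetical):
out = 'dbfs://Filestore/data/out/'
df.write.csv(path=out, header=True, mode='overwrite')  # replace any existing data
df.write.csv(path=out, header=True, mode='append')     # add to the existing data
df.write.csv(path=out, header=True, mode='ignore')     # do nothing if the path exists
df.write.csv(path=out, header=True, mode='error')      # raise if the path exists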
2. Read/write JSON to a DataFrame
- By default Spark expects JSON Lines (one complete JSON object per line); if an object or array spans multiple lines, set multiLine to True
df = spark.read.json(path='dbfs://Filestore/data/emps.json', multiLine=True)
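For reference, a sketch of the two file layouts (contents shown as comments; the field names are hypothetical):
# JSON Lines (default, no multiLine needed): one complete object per line
#   {"name": "maheer", "salary": 2000}
#   {"name": "wafa", "salary": 3000}
# Pretty-printed object or array spanning several lines: needs multiLine=True
#   [
#     {"name": "maheer", "salary": 2000},
#     {"name": "wafa", "salary": 3000}
#   ]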
3. Read/write Parquet
- Read Parquet
df= spark.read.parquet(path='dbfs://Filestore/data/demo.parquet')
- Write Parquet
df.write.parquet(path='dbfs://Filestore/data/demo2.parquet')
4. Display data
- Show the data; truncate controls whether long values are cut off
df.show(truncate =False)
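show() also accepts a row count and a vertical layout; a short sketch:
df.show(n=5, truncate=10)    # first 5 rows, string values cut to 10 characters
df.show(n=5, vertical=True)  # print each row as column-value pairs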
5. Column operations with withColumn
- Add a new column
df = df.withColumn('age_plus_10', df['age'] + 10)
- Replace an existing column
df = df.withcolumn('age',df['age']**2)
- Drop a column
df = df.drop('age_plus_10')
- Type casting: convert an entire column to a new type
# cast the "count" column from string to integer
df = df.withColumn("count", df["count"].cast("integer"))
# 将 "amount" 列的数据类型从浮点数转换为整数
df = df.withColumn("amount", df["amount"].cast("integer"))
- Rename a column
df = df.withColumnRenamed("gender", "sex")
- Conditional logic
from pyspark.sql.functions import when
df = df.withColumn("age_group", when(df["age"] < 30, "Young").otherwise("Old"))
- Date handling
from pyspark.sql.functions import current_date, datediff, year, month
df = df.withColumn("days_since_birth", datediff(current_date(), df["birth_date"]))
- Window functions
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
# define the window specification
window_spec = Window.partitionBy("department").orderBy("salary")
# rank rows within each department using the window
df = df.withColumn("rank_in_department", row_number().over(window_spec))
- User-defined functions (UDFs)
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
# define a plain Python function
def double_age(age):
    return age * 2
# register it as a UDF with an integer return type
double_age_udf = udf(double_age, IntegerType())
# use the UDF to create a new column
df = df.withColumn("double_age", double_age_udf(df["age"]))
6. Defining schemas
- Nested StructType column
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = [(1, ('Maheer', 'Shark'), 3000), (2, ('Wafa', 'Shaik'), 4000)]
structName = StructType([StructField('firstName', StringType()), StructField('lastName', StringType())])
schema = StructType([StructField(name='id', dataType=IntegerType()), StructField(name='name', dataType=structName), StructField(name='salary', dataType=IntegerType())])
df = spark.createDataFrame(data, schema)
display(df)
df.printSchema()
- Array column
data = [('abc',[1,2]),('mon',[4,5]),('xyz',[6,7])]
from pyspark.sql.types import ArrayType
schema = StructType([StructField('id', StringType()), StructField('numbers', ArrayType(IntegerType()))])
df = spark.createDataFrame(data,schema)
df.show()
df.printSchema()
display(df)
- Access array elements by index
df.withColumn('firstNumber', df.numbers[0]).show()
7. explode, split, array
- explode: expands an array column into multiple rows, one row per element.
from pyspark.sql.functions import explode
# create a DataFrame containing an array column
data = [(1, [1, 2, 3]), (2, [4, 5]), (3, [6])]
df = spark.createDataFrame(data, ["id", "values"])
# use explode to turn the array column "values" into one row per element
exploded_df = df.select("id", explode("values").alias("value"))
exploded_df.show()
- split: splits a string column into an array using the given separator.
from pyspark.sql.functions import split
# create a DataFrame containing a string column
data = [(1, "apple,banana,cherry"), (2, "grape,kiwi"), (3, "orange")]
df = spark.createDataFrame(data, ["id", "fruits"])
# use split to turn the string column "fruits" into an array
df = df.withColumn("fruit_list", split(df["fruits"], ","))
df.show()
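The two functions are often chained: split the string into an array, then explode it into rows. A minimal sketch reusing the fruits DataFrame above:
# one output row per fruit in the comma-separated string
df.select('id', explode(split(df['fruits'], ',')).alias('fruit')).show()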
8. Creating a DataFrame with a map (dictionary) column
- Use MapType to define the dictionary structure
from pyspark.sql.types import StructType, StructField, StringType, MapType
data =[('maher',{'hair':'black','eye':'brown'}),('wafa',{'hair':'black','eye':'blue'})]
schema = StructType([StructField('name', StringType()), StructField('properties', MapType(StringType(), StringType()))])
df = spark.createDataFrame(data,schema)
df.show(truncate = False)
df.printSchema()
display(df)
- Access a value in the map
df1 = df.withColumn('hair', df.properties.getItem('hair'))
df1.show(truncate=False)
- map_keys() returns the map's keys as an array
from pyspark.sql.functions import map_keys, map_values
df1 = df.withColumn('keys', map_keys(df.properties))
df1.show(truncate=False)
![](https://img.haomeiwen.com/i14814834/ffc201e2010ff26f.png)
- map_values() returns the map's values as an array
df1 = df.withColumn('values', map_values(df.properties))
df1.show(truncate=False)
![](https://img.haomeiwen.com/i14814834/9946284c8f5324f8.png)
9. Creating a DataFrame with Row()
- Build the DataFrame from Row objects
from pyspark.sql import Row
row1 = Row(name='maheer', salary= 2000)
row2 = Row(name= 'wafa', salary= 30000)
data = [row1,row2]
df = spark.createDataFrame(data)
df.show()
- Nested Row structure
from pyspark.sql import Row
data = [Row(name='maheer',prop=Row(hair='black',eye='blue')),Row(name='wafa',prop=Row(hair='grey',eye='black'))]
df=spark.createDataFrame(data)
df.printSchema()
10. Column access
- Select a single column
from pyspark.sql.functions import col
#way1:
df.select(df.gender).show()
#way2:
df.select(df['gender']).show()
#way3:
df.select(col('gender')).show()
- Access nested struct fields
from pyspark.sql.functions import col
df.select(df.props.hair).show()
df.select(df['props.hair']).show()
df.select(col('props.hair')).show()
11. when() & otherwise()
- Equivalent to SQL CASE WHEN
from pyspark.sql.functions import when
df1 = df.select(df.id, df.name, when(df.gender == 'M', 'male').when(df.gender == 'F', 'female').otherwise('unknown').alias('gender'))
df1.show()
![](https://img.haomeiwen.com/i14814834/5e361315130733d4.png)
12. alias, cast
- alias: rename a column in the output
df.select(df.id.alias('emp_id'), df.name.alias('emp_name')).show()
- cast: convert the column type
df = df.withColumn('id', df.id.cast('int'))
13. filter, where
- filter: keep rows that satisfy a condition
df.filter(df.age>3).show()
df.filter(df.name.like('ga%')).show()
- where: the same filtering by condition
df.where(df.age>3).show()
- Apart from the name, filter and where behave identically; conditions can also be combined, as shown below.
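Conditions can be combined with & (and), | (or) and ~ (not); each condition needs its own parentheses. A sketch (the gender column is assumed from earlier examples):
df.filter((df.age > 3) & (df.name.like('ga%'))).show()
df.where((df.age > 3) | (df.gender == 'M')).show()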
14. Deduplication
- distinct: removes rows that are duplicates across all columns (it takes no arguments)
distinct_df = df.distinct()
- dropDuplicates: removes duplicates based on the given subset of columns
df.dropDuplicates(["first_name", "last_name", "age"])
15. Sorting
- sort and orderBy are functionally identical
df.orderBy(df.age.desc(), df.salary.asc()).show()
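The same ordering written with sort, just to show the equivalence:
df.sort(df.age.desc(), df.salary.asc()).show()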
16. Grouping with groupBy
- Works like SQL GROUP BY
df1= df.groupBy('dep','gender').count()
df2= df.groupBy('dep').min('salary')
![](https://img.haomeiwen.com/i14814834/5aff3e13a147cd53.png)
- agg() applies several aggregations after grouping
from pyspark.sql.functions import count, min, max
df.groupBy('dep').agg(count('*').alias('countOfEmps'), min('salary').alias('minSal'), max('salary').alias('maxSal')).show()
![](https://img.haomeiwen.com/i14814834/c99e11083636288e.png)
17. Merging DataFrames with the same schema: union & unionAll
- union: concatenates the two DataFrames as-is, without removing duplicates
newDf = df1.union(df2)
![](https://img.haomeiwen.com/i14814834/e7b8aad1d8328ec1.png)
- unionAll: in the DataFrame API this is just an alias of union, so it also keeps duplicates; call distinct() afterwards to remove them
newDf = df1.unionAll(df2)
newDf.distinct().show()
18. Merging DataFrames with different schemas: unionByName()
- unionByName: matches columns by name; with allowMissingColumns=True, columns missing on one side are filled with null
newDf = df1.unionByName(df2, allowMissingColumns=True)
newDf.show()
![](https://img.haomeiwen.com/i14814834/2c27d8782bf60fda.png)
19. Selecting columns with select()
- Select multiple columns
df.select('id','name').show()
df.select(df.id,df.name).show()
df.select(df['id'],df['name']).show()
20. Inner join, left join
- Inner join: only rows that match on both sides are returned
empDf.join(depDf,empDf.dep==depDf.id,'inner').show()
![](https://img.haomeiwen.com/i14814834/c17a75157059edfd.png)
- Left join: all rows from the left table are kept; unmatched rows show null for the right table's columns
empDf.join(depDf,empDf.dep==depDf.id,'left').show()
![](https://img.haomeiwen.com/i14814834/8c446763b36ac180.png)
21. leftsemi, leftanti
- leftsemi: returns only the left-table rows that have a match; unmatched rows are dropped and no right-table columns are included
empDf.join(depDf,empDf.dep==depDf.id,'leftsemi').show()
![](https://img.haomeiwen.com/i14814834/2ef14c6fadf696ec.png)
- leftanti: returns only the left-table rows that have no match
empDf.join(depDf,empDf.dep==depDf.id,'leftanti').show()
![](https://img.haomeiwen.com/i14814834/ac9ff1ebf51d7ca3.png)
- Self join: join a DataFrame to itself (the original example image is missing; see the sketch below)
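A minimal self-join sketch, matching each employee to a manager through a hypothetical managerId column (not part of the original example):
from pyspark.sql.functions import col
emp = empDf.alias('e')
mgr = empDf.alias('m')
# pair every employee row with the row of its manager
emp.join(mgr, col('e.managerId') == col('m.id'), 'inner').select(col('e.name').alias('employee'), col('m.name').alias('manager')).show()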
22. Pivot (rows to columns)
- Pivot directly on a column
df.groupBy('dep').pivot('gender').count().show()
![](https://img.haomeiwen.com/i14814834/40547ac54dd4bbd6.png)
- Pass one or more values to restrict the pivoted columns
df.groupBy('dep').pivot('gender',['male']).count().show()
- Unpivot (columns back to rows)
from pyspark.sql.functions import expr
unpivotDf = df.select('dep', expr("stack(2, 'M', male, 'F', female) as (gender, count)"))
unpivotDf.show()
23. Filling null values
- fillna: fills every null value with the given value
df.fillna('unknown').show()
- na.fill: pass a list of columns to fill only those columns
df.na.fill('unknown',['dep','gender']).show()
24. Creating a temporary view
- Create a temporary view and query it with SQL
df.createOrReplaceTempView('employees')
%sql
select id, name from employees
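The same query can be run from Python with spark.sql, which returns a DataFrame:
result = spark.sql('select id, name from employees')
result.show()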
25. UDFs with the decorator syntax
- Use together with withColumn
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
@udf(returnType=IntegerType())
def totalPayment(a, b):
    return a + b
df.withColumn('totalPay', totalPayment(df.salary, df.bonus)).show()
26. Partitioned Parquet output with partitionBy
df.write.parquet('/FileStore/employees', mode='overwrite', partitionBy='dep')
![](https://img.haomeiwen.com/i14814834/a6b0e7b13cc8eab6.png)
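The partitioned folders can be read back as a whole, and a filter on the partition column only scans the matching folders; a sketch (the dep value is hypothetical):
emps = spark.read.parquet('/FileStore/employees')
emps.filter(emps.dep == 'IT').show()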
27. Parsing JSON with MapType/StructType
- Create the DataFrame
data = [('maheer', '{"hair": "black", "eye": "brown"}')]
schema = ['name', 'props']
df = spark.createDataFrame(data, schema)
df.show(truncate=False)
![](https://img.haomeiwen.com/i14814834/9033a0d99da034ea.png)
- Parse the JSON string into a map column
from pyspark.sql.functions import from_json
from pyspark.sql.types import MapType, StringType
MapTypeSchema = MapType(StringType(), StringType())
df1 = df.withColumn('propsMap', from_json(df.props, MapTypeSchema))
df1.show()
df1.printSchema()
![](https://img.haomeiwen.com/i14814834/fe37d1a16c59bf6d.png)
- Pull the parsed values out into new columns
df2 = df1.withColumn('hair', df1.propsMap.hair).withColumn('eye', df1.propsMap.eye)
df2.show()
![](https://img.haomeiwen.com/i14814834/278d0133a362c543.png)
- Parse the JSON with a StructType
from pyspark.sql.types import StructType, StructField
StructTypeSchema = StructType([StructField('hair', StringType()), StructField('eye', StringType())])
df1 = df.withColumn('propsStruct', from_json(df.props, StructTypeSchema))
df1.show(truncate = False)
df1.printSchema()
df2 = df1.withColumn('hair', df1.propsStruct.hair).withColumn('eye', df1.propsStruct.eye)
df2.show()
- Convert the parsed struct column back to a JSON string
from pyspark.sql.functions import to_json
df2 = df1.withColumn('propsJsonString', to_json(df1.propsStruct))
![](https://img.haomeiwen.com/i14814834/c89719ec0cbaf67e.png)
- Extract values directly from the JSON string with json_tuple
from pyspark.sql.functions import json_tuple
df.select('name', json_tuple(df.props, 'eye', 'skin').alias('eye', 'skin')).show()
![](https://img.haomeiwen.com/i14814834/d10b2391960e7941.png)
- get_json_object extracts a value from the JSON string by its key (a JSON path)
from pyspark.sql.functions import get_json_object
df1 = df.select('name', get_json_object('props', '$.gender').alias('gender'))
df1.show()
28. Date conversion
- Get the current date
from pyspark.sql.functions import current_date, date_format
df1 = df.withColumn('currentDate', current_date())
- Format a date
df2 = df1.withColumn('formattedDate', date_format(df1.currentDate, 'yyyy-MM-dd'))
df2.show()
29. Date arithmetic
from pyspark.sql.functions import datediff, months_between, add_months
df = spark.createDataFrame([('2023-01-13', '2023-03-12')], ['d1', 'd2'])
# difference in days
df.withColumn('datediff', datediff(df.d2, df.d1)).show()
# difference in months
df.withColumn('monthsBetween', months_between(df.d2, df.d1)).show()
# add months
df.withColumn('addmonths', add_months(df.d2, 3)).show()
# subtract months
df.withColumn('submonths', add_months(df.d2, -3)).show()
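Day-level arithmetic works the same way with date_add and date_sub; a short sketch:
from pyspark.sql.functions import date_add, date_sub
df.withColumn('plus10days', date_add(df.d2, 10)).show()
df.withColumn('minus10days', date_sub(df.d2, 10)).show()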