美文网首页
10.1 Pyspark(01)

10.1 Pyspark(01)

作者: 山猪打不过家猪 | 来源:发表于2023-09-29 18:11 被阅读0次

pyspark视频合集

0.创建dataframe

data=[(1,'jay'),(2,'tom')]
schema=['id', 'name']

df = spark.createDataFrame(data=data,schema=schema)
df.show()

1. 读取/写入csv

  • 读取单个csv
df = spark.read.csv(path='dbfs://Filestore/data/employee1.csv',head= True)
df.display()
df.printSchema()
  • 读取多个csv
df = spark.read.csv(path=['dbfs://Filestore/data/employee1.csv','dbfs://Filestore/data/employee2.csv'],head= True)
  • 读取文件夹下所有的csv
df = spark.read.csv(path='dbfs://Filestore/data/',header = True)
  • 读取所有文件并且添加指定类型
from pyspark.sql.types import *

schema = StructType().add(field = 'id', data_type=IntegerType()).
add(field = 'name', data_type=StringType()).
add(field = 'gender', data_type=StringType()).
add(field = 'salary', data_type=StringType())
df = spark.read.csv(path='dbfs://Filestore/data/',schema = schema,header = True)
df.printSchema()
  • 写入数据
df.write.csv(path='dbfs://Filestore/data/',header = True,mode='ignore')
  • 注意:overwrite:将其覆盖并写入新的数据。这会删除先前存在的数据;append:如果指定的输出路径已经存在,将新数据附加到现有数据的末尾;ignore:如果指定的输出路径已经存在,不会进行任何操作。如果输出路径不存在,将创建新的输出文件;error:如果指定的输出路径已经存在,会引发异常并阻止写入。

2.读取/写入json到dataframe

  • 通常一个json文件有一个json对象,但是如果有多个那么需要将multiline设置为true
df =spark.read.json(path='dbfs://Filestore/data/emps.json' multiline=True)

3.读取/写parquet

  • 读取parquet
df= spark.read.parquet(path='dbfs://Filestore/data/demo.parquet')
df= spark.write.parquet(path='dbfs://Filestore/data/demo2.parquet')

4.显示数据

  • 显示数据,其中truncate表示是否截断数据
df.show(truncate =False)

5.withcolumn对列进行操作

添加新列

df = df.withcolumn('age_plus_10',df['age']+10)

替换之前的列

df = df.withcolumn('age',df['age']**2)

删除列

df = df.drop('age_plus_10')

类型转换

将整列转换类型

# 将 "count" 列的数据类型从字符串转换为整数
df = df.withColumn("count", df["count"].cast("integer"))

# 将 "amount" 列的数据类型从浮点数转换为整数
df = df.withColumn("amount", df["amount"].cast("integer"))

重命名列名

df.withColumnRenamed("gender","sex")

条件处理

from pyspark.sql.functions import when

df = df.withColumn("age_group", when(df["age"] < 30, "Young").otherwise("Old"))

日期处理

from pyspark.sql.functions import datediff, year, month
df = df.withColumn("days_since_birth", datediff(current_date(), df["birth_date"]))

窗口函数

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# 定义窗口规范
window_spec = Window.partitionBy("department").orderBy("salary")

# 使用窗口函数为每个部门计算排名并创建新列
df = df.withColumn("rank_in_department", row_number().over(window_spec))

使用UDF用户自定义函数

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

# 定义一个 Python 函数
def double_age(age):
    return age * 2

# 注册 UDF
double_age_udf = udf(double_age, IntegerType())

# 使用 UDF 创建一个新列
df = df.withColumn("double_age", double_age_udf(df["age"]))

6. 定义结构

  • 复杂结构的字符串 结构
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = [(1,'Maheer','Shark'),3000),(2,('Wafa','Shaik'),4000]

structName=StructType([StructField(‘firstName',StringType()), StructField('lastName',StringType())])

schema = StructType([StructField(name='id', datatype=IntegerType()),StructField(name='name', dataType=structName,StructField(name='salary',dataType=IntegerType())])

df=spark.createDataframe(data,schema)
display(df)
df.printSchema()
  • 复杂的数组结构
data = [('abc',[1,2]),('mon',[4,5]),('xyz',[6,7])]

schema = StuctType(StructField('id',StringType()),StructField('number',ArrayType(IntegerType()))])

df = spark.createDataFrame(data,schema)
df.show()
df.printSchema()
display(df)
  • 查看数组的值
df.withcolumn('firstNumber',df.numbers[0]).show()

7.explode, split,array

  • explode:用于展开数组类型的列,将一个数组中的元素拆分成多行,每行包含一个元素。
from pyspark.sql.functions import explode
# 创建一个包含数组的 DataFrame
data = [(1, [1, 2, 3]), (2, [4, 5]), (3, [6])]
df = spark.createDataFrame(data, ["id", "values"])

# 使用 explode 将数组列 "values" 拆分为多行
exploded_df = df.select("id", explode("values").alias("value"))
exploded_df.show()
  • split:用于将字符串列拆分为数组,根据指定的分隔符将字符串拆分为多个元素。
from pyspark.sql.functions import split

# 创建一个包含字符串的 DataFrame
data = [(1, "apple,banana,cherry"), (2, "grape,kiwi"), (3, "orange")]
df = spark.createDataFrame(data, ["id", "fruits"])

# 使用 split 将字符串列 "fruits" 拆分为数组
df = df.withColumn("fruit_list", split(df["fruits"], ","))

df.show()

8. 创建定义字典结构的DataFrame

  • 使用MapType来定义字典结构
from pyspark.sql.types import StructType, StructField, StringType, MapType

data =[('maher',{'hair':'black','eye':'brown'}),('wafa',{'hair':'black','eye':'blue'})]
schema = StructType([StructField('name':StringType()),StructField('properties',MapType(StringType(),StringType()))])
df = spark.createDataFrame(data,schema)
df.show(truncate = False)
df.printSchema()
display(df)
  • 访问上面字典的值
df1 = df.withColunm('hair',df.properties.getItem('hair'))
df1.show(truncate=False)
  • may_keys()获取字典的key,返回列表
df1=df.withColumn('keys',map_keys(df.properties))
df1.show(truncate=False)
image.png
  • may_values()获取字典的values,返回列表
df1=df.withColumn('keys',map_values(df.properties))
df1.show(truncate=False)
image.png

9.Row()来创建DataFrame

  • 通过行的形式创建DataFrame
from pyspark.sql import Row

row1 = Row(name='maheer', salary= 2000)
row2 = Row(name= 'wafa', salary= 30000)
data = [row1,row2]
df = spark.createDataFrame(data)
df.show()
  • 创建嵌套结构
from pyspark.sql import Row

data = [Row(name='maheer',prop=Row(hair='black',eye='blue')),Row(name='wafa',prop=Row(hair='grey',eye='black'))]

df=spark.createDataFrame(data)
df.printSchema()

10.列操作

  • 获取一列的值
from pyspark.sql.functions import col
#way1:
df.select(df.gender).show()
#way2:
df.select(df['gender'].show()
#way3:
df.select(col('gender')).show()
  • 获取嵌套结构的值
from pyspark.sql.functions import col

df.select(df.props.harir).show()
df.select(df['props.hair']).show()
df.select(col('props.hair')).show()

11.when()&otherwise()

  • 等于case when
df1 = df.select(df.id,df.name,when(df.gender='M','male').when(df.gender=='F','female').otherwise('unknown').alias(''gender'))
df1.show()
image.png

12.alias,cast

  • alias:别名
df.select(df.id.alias('emp_id'),df.name.alias('emp_name').show()
  • cast: 转换格式
df.withcolumn('id',df.id.cast('int'))

13.filter,where

  • filter条件过滤
df.filter(df.age>3).show()
df.filter(df.name.like('ga%')).show()
  • where条件过滤
df.where(df.age>3).show()
  • 除了写法不同,用法完全一样

14.去重

  • distinct:查看重复
distinct_df = df.distinct(["first_name", "last_name", "age"])
  • dropDuplicates:删除重复
df.dropDuplicates(["first_name", "last_name", "age"])

15. 排序

  • sort和orderBy功能一样
df.orderby(df.age.desc(),df.salary.asc()).show()

16. 分组groupBy

  • 类似于sql的分组
df1= df.groupBy('dep','gender').count()
df2= df.groupBy('dep').min('salary')
image.png
  • agg()分组之后传入多个运算
from pyspark.sql.funtions import count,min,max

df.groupBy('dep').agg(count('*).alias('countOfEmps'),min('salary').alias('minSal'),max('salary').alias('maxSal')).show()
image.png

17.合并相同格式DataFrame,union&unionAll

  • union:合并,不进行处理,不删除重复
newDf = df1.union(df2)
image.png
  • unionAll:合并,并且删除重复
newDf = df1.unionAll(df2)
newDf.distinct().show()

18.合并不同格式的DataFrame,unionByName()

  • unionByName:将不同格式的字段,显示为Null
newDf = df1.unionByName(df2,allowMissingColumns=True).show()
image.png

19.Select()选择列

  • 选择多列
df.select('id','name').show()
df.select(df.id,df.name).show()
df.select(df['id'],df['name']).show()

20. 内连接,左连接

  • 内连接:左右表只显示匹配到的数据
empDf.join(depDf,empDf.dep==depDf.id,'inner').show()
image.png
  • 左连接:保留左表数据,没匹配到的显示为null
empDf.join(depDf,empDf.dep==depDf.id,'left').show()
image.png

21.leftsemi,leftanti

  • leftsemi:只返回左表匹配到的数据,没有匹配到不显示
empDf.join(depDf,empDf.dep==depDf.id,'leftsemi').show()
image.png
  • leftanti:只返回左表没匹配到的数据
empDf.join(depDf,empDf.dep==depDf.id,'leftanti').show()
image.png
  • 自链接


    image.png

22. 行转列pivot

  • 直接行转列
df.groupBy('dep').pivot('gender').count().show()
image.png
  • 可以进行筛选,传入一个或者多个
df.groupBy('dep').pivot('gender',['male']).count().show()
  • 列转行
from pyspark.sql.functions import expr
unpivotDf = df.select('dep',expr("stack(2,'M',male,'F','female') as (gender,count)")).show()

23. 填充空值

  • fillna:将null值全部填充
df.fillna('unkown').show()
  • fill: 选取指定的列填充
df.na.fill('unknown',['dep','gender']).show()

24. 创建临时表

  • 创建临时表,并且使用sql查询
df.createOrReplaceTempView('employees')
%sql
select id, name from employees

25. UDF自定义

  • 和withColumn使用
@udf(returnType=IntegerTyep())
def totalPayment(a,b):
    return a+b

df.withColumn('totalPay',TotalPayment(df.salary,df.bonus).show()

26.使用partitionBy分区存储parquet

df.write.parqeut('/FileStore/employees',mode ='overwrite', partitionBy = 'dep')
image.png

27.使用MapType/StructType解析Json

  • 创建DataFrame
data = [('maheer','{"hair": "black", "eye": "brown"}')]
schema = ['id', 'props']
df = spark.createDataFrame(data, shema)
df.show(truncate=False)
image.png
  • 解析Json
from pyspark.sql.funtions import from_json
from pyspark.sql.types import MapType,StringType

MapTypeSchema = MapType(StringType(), StringType())
df1 = df.withColumn('propMap', from_json(df.props, MapTypeSchema))
df1.show()
df1.printSchema()
image.png
  • 将解析出来的json写入新的表
df2 = df1.withColumn('hair', df1.propsMap.hair).withColumn('eye', df1.propsMap.eye)
df2.show()
image.png
  • 使用StructType解析json
StructTypeSchema = StructType([StructField('hair', StringType()), StructField('eye', StringType())])
df1 = df.withColumn('propsStruct', from_json(df.props, StructTypeSchema)
df1.show(truncate = False)
df1.printSchema()

df2 = df1.withColumn('hair', df1.propsStruct.hair).withColumn('eye', df1.propsStruct.eye)
df2.show()
  • 转为json字符串
df1 = df.withColumn('propJsonString', to_json(df.properties))
image.png
  • 直接获取json里值
from pyspark.sql.funtions import jons_tuple

df.select('name', json_tuple(df.props, 'eye', 'skin').alias('eye', 'skin')).show()

image.png
  • 根据属性的键从 JSON 数据中提取值get_json_object
df1 = df.select('name', get_json_object('props', '$.gender').alias('gender'))
df1.show()

28.时间转换

  • 获取当前时间
df1  = df.withColumn('currentDate', current_data())
  • 时间日期转换
df2 = df1.withColumn('currentData', data_format(df1.currentDate, 'yyyy-MM-dd'))
df2.show()
df2 = df1.withColumn('currentData', data_format(df1.currentDate, 'yyyy-MM-dd'))

29. 时间加减

from pyspark.sql.functions import datediff

spark.createDataFrame(['2023-01-13','2023-03-12')], ['d1', 'd2'])

#时间相减
df.withColumn('datediff', datediff(df.d2, df.d1)).show()
#月份相减
df.withColumn('monthsBetween', monts_between(df.d2, df.d1)).show()
#增加月份
df.withColumn('addmonths', add_months(df.d2, 3)).show()
#减少月份
df.withColumn('submonths', add_months(df.d2, -3)).show()

相关文章

网友评论

      本文标题:10.1 Pyspark(01)

      本文链接:https://www.haomeiwen.com/subject/wvlrbdtx.html