1. Mount blob storage
Mount a blob container to DBFS in Azure.
![](https://img.haomeiwen.com/i14814834/746a7897c8bb6382.png)
- Mount
dbutils.fs.mount(source = 'wasbs://input@dl108lg.blob.core.windows.net',
                 mount_point = '/mnt/blob108_input',
                 extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net': 'fuDFNS1ziD9Lw4aeH/N6gw7+4'})
- List files; a specific path must be supplied
dbutils.fs.ls('/mnt/blob108_input')
- List all mount points
dbutils.fs.mounts()
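Re-running the mount on a path that is already mounted raises an error, so a common guard is to check dbutils.fs.mounts() first. A minimal sketch reusing the names above (the account key is a placeholder here):
# mount only if the mount point does not exist yet (sketch; '<account-key>' is a placeholder)
if not any(m.mountPoint == '/mnt/blob108_input' for m in dbutils.fs.mounts()):
    dbutils.fs.mount(source = 'wasbs://input@dl108lg.blob.core.windows.net',
                     mount_point = '/mnt/blob108_input',
                     extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net': '<account-key>'})
# to remove a mount later:
# dbutils.fs.unmount('/mnt/blob108_input')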
2. Read a CSV file and inspect it
- Read the file from the mounted blob container
df_sales = spark.read.csv('/mnt/blob108_input/Sales.csv', header=True)
- Inspect the DataFrame
display(df_sales)
df_sales.show()
df_sales.printSchema()
3. Write files
- Write as CSV
df_sales.write.csv('/mnt/output/sales_db', header=True, mode='overwrite')
- Write as Parquet
df_sales.write.parquet('/mnt/output/sales_db', mode='overwrite')
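Spark writes the output as a folder of part files rather than a single file. A quick sanity check (a sketch using the output path above) is to read the result back:
# read the Parquet output back to confirm the write (path from above)
df_check = spark.read.parquet('/mnt/output/sales_db')
display(df_check)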
4. Data types in PySpark
![](https://img.haomeiwen.com/i14814834/d95f38314c1648bc.png)
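The image above shows the type table. As a non-exhaustive reference sketch, the most commonly used classes in pyspark.sql.types are:
from pyspark.sql.types import (
    StringType, IntegerType, LongType, FloatType, DoubleType, DecimalType,
    BooleanType, DateType, TimestampType, ArrayType, MapType,
    StructType, StructField
)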
5. Defining a schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
data = [(1, "Jack", 30, 4500.0), (2, "Ray", 12, 6000.0)]
schema = StructType([
    StructField('Id', IntegerType(), True),
    StructField('Name', StringType(), True),
    StructField('Age', IntegerType(), True),
    StructField('Salary', FloatType(), True),
])
df = spark.createDataFrame(data,schema)
display(df)
- To apply the schema when reading a file into a DataFrame
df = spark.read.csv('/mnt/input/Sales.csv',header=True,schema = schema)
6. Reading JSON
1. Read flat (non-nested) JSON
# single-line JSON (one object per line) is the default, no extra option needed
df_sl = spark.read.json('/mnt/input/sales.json')
# multi-line JSON (one object spanning multiple lines)
df_ml = spark.read.json('/mnt/input/sales.json', multiLine=True)
- To read complex (nested) JSON, define a schema first
AddressSchema = StructType([
    StructField('City', StringType(), False),
    StructField('State', StringType(), False)
])
CustSchema = StructType([
    StructField('name', StringType(), False),
    StructField('Age', StringType(), False),
    StructField('Address', AddressSchema)
])
df_nsl = spark.read.json('/mnt/input/sales_nsl.json', schema=CustSchema)
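Once the schema is applied, nested fields can be selected with dot notation; a minimal sketch using the field names from CustSchema above:
# pull fields out of the nested Address struct (field names from CustSchema)
df_addr = df_nsl.select('name', 'Age', 'Address.City', 'Address.State')
display(df_addr)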
7. Logical operations
1. filter(): filter rows by a condition
from pyspark.sql.functions import *
# rename the column (remove the space in 'Item Name')
df = df.withColumnRenamed('Item Name', 'ItemName')
df1 = df.filter(df.ItemName == 'Total income')
# alternative syntax using col()
df1 = df.filter(col('ItemName') == 'Total income')
display(df1)
2. like(): fuzzy string matching
df1 = df.filter(df.ItemName.like('%Total%'))
3. and (&) and or (|)
df1 = df.filter((df.ItemName.like('%Total%')) & (df.Qty == 5) | (df.Id < 100))
4. startswith() and endswith()
df1 = df.filter(df.ItemName.startswith('Total'))
df2 = df.filter(df.ItemName.endswith('income'))
5. isin() and negation with ~
df1 = df.filter(df.ItemName.isin('Total income', 'Total ex'))
df2 = df.filter(~df.ItemName.isin('Total income', 'Total ex'))
6. select(): pick only the columns you need
df1 = df.select('SOID', 'SODate', 'Qty')
7. alias(): rename a (derived) column
df.select('*', (df.Qty * df.Value).alias('income'))
8. when(): the equivalent of SQL CASE WHEN
df1 = df.select('*', when(df.ItemName == 'Total income', df.Qty + 100).
                     when(df.ItemName == 'Sales income', df.Qty + 200).
                     when(df.ItemName == 'Insert income', df.Qty + 300).
                     otherwise(500).alias('QtyNew')
     )
9. isNull() and isNotNull()
df.filter(df.ItemName.isNull())
df.filter(df.ItemName.isNotNull())
10. na.fill() and fillna()
# fill nulls in the ItemName column only
df.fillna('NA', subset=['ItemName'])
11. groupBy()
df1 = df.groupBy('ItemName').count()
df2 = df.groupBy('ItemName').max('Qty')
df3 = df.groupBy('ItemName').sum('Qty','Value')
- Use agg() when aggregating several columns at once
df1 = df.groupBy('ItemCode', 'ItemName').agg(sum('Qty'), avg('Value'))
12. count()
- count()
df.count()
- count with groupBy
df.groupBy('ItemName').count()
- distinct().count()
df.distinct().count()
- countDistinct()
from pyspark.sql.functions import countDistinct
# a single column
df1 = df.select(countDistinct('ItemName'))
# multiple columns
df2 = df.select(countDistinct('ItemName','Id'))
13. orderBy()
- Ascending
df1 = df.sort('Qty')
df1 = df.orderBy('Qty')
- Descending
df1 = df.orderBy(df.Qty.desc())
df2 = df.orderBy(df.Qty.desc(), df.Value.asc())
14. distinct() and dropDuplicates()
df_no_duplicates = df.dropDuplicates(["id", "value"])
15. join()
Works like a SQL join: df1.join(df2, join condition, join type).
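A minimal sketch, assuming a second DataFrame df_items that shares an ItemCode column with df (df_items and its columns are hypothetical, for illustration only):
# inner join on a common column (df_items is a hypothetical lookup table)
df_joined = df.join(df_items, df.ItemCode == df_items.ItemCode, 'inner')
# the join type can be 'inner', 'left', 'right', 'outer', 'left_semi', 'left_anti', etc.
df_left = df.join(df_items, on='ItemCode', how='left')
display(df_joined)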