pySpark and Databricks

Author: 山猪打不过家猪 | Published 2023-12-21 11:38

1. Mounting Blob Storage

Mount an Azure Blob Storage container to DBFS.


  1. Mount the container
dbutils.fs.mount(source = 'wasbs://input@dl108lg.blob.core.windows.net',
                          mount_point= '/mnt/blob108_input',
                          extra_configs = {'fs.azure.account.key.dl108lg.blob.core.windows.net':'fuDFNS1ziD9Lw4aeH/N6gw7+4'}
)
  2. List files; a specific path must be given
dbutils.fs.ls('/mnt/blob108_input')
  3. List all mount points
dbutils.fs.mounts()

2. Reading a CSV File and Inspecting It

  • Read the file from the blob
df_sales  = spark.read.csv('/mnt/blob108_input/Sales.csv', header=True)
  • View the data
display(df_sales)
df_sales.show()
df_sales.printSchema()

3. Writing Files

  • Write as CSV
df_sales.write.csv('/mnt/output/sales_db', header=True, mode='overwrite')
  • Write as Parquet
df_sales.write.parquet('/mnt/output/sales_db', mode='overwrite')

4. Data Types in PySpark

(Figure: PySpark data types.)
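The figure is not reproduced here; as a rough substitute, the commonly used type classes all come from pyspark.sql.types (a minimal sketch, not an exhaustive list):

from pyspark.sql.types import (StringType, IntegerType, LongType, FloatType,
                               DoubleType, DecimalType, BooleanType, DateType,
                               TimestampType, ArrayType, MapType, StructType, StructField)

# composite types are built from the primitive ones, e.g.
tags_type = ArrayType(StringType())               # array of strings
attrs_type = MapType(StringType(), StringType())  # string-to-string map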

5. Defining a Schema

from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DataType,FloatType

data = [(1,"Jack",30,4500.0),(2,"Ray",12,6000.0)]

schema = StructType([
            StructField('Id',IntegerType(),True),
            StructField('Name',StringType(),True),
            StructField('Age',IntegerType(),True),
            StructField('Salary',FloatType(),True),
])

df = spark.createDataFrame(data,schema)
display(df)
  • To apply the schema when reading a file into a DataFrame
df = spark.read.csv('/mnt/input/Sales.csv',header=True,schema = schema)

6. Reading JSON

1. Reading flat (non-nested) JSON

# single-line JSON (JSON Lines) is the reader's default
df_sl = spark.read.json('/mnt/input/sales.json')
# multi-line JSON
df_ml = spark.read.json('/mnt/input/sales.json', multiLine=True)

  2. To read nested JSON, define a schema first
AddressSchema = StructType([
                               StructField('City',StringType(),False),
                               StructField('State',StringType(),False)
])

CustSchema = StructType([
                               StructField('name',StringType(),False),
                               StructField('Age',StringType(),False),
                               StructField('Address',AddressSchema)
])

df_nsl = spark.read.json('/mnt/input/sales_nsl.json', schema=CustSchema)

7. Logical Operations

1. filter() to filter rows by a condition

from pyspark.sql.functions import *

# rename the column
df = df.withColumnRenamed('Item Name', 'ItemName')
df1 = df.filter(df.ItemName == 'Total income')
# alternative syntax
df1 = df.filter(col('ItemName') == 'Total income')
display(df1)

2. Fuzzy string matching with like()

df1 = df.filter(df.ItemName.like('%Total%'))

3. AND (&) and OR (|)

df1 = df.filter((df.ItemName.like('%Total%') & (df.Qty == 5)) | (df.Id < 100))

4. startswith() and endswith()

df1 = df.filter(df.ItemName.startswith('Total'))
df2 = df.filter(df.ItemName.endswith('income'))

5. isin() and negation with ~

df1 = df.filter(df.ItemName.isin('Total income', 'Total ex'))
df2 = df.filter(~df.ItemName.isin('Total income', 'Total ex'))

6. select() to choose the columns you need

df1 = df.select('SOID', 'SODate', 'Qty')

7. alias() to rename a column

df.select('*', (df.Qty * df.Value).alias('income'))

8. when(), equivalent to SQL's CASE WHEN

df1 = df.select('*', when(df.ItemName == 'Total income', df.Qty +100).
                            when(df.ItemName == 'Sales income', df.Qty +200).
                            when(df.ItemName == 'Insert income', df.Qty +300).
                            otherwise(500).alias('QtyNew')
                       )

9. isNull() and isNotNull()

df.filter(df.ItemName.isNull())
df.filter(df.ItemName.isNotNull())

10. fill() and fillna()

# fill nulls in the ItemName column only
df.fillna('NA',['ItemName'])

11. groupBy()

df1 = df.groupBy('ItemName').count()
df2 = df.groupBy('ItemName').max('Qty')
df3 = df.groupBy('ItemName').sum('Qty','Value')

  • Use agg() when computing aggregations over multiple columns
df1 = df.groupBy('ItemCode','ItemName').agg(sum('Qty'),avg('value'))

12. count()

  1. count()
df.count()
  2. count() with groupBy()
df.groupBy('ItemName').count()
  3. distinct().count()
df.distinct().count()
  4. countDistinct()
from pyspark.sql.functions import countDistinct
# a single column
df1 = df.select(countDistinct('ItemName'))
# multiple columns
df2 = df.select(countDistinct('ItemName','Id'))

13. orderBy()

  • Ascending
df1 = df.sort('Qty')
df1 = df.orderBy('Qty')
  • Descending
df1 = df.orderBy(df.Qty.desc())
df2 = df.orderBy(df.Qty.desc(),df.value.asc())

14. distinct() and dropDuplicates()


df_no_duplicates = df.dropDuplicates(["id", "value"])
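For comparison with dropDuplicates(), distinct() treats a row as a duplicate only when every column matches (a minimal sketch using the same df):

# drop rows that are duplicated across all columns
df_unique = df.distinct()
df_unique.count()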

15. join()

Similar to a SQL join: table1.join(table2, join condition, join type).
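A minimal sketch, assuming hypothetically that df_sales has an ItemCode column and that df_items is a second DataFrame holding item details; the join type string can be 'inner', 'left', 'right', 'full', 'left_semi', or 'left_anti':

# hypothetical example: attach item details to each sales row
df_joined = df_sales.join(df_items, df_sales.ItemCode == df_items.ItemCode, 'inner')
display(df_joined)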

