0. Key points
- Upload local files to DBFS
- Define a schema and read a CSV file
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType
schema = StructType([
StructField('product_id',IntegerType(),True),
StructField('product_name',StringType(),True),
StructField('price',FloatType(),True),
])
- Joins and grouping in PySpark (see the sketch after this list)
table1.join(table2, on_column, how)
- Displaying data in Databricks with display()
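A minimal sketch of the join-then-group pattern listed above, using the df and df_menu DataFrames built later in section 3 (the 'inner' join type is shown explicitly here; it is also the default):
joined = df.join(df_menu, 'product_id', 'inner')            # join sales with menu on the shared key
per_customer = joined.groupBy('customer_id').sum('price')   # aggregate the joined rows
display(per_customer)                                       # Databricks notebook table/plot rendering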
1. Project requirements
![](https://img.haomeiwen.com/i14814834/ea986d6fa2e74c8b.png)
![](https://img.haomeiwen.com/i14814834/170436fdb8d8433b.png)
2. Preparation
Upload the two tables to the Databricks file system (DBFS); a quick way to verify the upload is sketched after the screenshot.
![](https://img.haomeiwen.com/i14814834/7b5dcc8046030df8.png)
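A quick check of the upload (assuming the files land under /FileStore/tables/, which matches the paths used in the read calls below):
# dbutils.fs is available in Databricks notebooks; list the DBFS folder to confirm the files arrived
display(dbutils.fs.ls('/FileStore/tables/'))
# the listing should include sales_csv.txt and menu_csv.txt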
3. PySpark
3.1 Create a DataFrame
- Define the schema and read the file
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType
schema = StructType([
StructField('product_id',IntegerType(),True),
StructField('customer_id',StringType(),True),
StructField('order_date',DateType(),True),
StructField('location',StringType(),True),
StructField('source_order',StringType(),True)
])
df = spark.read.csv('/FileStore/tables/sales_csv.txt',header=True,schema = schema)
display(df)
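One caveat: order_date is declared as DateType, so if the dates in sales_csv.txt are not in Spark's default yyyy-MM-dd format the column will come back null. In that case the reader's dateFormat option can be set; the pattern below ('dd-MM-yyyy') is only an assumed example and should be adjusted to the actual file:
# only needed if the default date parsing yields nulls; 'dd-MM-yyyy' is an assumed pattern
df = spark.read.csv('/FileStore/tables/sales_csv.txt', header=True, schema=schema, dateFormat='dd-MM-yyyy')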
3.2 Add year, month, and quarter columns
from pyspark.sql.functions import year,month,quarter
df = df.withColumn('order_year',year(df.order_date))
df = df.withColumn('order_month',month(df.order_date))
df = df.withColumn('order_quarter',quarter(df.order_date))
display(df)
3.3 Create the menu table
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,DecimalType,DateType,FloatType
schema = StructType([
StructField('product_id',IntegerType(),True),
StructField('product_name',StringType(),True),
StructField('price',FloatType(),True),
])
df_menu = spark.read.csv('/FileStore/tables/menu_csv.txt',header=True,schema = schema)
display(df_menu)
3.4 Process the data according to the KPI requirements
- Total amount spent by each customer
from pyspark.sql.functions import col, sum
total_amount_spent = (df.join(df_menu,'product_id').groupBy('customer_id').sum('price').orderBy('customer_id'))
display(total_amount_spent)
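The summed column is named sum(price) by default; an optional variant using agg with an alias (the name total_spent is just an illustration) gives it a cleaner name:
total_amount_spent = (df.join(df_menu,'product_id')
                        .groupBy('customer_id')
                        .agg(sum('price').alias('total_spent'))  # rename sum(price) to total_spent
                        .orderBy('customer_id'))
display(total_amount_spent)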
- Total amount spent by each food category
total_amount_food_category = (df.join(df_menu,'product_id').groupBy('product_name').sum('price'))
display(total_amount_food_category)
- Year Sales
year_sales = df.join(df_menu,'product_id').groupBy('order_year').sum('price')
display(year_sales)
- Quarterly Sales
quarter_sales = df.join(df_menu,'product_id').groupBy('order_quarter').sum('price').orderBy('order_quarter')
display(quarter_sales)
- Total number of orders by each category
from pyspark.sql.functions import count,desc
times_purchased = df.join(df_menu,'product_id').groupBy('product_name').agg(count('product_id').alias('product_count')).orderBy(desc('product_count'))
display(times_purchased)
![](https://img.haomeiwen.com/i14814834/3c1815b0a8e89204.png)
- Top 5 ordered items
Top_5 = df.join(df_menu,'product_id').groupBy('product_name').agg(count('product_id').alias('product_count')).orderBy(desc('product_count')).limit(5).drop('product_count')
display(Top_5)
![](https://img.haomeiwen.com/i14814834/7d580972105e1757.png)
- Frequency of customer visits (see the sketch below)
- Total sales by each country (see the sketch below)
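The original stops here without code for the last two KPIs. A minimal sketch, assuming a visit is one distinct order_date per customer and that the location column identifies the country:
from pyspark.sql.functions import countDistinct, sum

# Frequency of customer visits: count distinct order dates per customer (assumed definition of a visit)
visit_frequency = df.groupBy('customer_id').agg(countDistinct('order_date').alias('visit_count'))
display(visit_frequency)

# Total sales by each country: assumes the location column holds the country
country_sales = (df.join(df_menu, 'product_id')
                   .groupBy('location')
                   .agg(sum('price').alias('total_sales')))
display(country_sales)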