P2 PySpark Project (Processing Product Information)

Author: 山猪打不过家猪 | Published 2023-12-29 11:48

0. Key Points

  1. Upload local files to DBFS
  2. Define a schema and read a CSV file
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('price', FloatType(), True),
])
  3. Joining and grouping tables in PySpark: `table1.join(table2, on, how)`
  4. Displaying data in Databricks

1. Project Requirements

(Screenshots of the KPI requirements; images not preserved.)

2. Project Setup

Upload the two tables to the Databricks file system (DBFS).
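Besides the file-upload UI, the upload can be scripted from inside a Databricks notebook with `dbutils.fs.cp`; the target paths below match the ones read later, while the source paths are illustrative:

```python
# copy files from the driver's local disk into DBFS (source paths are illustrative)
dbutils.fs.cp("file:/tmp/sales_csv.txt", "dbfs:/FileStore/tables/sales_csv.txt")
dbutils.fs.cp("file:/tmp/menu_csv.txt", "dbfs:/FileStore/tables/menu_csv.txt")

# verify the files landed in /FileStore/tables/
display(dbutils.fs.ls("dbfs:/FileStore/tables/"))
```

Note that `dbutils` exists only inside a Databricks runtime, so this cell cannot run locally; the file-upload UI produces the same `/FileStore/tables/` paths.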

3. PySpark

3.1 Create the DataFrame

  • Define the schema and read the file
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType

schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('customer_id', StringType(), True),
    StructField('order_date', DateType(), True),  # fixed: was 'order_data', but df.order_date is used below
    StructField('location', StringType(), True),
    StructField('source_order', StringType(), True)
])

df = spark.read.csv('/FileStore/tables/sales_csv.txt', header=True, schema=schema)
display(df)

3.2 Add year, month, and quarter columns

from pyspark.sql.functions import year,month,quarter

df = df.withColumn('order_year',year(df.order_date))
df = df.withColumn('order_month',month(df.order_date))
df = df.withColumn('order_quarter',quarter(df.order_date))
display(df)

3.3 Create the menu table

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

schema = StructType([
    StructField('product_id', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('price', FloatType(), True),
])

df_menu = spark.read.csv('/FileStore/tables/menu_csv.txt', header=True, schema=schema)
display(df_menu)

3.4 Process the data according to the KPI requirements

  1. Total amount spent by each customer
from pyspark.sql.functions import sum

total_amount_spent = (df.join(df_menu,'product_id').groupBy('customer_id').sum('price').orderBy('customer_id'))
display(total_amount_spent)
  2. Total amount spent on each food category
total_amount_food_category = (df.join(df_menu,'product_id').groupBy('product_name').sum('price'))
display(total_amount_food_category)
  3. Yearly sales
year_sales = df.join(df_menu,'product_id').groupBy('order_year').sum('price')
display(year_sales)
  4. Quarterly sales
quarter_sales = df.join(df_menu,'product_id').groupBy('order_quarter').sum('price').orderBy('order_quarter')
display(quarter_sales)
  5. Total number of orders in each category
from pyspark.sql.functions import count,desc

times_purchased = (df.join(df_menu, 'product_id')
                     .groupBy('product_name')
                     .agg(count('product_id').alias('product_count'))
                     .orderBy(desc('product_count')))
display(times_purchased)
  6. Top 5 ordered items
Top_5 = (df.join(df_menu, 'product_id')
           .groupBy('product_name')
           .agg(count('product_id').alias('product_count'))
           .orderBy(desc('product_count'))
           .limit(5)
           .drop('product_count'))
display(Top_5)
  7. Frequency of customer visits

  8. Total sales by each country


Source: https://www.haomeiwen.com/subject/dfbjndtx.html