美文网首页
PySpark dataframe入门笔记

PySpark dataframe入门笔记

作者: 三楼绝对是二货 | 来源:发表于2019-05-17 17:20 被阅读0次

    https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

    背景

    大数据量的取数、特征处理、数据清洗要占用大量的时间,之前的工作流程是:sql查数、传到服务器、再进行处理、最后存到hive里,用docker或者是GPU集群消费训练模型。

    数据传输花费了大量时间,并且多个脚本跑起来也有些麻烦。
    显然直接用spark集群做数据提取、预处理是更好的选择。

    学习Scala的语法有一些成本,Spark提供Python API,所以用PySpark快速上手是一个好选择。

    基础知识

    RDD,Spark工作原理,PySpark API

    快速开始

    1. 构建session

    from pyspark.sql import SparkSession
    spark = SparkSession \
        .builder \
        .enableHiveSupport()\ 
        .getOrCreate()
    

    2. 查询SQL,返回值data的类型为一个dataframe

    begindate = '20190501'
    enddate = '20190514'
    sql1 = '''
          select a.id,
           dist(lat, lng, user_start_lng, user_start_lat) as dit,
          timestamp
      from (
            select id,
                  dt, 
                  user_start_lng, user_start_lat
              from test.test_detail
             where dt BETWEEN {0} and {1}
          ) a
      join (
            select id,
                  dt,
                  lng,
                  lat,
                  timestamp
              from test.test_trail
             where dt BETWEEN {0} and {1} and status in ('a', 'b')
          ) b
        on a.dt = b.dt
      and a.id = b.id
            '''.format(begindate, enddate)
    data = spark.sql(sql1)
    

    2.1 注册自定义函数UDF
    在查询的时候可以自定义一些简单的函数,并注册。比如上面SQL中的dist()。

    import math
    def dist(x, y, a, b):
        return math.sqrt((x - a) ** 2 + (y - b) ** 2)
    spark.udf.register("dist", dist)
    

    获得dataframe之后,我们就可以对它进行更灵活的处理

    DataFrame简介

    大部分内容翻译自:https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/

    dataframe是代名称的列的集合(类似excel中的数据,每列有名称、类型)。同时它还有RDD的特性。

    1. 不变性:一旦创建就不能改变
    2. 惰性求值:只有在action时才会计算
    3. 分布式

    dataframe的优点

    • 对数据量比较大结构化、半结构化的数据比较友好
    • 有数据的schema,方便Spark做优化
    • 可以处理PB级别的数据
    • 支持的数据源丰富

    DataFrame操作

    一些操作和pandas DataFrame很像
    看数据结构

    train.printSchema()
    Output:
    root
     |-- User_ID: integer (nullable = true)
     |-- Product_ID: string (nullable = true)
     |-- Gender: string (nullable = true)
     |-- Age: string (nullable = true)
     |-- Occupation: integer (nullable = true)
     |-- City_Category: string (nullable = true)
     |-- Stay_In_Current_City_Years: string (nullable = true)
     |-- Marital_Status: integer (nullable = true)
     |-- Product_Category_1: integer (nullable = true)
     |-- Product_Category_2: integer (nullable = true)
     |-- Product_Category_3: integer (nullable = true)
     |-- Purchase: integer (nullable = true)
    

    看前几行

    train.show(2,truncate= True)
    Output:
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
    |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
    |1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|              null|              null|    8370|
    |1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+
    only showing top 2 rows
    

    数总行数

    train.count(),test.count()
    Output:
    (550068, 233599)
    

    选择特定列

    train.select('User_ID','Age').show(5)
    Output:
    +-------+----+
    |User_ID| Age|
    +-------+----+
    |1000001|0-17|
    |1000001|0-17|
    |1000001|0-17|
    |1000001|0-17|
    |1000002| 55+|
    +-------+----+
    

    添加列:withColumn()
    withColumn需要2个参数

    • 需要添加的列名
    • 创建列的表达式
    train.withColumn('Purchase_new', train.Purchase /2.0).select('Purchase','Purchase_new').show(5)
    Output:
    +--------+------------+
    |Purchase|Purchase_new|
    +--------+------------+
    |    8370|      4185.0|
    |   15200|      7600.0|
    |    1422|       711.0|
    |    1057|       528.5|
    |    7969|      3984.5|
    +--------+------------+
    only showing top 5 rows
    

    相关文章

      网友评论

          本文标题:PySpark dataframe入门笔记

          本文链接:https://www.haomeiwen.com/subject/eecoaqtx.html