
Hands-On Bidirectional Integration of the Python Tech Stack and Spark for Cross Data Analysis

Author: 919b0c54458f | Published 2018-12-30 13:56


    Python Spark DataFrame Basics

    df = spark.read.parquet('/sql/users.parquet')
    df.show()

    +------+--------------+----------------+
    |  name|favorite_color|favorite_numbers|
    +------+--------------+----------------+
    |Alyssa|          null|  [3, 9, 15, 20]|
    |   Ben|           red|              []|
    +------+--------------+----------------+


    Python Spark DataFrame Aggregation

    CustomerID,Genre,Age,Annual Income (k$),Spending Score (1-100)
    0001,Male,19,15,39
    0002,Male,21,15,81
    0003,Female,20,16,6
    0004,Female,23,16,77
    0005,Female,31,17,40
    0006,Female,22,17,76

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.printSchema()
    df.show()

    root
     |-- CustomerID: string (nullable = true)
     |-- Genre: string (nullable = true)
     |-- Age: string (nullable = true)
     |-- Annual Income (k$): string (nullable = true)
     |-- Spending Score (1-100): string (nullable = true)

    +----------+------+---+------------------+----------------------+
    |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
    +----------+------+---+------------------+----------------------+
    |      0001|  Male| 19|                15|                    39|
    |      0002|  Male| 21|                15|                    81|
    |      0003|Female| 20|                16|                     6|
    |      0004|Female| 23|                16|                    77|
    |      0005|Female| 31|                17|                    40|
    |      0006|Female| 22|                17|                    76|
    |      0007|Female| 35|                18|                     6|
    |      0008|Female| 23|                18|                    94|
    |      0009|  Male| 64|                19|                     3|
    |      0010|Female| 30|                19|                    72|
    |      0011|  Male| 67|                19|                    14|
    |      0012|Female| 35|                19|                    99|
    |      0013|Female| 58|                20|                    15|
    |      0014|Female| 24|                20|                    77|
    |      0015|  Male| 37|                20|                    13|
    |      0016|  Male| 22|                20|                    79|
    |      0017|Female| 35|                21|                    35|
    |      0018|  Male| 20|                21|                    66|
    |      0019|  Male| 52|                23|                    29|
    |      0020|Female| 35|                23|                    98|
    +----------+------+---+------------------+----------------------+

    df.agg({"Age": "max","Annual Income (k$)":"mean","Spending Score (1-100)":"mean"}).show()

    +---------------------------+-----------------------+--------+

    |avg(Spending Score (1-100))|avg(Annual Income (k$))|max(Age)|

    +---------------------------+-----------------------+--------+

    | 50.2| 60.56| 70|

    +---------------------------+-----------------------+--------+
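    Grouped aggregation follows the same idea; a minimal sketch, grouping the same customers DataFrame by Genre and casting the string-typed columns before averaging:

    from pyspark.sql import functions as F

    # Sketch: per-Genre statistics; the CSV was read without a schema,
    # so every column is a string and must be cast before numeric aggregation.
    df.groupBy('Genre').agg(
        F.count('*').alias('customers'),
        F.avg(F.col('Age').cast('int')).alias('avg_age'),
        F.avg(F.col('Spending Score (1-100)').cast('int')).alias('avg_score')
    ).show()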


    alias(alias) gives the DataFrame an alias that can later be used in functions to refer to its columns, for example in a self-join:

    from pyspark.sql.functions import col

    df1 = df.alias('cus1')
    type(df1)
    df2 = df.alias('cus2')
    df3 = df1.join(df2,col('cus1.CustomerId')==col('cus2.CustomerId'),'inner')
    df3.count()

    200

    df3.show(10)

    +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
    |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
    +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
    |      0001|  Male| 19|                15|                    39|      0001|  Male| 19|                15|                    39|
    |      0002|  Male| 21|                15|                    81|      0002|  Male| 21|                15|                    81|
    |      0003|Female| 20|                16|                     6|      0003|Female| 20|                16|                     6|
    |      0004|Female| 23|                16|                    77|      0004|Female| 23|                16|                    77|
    |      0005|Female| 31|                17|                    40|      0005|Female| 31|                17|                    40|
    |      0006|Female| 22|                17|                    76|      0006|Female| 22|                17|                    76|
    |      0007|Female| 35|                18|                     6|      0007|Female| 35|                18|                     6|
    |      0008|Female| 23|                18|                    94|      0008|Female| 23|                18|                    94|
    |      0009|  Male| 64|                19|                     3|      0009|  Male| 64|                19|                     3|
    |      0010|Female| 30|                19|                    72|      0010|Female| 30|                19|                    72|
    +----------+------+---+------------------+----------------------+----------+------+---+------------------+----------------------+
    only showing top 10 rows


    cache() caches the DataFrame at the corresponding StorageLevel; for DataFrames the default is MEMORY_AND_DISK.

    df = spark.read.csv('/sql/customers.csv',header=True)
    a = df.cache()
    a.show()

    +----------+------+---+------------------+----------------------+
    |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
    +----------+------+---+------------------+----------------------+
    |      0001|  Male| 19|                15|                    39|
    |      0002|  Male| 21|                15|                    81|
    |      0003|Female| 20|                16|                     6|
    |      0004|Female| 23|                16|                    77|
    |      0005|Female| 31|                17|                    40|
    |      0006|Female| 22|                17|                    76|
    |      0007|Female| 35|                18|                     6|
    |      0008|Female| 23|                18|                    94|
    |      0009|  Male| 64|                19|                     3|
    |      0010|Female| 30|                19|                    72|
    |      0011|  Male| 67|                19|                    14|
    |      0012|Female| 35|                19|                    99|
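    When the cached data is no longer needed, it can be released again; a one-line sketch:

    # Release the cached DataFrame from memory/disk once it is no longer needed.
    a.unpersist()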


    checkpoint(eager=True) checkpoints the DataFrame. Introduced in Spark 2.1, calling it truncates the logical plan built on this DataFrame and persists its data to checkpoint files, cutting off the lineage that came before.

    sc
    sc.setCheckpointDir('/datas/checkpoint')
    a.checkpoint()
    a.show()


    coalesce(numPartitions) repartitions the DataFrame to the given number of partitions. Note that a DataFrame created through a read method defaults to one partition per input file, and when the current number of partitions is smaller than the number passed to coalesce, the call has no effect: coalesce can only reduce the partition count.

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.rdd.getNumPartitions()
    1

    spark.read.csv('/sql/customers.csv',header=True).coalesce(3).rdd.getNumPartitions()
    1

    df = spark.range(0,20,2,3)
    df.rdd.getNumPartitions()
    df.coalesce(2).rdd.getNumPartitions()
    2


    repartition(numPartitions, *cols), like coalesce(numPartitions), repartitions the DataFrame, but repartition uses hash partitioning and shuffles data across the whole cluster, so it is more expensive. Besides the number of partitions, repartition can also be given the columns to partition by.

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.rdd.getNumPartitions()
    1

    df2 = df.repartition(3)
    df2.rdd.getNumPartitions()
    3

    df2.columns
    df3 = df2.repartition(6,'Genre')
    df3.show(20)

    +----------+------+---+------------------+----------------------+
    |CustomerID| Genre|Age|Annual Income (k$)|Spending Score (1-100)|
    +----------+------+---+------------------+----------------------+
    |      0003|Female| 20|                16|                     6|
    |      0004|Female| 23|                16|                    77|
    |      0005|Female| 31|                17|                    40|
    |      0006|Female| 22|                17|                    76|
    |      0007|Female| 35|                18|                     6|
    |      0008|Female| 23|                18|                    94|
    |      0010|Female| 30|                19|                    72|
    |      0012|Female| 35|                19|                    99|
    |      0013|Female| 58|                20|                    15|

    df3.rdd.getNumPartitions()
    6
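    A quick way to see the difference between the two is explain(); a minimal sketch (the exact plan text depends on the Spark version):

    # coalesce is a narrow transformation: no Exchange (shuffle) appears in the plan.
    df.coalesce(1).explain()

    # repartition by column adds an Exchange hashpartitioning step, i.e. a full shuffle.
    df.repartition(6,'Genre').explain()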


    colRegex(colName) selects the columns whose names match the given regular expression.

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
    df.select(df.colRegex("`(Col1)?+.+`")).show()

    +---+
    |  a|
    +---+
    |  1|
    |  2|
    |  3|
    +---+


    collect() returns all the rows of the DataFrame to the driver; note that with a large dataset this can easily cause an out-of-memory error on the driver node!

    df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "a"])
    df.collect()
    [Row(Col1='a', a=1), Row(Col1='b', a=2), Row(Col1='c', a=3)]
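    If the dataset may be large, a safer sketch is to bring back only a bounded number of rows:

    # Fetch at most 2 rows instead of collecting the whole DataFrame.
    df.take(2)
    df.limit(2).collect()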


    columns returns all of the DataFrame's column names as a list.

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.columns
    ['CustomerID', 'Genre', 'Age', 'Annual Income (k$)', 'Spending Score (1-100)']


    Converting a Spark SQL DataFrame to a pandas DataFrame

    df = spark.read.csv('/sql/customers.csv',header=True)
    pdf = df.toPandas()
    pdf.info()

    RangeIndex: 200 entries, 0 to 199
    Data columns (total 5 columns):
    CustomerID                200 non-null object
    Genre                     200 non-null object
    Age                       200 non-null object
    Annual Income (k$)        200 non-null object
    Spending Score (1-100)    200 non-null object
    dtypes: object(5)
    memory usage: 7.9+ KB

    pdf['Age'] = pdf['Age'].astype('int')
    pdf["Annual Income (k$)"]=pdf["Annual Income (k$)"].astype('int')
    pdf["Spending Score (1-100)"]=pdf["Spending Score (1-100)"].astype('int')
    pdf.info()

    RangeIndex: 200 entries, 0 to 199
    Data columns (total 5 columns):
    CustomerID                200 non-null object
    Genre                     200 non-null object
    Age                       200 non-null int64
    Annual Income (k$)        200 non-null int64
    Spending Score (1-100)    200 non-null int64
    dtypes: int64(3), object(2)
    memory usage: 7.9+ KB


    Converting a pandas DataFrame to a Spark SQL DataFrame

    df1 = spark.createDataFrame(pdf)
    df1.corr("Age","Annual Income (k$)")
    df1.corr("Spending Score (1-100)","Annual Income (k$)")
    0.009902848094037492


    count() returns the number of Rows in the DataFrame.

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.count()
    200


    createGlobalTempView(name) creates a global temporary view from the DataFrame. Its lifetime is tied to the Spark application: the view stays accessible as long as the application is running, until SparkContext's stop method is called and the application exits. The view is stored in the global_temp database.
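    A minimal sketch of registering and querying such a view (the view name 'customers' is my own choice):

    df = spark.read.csv('/sql/customers.csv',header=True)
    df.createGlobalTempView('customers')

    # Global temp views live in the global_temp database and are visible to
    # every SparkSession of the same application until it stops.
    spark.sql("SELECT Genre, count(*) AS cnt FROM global_temp.customers GROUP BY Genre").show()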
