美文网首页
Pyspark Join操作

Pyspark Join操作

作者: iE简 | 来源:发表于2021-02-25 17:25 被阅读0次

    Spark Join 操作

    [TOC]

    官方文档:https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html?highlight=join#pyspark.sql.DataFrame.join

    从文档中可以看到关于join的介绍:join(other, on=None, how=None)

    从函数中可以看到有三个参数:

    • other:需要合并的DataFrame格式的数据。官方写的是Right side of the join,翻译过来就是放在右侧的DataFrame数据。
    • on:用来执行对等连接的列名,可以是字符串、字符串列表或者表达式。如果是字符串或者字符串列表,那么两边的数据都得存在该列。spark的横向合并不向pandas那么简单,直接横向拼接。spark合并必须有对应的列作为参照,列值形同的就合并,不存在的就填充空值。
    • how:合并方式。默认的是inner,其他的还有cross,outer,full,full_outer,left,left_outer,right,right_outer,left_semi,left_anti。

    一.创建数据

    首先创建如下两组数据:

    score.png

    我将在这两组数据基础上做些测试。

    创建代码:

    # -*- coding: utf-8 -*-
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
        .builder \
        .master("local") \
        .appName("create df") \
        .getOrCreate()
    
    # 第一组数据,包含年龄、体重、身高信息
    body_info = [["Bom", 20, 97.6, 165],
                 ["Alice", 23, 90.0, 160],
                 ["kuke", 33, 190.0, 170],
                 ["jike", 19, 120.0, 170],
                 ["Joe", 24, 89.0, 162]]
    
    body_df = spark.createDataFrame(body_info, ["name", "age", "weight", "height"])
    body_df.show()
    
    # 第二组数据,包含一些成绩信息,语数外
    score_info = [["Bom", 88, 97, 90],
                 ["Alice", 85, 99, 92],
                 ["kuke", 77, 82, 80],
                 ["jike", 65, 58, 30],
                 ["Joe", 90, 100, 92]]
    score_df = spark.createDataFrame(score_info, ["name", "Chinese", "Math", "English"])
    score_df.show()
    
    
    # 输出
    +-----+---+------+------+
    | name|age|weight|height|
    +-----+---+------+------+
    |  Bom| 20|  97.6|   165|
    |Alice| 23|  90.0|   160|
    | kuke| 33| 190.0|   170|
    | jike| 19| 120.0|   170|
    |  Joe| 24|  89.0|   162|
    +-----+---+------+------+
    
    +-----+-------+----+-------+
    | name|Chinese|Math|English|
    +-----+-------+----+-------+
    |  Bom|     88|  97|     90|
    |Alice|     85|  99|     92|
    | kuke|     77|  82|     80|
    | jike|     65|  58|     30|
    |  Joe|     90| 100|     92|
    +-----+-------+----+-------+
    

    二.合并操作

    这节的操作只针对第三个参数how做实验,第二个参数on都设为name。

    1.inner

    情况一:第一组数据行数 = 第二组数据行数

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="inner")
    merge_df.show()
    

    输出

    +-----+---+------+------+-------+----+-------+
    | name|age|weight|height|Chinese|Math|English|
    +-----+---+------+------+-------+----+-------+
    | jike| 19| 120.0|   170|     65|  58|     30|
    |  Bom| 20|  97.6|   165|     88|  97|     90|
    |  Joe| 24|  89.0|   162|     90| 100|     92|
    |Alice| 23|  90.0|   160|     85|  99|     92|
    | kuke| 33| 190.0|   170|     77|  82|     80|
    +-----+---+------+------+-------+----+-------+
    

    结果显而易见,是把第二组数据除name列拼到了第一组的右边。

    情况二:第一组数据行数 > 第二组数据行数

    删掉第二组数据的2和4行,变为:

    +-----+-------+----+-------+
    | name|Chinese|Math|English|
    +-----+-------+----+-------+
    |  Bom|     88|  97|     90|
    | kuke|     77|  82|     80|
    |  Joe|     90| 100|     92|
    +-----+-------+----+-------+
    

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="inner")
    merge_df.show()
    

    输出:

    +----+---+------+------+-------+----+-------+
    |name|age|weight|height|Chinese|Math|English|
    +----+---+------+------+-------+----+-------+
    | Bom| 20|  97.6|   165|     88|  97|     90|
    | Joe| 24|  89.0|   162|     90| 100|     92|
    |kuke| 33| 190.0|   170|     77|  82|     80|
    +----+---+------+------+-------+----+-------+
    

    从输出结果可以看到,inner操作先根据两组的name列求交集,再合并数据。

    情况三:第一组数据行数 < 第二组数据行数

    删掉第一组数据的2和4行,变为:

    +-----+---+------+------+
    | name|age|weight|height|
    +-----+---+------+------+
    |  Bom| 20|  97.6|   165|
    | kuke| 33| 190.0|   170|
    |  Joe| 24|  89.0|   162|
    +-----+---+------+------+
    

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="inner")
    merge_df.show()
    

    输出:

    +----+---+------+------+-------+----+-------+
    |name|age|weight|height|Chinese|Math|English|
    +----+---+------+------+-------+----+-------+
    | Bom| 20|  97.6|   165|     88|  97|     90|
    | Joe| 24|  89.0|   162|     90| 100|     92|
    |kuke| 33| 190.0|   170|     77|  82|     80|
    +----+---+------+------+-------+----+-------+
    

    结果和情况二一样,先求交集,在合并。

    2.cross

    情况一:第一组数据行数 = 第二组数据行数

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="cross")
    merge_df.show()
    

    在spark2.4.7中用上边的代码运行会毫不客气的报错:

    pyspark.sql.utils.IllegalArgumentException: 'requirement failed: Unsupported using join type Cross'
    

    之后通过Google,搜索到不是这么用:

    改成如下:

    merge_df = body_df.crossJoin(score_df)
    merge_df.show()
    

    输出:

    +-----+---+------+------+-----+-------+----+-------+
    | name|age|weight|height| name|Chinese|Math|English|
    +-----+---+------+------+-----+-------+----+-------+
    |  Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
    |  Bom| 20|  97.6|   165|Alice|     85|  99|     92|
    |  Bom| 20|  97.6|   165| kuke|     77|  82|     80|
    |  Bom| 20|  97.6|   165| jike|     65|  58|     30|
    |  Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
    |Alice| 23|  90.0|   160|  Bom|     88|  97|     90|
    |Alice| 23|  90.0|   160|Alice|     85|  99|     92|
    |Alice| 23|  90.0|   160| kuke|     77|  82|     80|
    |Alice| 23|  90.0|   160| jike|     65|  58|     30|
    |Alice| 23|  90.0|   160|  Joe|     90| 100|     92|
    | kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
    | kuke| 33| 190.0|   170|Alice|     85|  99|     92|
    | kuke| 33| 190.0|   170| kuke|     77|  82|     80|
    | kuke| 33| 190.0|   170| jike|     65|  58|     30|
    | kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
    | jike| 19| 120.0|   170|  Bom|     88|  97|     90|
    | jike| 19| 120.0|   170|Alice|     85|  99|     92|
    | jike| 19| 120.0|   170| kuke|     77|  82|     80|
    | jike| 19| 120.0|   170| jike|     65|  58|     30|
    | jike| 19| 120.0|   170|  Joe|     90| 100|     92|
    +-----+---+------+------+-----+-------+----+-------+
    

    看到这结果开始有点摸不着头脑,后来看懂了是第一组数据的每一行都会和第二组的每一行生成新的一行。

    情况二:第一组数据行数 > 第二组数据行数

    删掉第二组数据的2和4行,变为:

    +-----+-------+----+-------+
    | name|Chinese|Math|English|
    +-----+-------+----+-------+
    |  Bom|     88|  97|     90|
    | kuke|     77|  82|     80|
    |  Joe|     90| 100|     92|
    +-----+-------+----+-------+
    

    合并代码:

    merge_df = body_df.crossJoin(score_df)
    merge_df.show()
    

    输出:

    +-----+---+------+------+----+-------+----+-------+
    | name|age|weight|height|name|Chinese|Math|English|
    +-----+---+------+------+----+-------+----+-------+
    |  Bom| 20|  97.6|   165| Bom|     88|  97|     90|
    |  Bom| 20|  97.6|   165|kuke|     77|  82|     80|
    |  Bom| 20|  97.6|   165| Joe|     90| 100|     92|
    |Alice| 23|  90.0|   160| Bom|     88|  97|     90|
    |Alice| 23|  90.0|   160|kuke|     77|  82|     80|
    |Alice| 23|  90.0|   160| Joe|     90| 100|     92|
    | kuke| 33| 190.0|   170| Bom|     88|  97|     90|
    | kuke| 33| 190.0|   170|kuke|     77|  82|     80|
    | kuke| 33| 190.0|   170| Joe|     90| 100|     92|
    | jike| 19| 120.0|   170| Bom|     88|  97|     90|
    | jike| 19| 120.0|   170|kuke|     77|  82|     80|
    | jike| 19| 120.0|   170| Joe|     90| 100|     92|
    |  Joe| 24|  89.0|   162| Bom|     88|  97|     90|
    |  Joe| 24|  89.0|   162|kuke|     77|  82|     80|
    |  Joe| 24|  89.0|   162| Joe|     90| 100|     92|
    +-----+---+------+------+----+-------+----+-------+
    

    虽然少了两行,但不影响情况一对结论。

    情况三:第一组数据行数 < 第二组数据行数

    删掉第一组数据的2和4行,变为:

    +-----+---+------+------+
    | name|age|weight|height|
    +-----+---+------+------+
    |  Bom| 20|  97.6|   165|
    | kuke| 33| 190.0|   170|
    |  Joe| 24|  89.0|   162|
    +-----+---+------+------+
    

    合并代码:

    merge_df = body_df.crossJoin(score_df)
    merge_df.show()
    

    输出:

    +----+---+------+------+-----+-------+----+-------+
    |name|age|weight|height| name|Chinese|Math|English|
    +----+---+------+------+-----+-------+----+-------+
    | Bom| 20|  97.6|   165|  Bom|     88|  97|     90|
    | Bom| 20|  97.6|   165|Alice|     85|  99|     92|
    | Bom| 20|  97.6|   165| kuke|     77|  82|     80|
    | Bom| 20|  97.6|   165| jike|     65|  58|     30|
    | Bom| 20|  97.6|   165|  Joe|     90| 100|     92|
    |kuke| 33| 190.0|   170|  Bom|     88|  97|     90|
    |kuke| 33| 190.0|   170|Alice|     85|  99|     92|
    |kuke| 33| 190.0|   170| kuke|     77|  82|     80|
    |kuke| 33| 190.0|   170| jike|     65|  58|     30|
    |kuke| 33| 190.0|   170|  Joe|     90| 100|     92|
    | Joe| 24|  89.0|   162|  Bom|     88|  97|     90|
    | Joe| 24|  89.0|   162|Alice|     85|  99|     92|
    | Joe| 24|  89.0|   162| kuke|     77|  82|     80|
    | Joe| 24|  89.0|   162| jike|     65|  58|     30|
    | Joe| 24|  89.0|   162|  Joe|     90| 100|     92|
    +----+---+------+------+-----+-------+----+-------+
    

    结论和情况一一样。

    3.outer

    情况一:第一组数据行数 = 第二组数据行数

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="outer")
    merge_df.show()
    

    输出:

    +-----+---+------+------+-------+----+-------+
    | name|age|weight|height|Chinese|Math|English|
    +-----+---+------+------+-------+----+-------+
    | jike| 19| 120.0|   170|     65|  58|     30|
    |  Bom| 20|  97.6|   165|     88|  97|     90|
    |  Joe| 24|  89.0|   162|     90| 100|     92|
    |Alice| 23|  90.0|   160|     85|  99|     92|
    | kuke| 33| 190.0|   170|     77|  82|     80|
    +-----+---+------+------+-------+----+-------+
    

    结果和inner合并的情况一一样,结论暂时未知。

    情况二:第一组数据行数 > 第二组数据行数

    删掉第二组数据的2和4行,变为:

    +-----+-------+----+-------+
    | name|Chinese|Math|English|
    +-----+-------+----+-------+
    |  Bom|     88|  97|     90|
    | kuke|     77|  82|     80|
    |  Joe|     90| 100|     92|
    +-----+-------+----+-------+
    

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="outer")
    merge_df.show()
    

    输出:

    +-----+---+------+------+-------+----+-------+
    | name|age|weight|height|Chinese|Math|English|
    +-----+---+------+------+-------+----+-------+
    | jike| 19| 120.0|   170|   null|null|   null|
    |  Bom| 20|  97.6|   165|     88|  97|     90|
    |  Joe| 24|  89.0|   162|     90| 100|     92|
    |Alice| 23|  90.0|   160|   null|null|   null|
    | kuke| 33| 190.0|   170|     77|  82|     80|
    +-----+---+------+------+-------+----+-------+
    

    从输出结果可以看到,没有的数据用null填充。

    情况三:第一组数据行数 < 第二组数据行数

    删掉第一组数据2和4行,变为:

    +-----+---+------+------+
    | name|age|weight|height|
    +-----+---+------+------+
    |  Bom| 20|  97.6|   165|
    | kuke| 33| 190.0|   170|
    |  Joe| 24|  89.0|   162|
    +-----+---+------+------+
    

    合并代码:

    merge_df = body_df.join(score_df, on="name", how="outer")
    merge_df.show()
    

    输出:

    +-----+----+------+------+-------+----+-------+
    | name| age|weight|height|Chinese|Math|English|
    +-----+----+------+------+-------+----+-------+
    | jike|null|  null|  null|     65|  58|     30|
    |  Bom|  20|  97.6|   165|     88|  97|     90|
    |  Joe|  24|  89.0|   162|     90| 100|     92|
    |Alice|null|  null|  null|     85|  99|     92|
    | kuke|  33| 190.0|   170|     77|  82|     80|
    +-----+----+------+------+-------+----+-------+
    

    结合前面两种情况,先根据name列的值求并集,有数据的直接合并,没有的填充null。

    相关文章

      网友评论

          本文标题:Pyspark Join操作

          本文链接:https://www.haomeiwen.com/subject/ccuufltx.html