美文网首页
转《Spark Window 入门介绍》

转《Spark Window 入门介绍》

作者: 井底蛙蛙呱呱呱 | 来源:发表于2020-04-24 21:37 被阅读0次

    转自https://lotabout.me/2019/Spark-Window-Function-Introduction/

    对于一个数据集,map 是对每行进行操作,为每行得到一个结果;reduce 则是对多行进行操作,得到一个结果;而 window 函数则是对多行进行操作,得到多个结果(每行一个)。本文会以实例介绍 window 函数的基本概念和用法。

    示例:计算成绩排名

    例如大学里有许多专业,每个专业有若干个班级,每个班级又有许多学生,这次考试,每个学生的成绩用 pyspark 表示如下:

    df = sqlContext.createDataFrame([
        ["Student A", 1, "Science", 10],
        ["Student B", 1, "Science", 20],
        ["Student C", 2, "Science", 30],
        ["Student D", 2, "Science", 40],
        ["Student D", 3, "Science", 50],
        ["Student E", 4, "Art", 10],
        ["Student F", 4, "Art", 20],
        ["Student G", 5, "Art", 30],
        ["Student H", 5, "Art", 40],
        ["Student I", 6, "Art", 50],
        ], ["name", "class", "subject", "score"])
    

    现在我们的需求是:计算每个学生在专业里的成绩排名。

    首先,我们将学生按专业分成两组:



    接着我们按分数从高到低进行排序:



    之后是执行窗口函数。对于每个学生,我们将排在它之前的所有学生取出,再计算当前学生排在第几名:

    对应的 pyspark 代码如下:

    windowSpec = Window.partitionBy(df.subject)
    windowSpec = windowSpec.orderBy(df.score.desc())
    windowSpec = windowSpec.rowsBetween(Window.unboundedPreceding, Window.currentRow)
    
    df.withColumn('rank', func.rank().over(windowSpec)).show()
    
    # +---------+-------+-----+-----+----+
    # |     name|subject|class|score|rank|
    # +---------+-------+-----+-----+----+
    # |Student B|Science|    1|   70|   1|
    # |Student D|Science|    2|   60|   2|
    # |Student E|Science|    3|   50|   3|
    # |Student C|Science|    2|   30|   4|
    # |Student A|Science|    1|   10|   5|
    # |Student G|    Art|    4|   60|   1|
    # |Student H|    Art|    5|   50|   2|
    # |Student J|    Art|    6|   40|   3|
    # |Student I|    Art|    5|   30|   4|
    # |Student F|    Art|    4|   10|   5|
    # +---------+-------+-----+-----+----+
    

    如何定义窗口

    一个窗口需要定义三个部分:

      1. 分组,如何将行分组?在选取窗口数据时,只对组内数据生效
      1. 排序,按何种方式进行排序?选取窗口数据时,会首先按指定方式排序
      1. 帧(frame)选取,以当前行为基准,如何选取周围行?

    Row Frame(行帧)

    行帧,即选择帧的时候通过行数指定。语法为 rowsBetween(x, y),其中 x, y 可以是数字,-n表示向前数 n 行,n 表示向后数 n 行。除此之外,还可以是:

    • Window.unboundedPreceding 表示当前行之前的无限行
    • Window.currentRow 表示当前行
    • Window.unboundedFollowing 表示当前行之后的无限行

    例如,要选择当前行的前一行和后一行,则 pyspark 的写法为 rowsBetween(-1, 1),对应 SQL 的写法为 ROWS BETWEEN 1 PRECEEDING AND 1 FOLLOWING,表示如下图:


    Range Frame(范围帧)

    有时,我们想根据当前行列值的范围来选取窗口,语法为 rangeBetween(x, y)。例如,当前的分数为 60,选择范围帧 rangeBetween(-20, 20),则会选择所有分数落在 [40, 80] 范围内的行。如下图:


    窗口函数

    从通用性的角度来说,选定帧内数据后,做何种计算,需要让用户自行定义。考虑到效率和便利性等因素,Spark SQL 不支持自定义的窗口函数[1],而是提供了一些内置的优化过的函数,来满足日常的需求。

    Spark SQL 支持三种类型的窗口函数:排名函数(ranking function)、分析函数 (analytic functions)和聚合函数(aggregate functions)。其中聚合函数(如 max, min, avg 等)常用在 reduce 操作中,不再介绍,其它函数如下:


    这些函数在使用时,只需要将函数应用在窗口定义上,例如 avg(df.score).over(windowSpec)。

    小结

    文章给出一个使用窗口函数的示例,并尝试说清如何定义一个窗口,包括帧的选择,最后给出一些常用的窗口函数。

    注意的是窗口函数在 Spark 1.4 开始支持,一些窗口函数在 Spark 1.* 中需要使用 HiveContext 才能运行。

    参考

    相关文章

      网友评论

          本文标题:转《Spark Window 入门介绍》

          本文链接:https://www.haomeiwen.com/subject/rlwqwhtx.html