Learning Spark [6] - Advanced Spark SQL

Author: 屹然1ran | Published 2021-01-28 16:19

    collect, collect_list, collect_set

    There are two commonly used collect functions: collect_list (keeps duplicates) and collect_set (removes duplicates).

    import pandas as pd

    # build a test dataframe (assumes an active SparkSession named spark)
    df = pd.DataFrame({'type1':['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                       'type2':['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                       'value':[1, 2, 3, 4, 5, 6]})
    df = spark.createDataFrame(df)
    df.createOrReplaceTempView('collect_test')
    df.show()
    
    +-----+-----+-----+
    |type1|type2|value|
    +-----+-----+-----+
    |   a1|   b1|    1|
    |   a1|   b2|    2|
    |   a1|   b3|    3|
    |   a2|   b4|    4|
    |   a2|   b5|    5|
    |   a2|   b5|    6|
    +-----+-----+-----+
    

    collect_list

    spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2 
                 FROM collect_test 
                 GROUP BY type1''').show()
    
    +-----+------------+
    |type1|       type2|
    +-----+------------+
    |   a2|[b4, b5, b5]|
    |   a1|[b1, b2, b3]|
    +-----+------------+
    

    collect_set

    spark.sql('''SELECT type1, COLLECT_SET(type2) as type2 
                 FROM collect_test 
                 GROUP BY type1''').show()
    
    +-----+------------+
    |type1|       type2|
    +-----+------------+
    |   a2|    [b4, b5]|
    |   a1|[b1, b3, b2]|
    +-----+------------+
    

    collect returns an array, so individual elements can be read with array[x]. This makes pivot-style reshaping possible, along the lines of defining array[0] as a1, array[1] as a2, and so on; see the sketch below.
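
    A minimal sketch against the collect_test table above (the aliases type2_arr, first_type2, and second_type2 are illustrative; also note that collect_list gives no ordering guarantee, so real pivots usually sort the array or collect structs first):

    spark.sql('''SELECT type1
                        , type2_arr[0] as first_type2  -- array indexing is zero-based
                        , type2_arr[1] as second_type2
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2_arr
                      FROM collect_test
                      GROUP BY type1) a''').show()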

    explode

    explode expands an array into one row per element; with it we can restore the dataframe above to its original shape.

    spark.sql('''SELECT type1, EXPLODE(type2) as type2
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                      FROM collect_test 
                      GROUP BY type1) a''').show()
    
    +-----+-----+
    |type1|type2|
    +-----+-----+
    |   a2|   b4|
    |   a2|   b5|
    |   a2|   b5|
    |   a1|   b1|
    |   a1|   b2|
    |   a1|   b3|
    +-----+-----+
    

    posexplode splits the array column the same way, while also emitting a column with each element's position.

    spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2 
                      FROM collect_test 
                      GROUP BY type1) a''').show()
    
    +-----+-----+-----+
    |type1|index|type2|
    +-----+-----+-----+
    |   a2|    0|   b4|
    |   a2|    1|   b5|
    |   a2|    2|   b5|
    |   a1|    0|   b1|
    |   a1|    1|   b2|
    |   a1|    2|   b3|
    +-----+-----+-----+
    

    But what if a table holds two arrays whose elements correspond one-to-one, as below? How do we split those?

    +-----+------------+---------+
    |type1|       type2|    value|
    +-----+------------+---------+
    |   a2|[b4, b5, b5]|[4, 5, 6]|
    |   a1|[b1, b2, b3]|[1, 2, 3]|
    +-----+------------+---------+
    

    Intuitively, we try calling explode() on each column separately:

    spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2
                        , COLLECT_LIST(value) as value 
                      FROM collect_test 
                      GROUP BY type1) a''').show()
    
    AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);
    

    To get around this, we need LATERAL VIEW.

    lateral view

    LATERAL VIEW can be thought of as building a derived table from the generator and joining it back onto the queried table, which sidesteps the one-generator-per-SELECT restriction.

    spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2
                        , COLLECT_LIST(value) as value 
                      FROM collect_test 
                      GROUP BY type1) a
                 LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
                 LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
                 WHERE type_index = value_index -- avoid a Cartesian product
                 ''').show()
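
    This restores the original six (type1, type2, value) rows, up to row order. As an aside (a sketch assuming Spark 2.4+, where ARRAYS_ZIP is available), zipping the two arrays into a single array of structs needs only one generator, so the index-matching WHERE clause disappears; the aliases zipped and pair are illustrative:

    spark.sql('''SELECT type1, pair.type2, pair.value
                 FROM(SELECT type1, COLLECT_LIST(type2) as type2
                        , COLLECT_LIST(value) as value 
                      FROM collect_test 
                      GROUP BY type1) a
                 LATERAL VIEW EXPLODE(ARRAYS_ZIP(type2, value)) zipped AS pair
                 ''').show()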
    

    split

    split breaks a string into an array on a given delimiter.

    df = pd.DataFrame({'type1':['a', 'b', 'c'],
                       'type2':['1_2_3', '1_23', '_1']})
    df = spark.createDataFrame(df)
    df.createOrReplaceTempView('collect_test')
    spark.sql('''SELECT * FROM collect_test''').show()
    
    +-----+-----+
    |type1|type2|
    +-----+-----+
    |    a|1_2_3|
    |    b| 1_23|
    |    c|   _1|
    +-----+-----+
    
    spark.sql('''SELECT type1, split(type2, '_') as splited_type2 FROM collect_test''').show()
    
    +-----+-------------------+
    |type1|      splited_type2|
    +-----+-------------------+
    |    a|          [1, 2, 3]|
    |    b|            [1, 23]|
    |    c|              [, 1]|
    +-----+-------------------+
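
    split pairs naturally with explode to get one row per token; a quick illustration on the same table (the alias token is ours). Note the empty string produced by the leading delimiter in '_1':

    spark.sql('''SELECT type1, EXPLODE(SPLIT(type2, '_')) as token
                 FROM collect_test''').show()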
    

    transform

    transform applies a function to every element of an array and returns the resulting array.

    from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

    # each row holds a single array column of Celsius readings
    schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
    t_list = [[[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]]
    t_c = spark.createDataFrame(t_list, schema)
    t_c.createOrReplaceTempView("tC")
    t_c.show()
    
    +--------------------+
    |             celsius|
    +--------------------+
    |[35, 36, 32, 30, ...|
    |[31, 32, 34, 55, 56]|
    +--------------------+
    
    spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
                  FROM tC """).show()
    
    +--------------------+--------------------+
    |             celsius|          fahrenheit|
    +--------------------+--------------------+
    |[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
    |[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
    +--------------------+--------------------+
    

    filter

    filter keeps only the elements that satisfy a predicate and returns them as a new array.

    spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp 
                  FROM tC """).show()
    
    +--------------------+---------+
    |             celsius|high_temp|
    +--------------------+---------+
    |[35, 36, 32, 30, ...| [40, 42]|
    |[31, 32, 34, 55, 56]| [55, 56]|
    +--------------------+---------+
    

    exists

    exists returns a boolean indicating whether any element of the array satisfies the predicate.

    spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp 
                  FROM tC """).show()
    
    +--------------------+------------+
    |             celsius|is_high_temp|
    +--------------------+------------+
    |[35, 36, 32, 30, ...|        true|
    |[31, 32, 34, 55, 56]|        true|
    +--------------------+------------+
    

    reduce

    reduce folds the array into a single value, starting from an initial accumulator and applying a merge function element by element, then runs a finish function on the result.

    spark.sql(""" SELECT celsius, 
                         reduce(celsius
                                , (t, acc) -> ((t * 9) div 5) + 32 + acc
                                , acc -> (acc div size(celsius))) as avgFahrenheit 
                  FROM tC """).show()
    
    +--------------------+-------------+ 
    |             celsius|avgFahrenheit| 
    +--------------------+-------------+ 
    |[35, 36, 32, 30, ...|           96| 
    |[31, 32, 34, 55, 56]|          105| 
    +--------------------+-------------+
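
    The same computation is available through the DataFrame API. A minimal sketch assuming Spark 3.1+, where pyspark.sql.functions.aggregate accepts Python lambdas (in SQL, aggregate has been the name for this operation since Spark 2.4, with reduce added later as an alias):

    from pyspark.sql import functions as F

    t_c.select(
        'celsius',
        F.aggregate(
            'celsius',
            F.lit(0),                # initial accumulator
            lambda acc, t: acc + t,  # merge: running Celsius sum
            # finish: integer-average, then convert the average to Fahrenheit
            lambda acc: F.floor(F.floor(acc / F.size('celsius')) * 9 / 5) + 32
        ).alias('avgFahrenheit')
    ).show()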
    

    Other functions

    Advanced Spark SQL Functions, Part 1
    Advanced Spark SQL Functions, Part 2
    Advanced Spark SQL Functions, Part 3

    Reference
    Learning Spark, 2nd Edition: Lightning-Fast Data Analytics, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly, 2020)
