collect, collect_list, collect_set
There are two commonly used collect functions: collect_list (keeps duplicates) and collect_set (deduplicates).
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# build a test dataframe
df = pd.DataFrame({'type1': ['a1', 'a1', 'a1', 'a2', 'a2', 'a2'],
                   'type2': ['b1', 'b2', 'b3', 'b4', 'b5', 'b5'],
                   'value': [1, 2, 3, 4, 5, 6]})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
df.show()
+-----+-----+-----+
|type1|type2|value|
+-----+-----+-----+
| a1| b1| 1|
| a1| b2| 2|
| a1| b3| 3|
| a2| b4| 4|
| a2| b5| 5|
| a2| b5| 6|
+-----+-----+-----+
collect_list
spark.sql('''SELECT type1, COLLECT_LIST(type2) as type2
             FROM collect_test
             GROUP BY type1''').show()
+-----+------------+
|type1| type2|
+-----+------------+
| a2|[b4, b5, b5]|
| a1|[b1, b2, b3]|
+-----+------------+
collect_set
spark.sql('''SELECT type1, COLLECT_SET(type2) as type2
             FROM collect_test
             GROUP BY type1''').show()
+-----+------------+
|type1| type2|
+-----+------------+
| a2| [b4, b5]|
| a1|[b1, b3, b2]|
+-----+------------+
collect returns an array whose elements can be accessed as array[x]. This enables pivot-table-style operations, along the lines of array[0] as a1, array[1] as a2, and so on; a sketch follows below. Note that neither collect_list nor collect_set guarantees element order (which is why a1's set above came back as [b1, b3, b2]).
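A minimal sketch of that pivot idea, indexing into the collected array (the aliases a1/a2/a3 are just illustrative names):
spark.sql('''SELECT type1, type2[0] as a1, type2[1] as a2, type2[2] as a3
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()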
explode
explode expands an array into one row per element; with it we can restore the dataframe above to its original shape.
spark.sql('''SELECT type1, EXPLODE(type2) as type2
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()
+-----+-----+
|type1|type2|
+-----+-----+
| a2| b4|
| a2| b5|
| a2| b5|
| a1| b1|
| a1| b2|
| a1| b3|
+-----+-----+
posexplode splits the array the same way while also adding a column with each element's index:
spark.sql('''SELECT type1, posexplode(type2) as (index, type2)
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   FROM collect_test
                   GROUP BY type1) a''').show()
+-----+-----+-----+
|type1|index|type2|
+-----+-----+-----+
| a2| 0| b4|
| a2| 1| b5|
| a2| 2| b5|
| a1| 0| b1|
| a1| 1| b2|
| a1| 2| b3|
+-----+-----+-----+
But what if the table holds two arrays whose elements correspond one-to-one, as below? How do we split those?
+-----+------------+---------+
|type1| type2| value|
+-----+------------+---------+
| a2|[b4, b5, b5]|[4, 5, 6]|
| a1|[b1, b2, b3]|[1, 2, 3]|
+-----+------------+---------+
Intuitively, we try to explode() each column separately:
spark.sql('''SELECT type1, explode(type2) as type2, explode(value) as value
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a''').show()
AnalysisException: Only one generator allowed per select clause but found 2: explode(type2), explode(value);
To get around this, we need LATERAL VIEW.
lateral view
LATERAL VIEW can be thought of as materializing the generator's output as a table and joining it back onto the query, which sidesteps the one-generator-per-SELECT restriction.
spark.sql('''SELECT type1, exploded_type2.type2, exploded_value.value
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a
             LATERAL VIEW POSEXPLODE(type2) exploded_type2 as type_index, type2
             LATERAL VIEW POSEXPLODE(value) exploded_value as value_index, value
             WHERE type_index = value_index -- avoid the Cartesian product
          ''').show()
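Since the WHERE clause here first builds the full cross product and then filters it, on Spark 2.4+ an alternative is to zip the two arrays and explode only once; a minimal sketch using the built-in ARRAYS_ZIP:
spark.sql('''SELECT type1, zipped.type2, zipped.value
             FROM (SELECT type1, COLLECT_LIST(type2) as type2
                   , COLLECT_LIST(value) as value
                   FROM collect_test
                   GROUP BY type1) a
             LATERAL VIEW EXPLODE(ARRAYS_ZIP(type2, value)) t as zipped''').show()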
split
split breaks a string into an array according to a delimiter:
df = pd.DataFrame({'type1': ['a', 'b', 'c'],
                   'type2': ['1_2_3', '1_23', '_1']})
df = spark.createDataFrame(df)
df.createOrReplaceTempView('collect_test')
spark.sql('''SELECT * FROM collect_test''').show()
+-----+-----+
|type1|type2|
+-----+-----+
| a|1_2_3|
| b| 1_23|
| c| _1|
+-----+-----+
spark.sql('''SELECT type1, split(type2, '_') as split_type2 FROM collect_test''').show()
+-----+-----------+
|type1|split_type2|
+-----+-----------+
|    a|  [1, 2, 3]|
|    b|    [1, 23]|
|    c|      [, 1]|
+-----+-----------+
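One caveat: split()'s delimiter is parsed as a Java regular expression, so regex metacharacters must be escaped. A quick sketch:
# the delimiter is a regex, so '.' must be escaped as '\\.' inside the SQL literal
spark.sql(r"SELECT split('1.2.3', '\\.') as parts").show()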
transform
transform applies a function to each element of an array and returns a new array:
from pyspark.sql.types import StructType, StructField, ArrayType, IntegerType

schema = StructType([StructField('celsius', ArrayType(IntegerType()))])
# each row holds a single array-typed column
t_list = [[[35, 36, 32, 30, 40, 42, 38]], [[31, 32, 34, 55, 56]]]
t_c = spark.createDataFrame(t_list, schema)
t_c.createOrReplaceTempView("tC")
t_c.show()
+--------------------+
| celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
spark.sql(""" SELECT celsius, TRANSFORM(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC """).show()
+--------------------+--------------------+
| celsius| fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
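For reference, the same conversion can be written with the DataFrame API, assuming Spark 3.1+ where pyspark.sql.functions.transform is available; a sketch:
from pyspark.sql import functions as F

# apply a lambda to each array element; floor() mimics SQL's integer div
t_c.select('celsius',
           F.transform('celsius', lambda t: F.floor(t * 9 / 5) + 32)
            .alias('fahrenheit')).show()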
filter
filter keeps only the elements that satisfy a predicate, returning them as a new array:
spark.sql(""" SELECT celsius, FILTER(celsius, t -> t >= 40) as high_temp
FROM tC """).show()
+--------------------+---------+
| celsius|high_temp|
+--------------------+---------+
|[35, 36, 32, 30, ...| [40, 42]|
|[31, 32, 34, 55, 56]| [55, 56]|
+--------------------+---------+
exists
exists returns a boolean indicating whether any element of the array satisfies the predicate:
spark.sql(""" SELECT celsius, EXISTS(celsius, t -> t >= 40) as is_high_temp
FROM tC """).show()
+--------------------+------------+
| celsius|is_high_temp|
+--------------------+------------+
|[35, 36, 32, 30, ...| true|
|[31, 32, 34, 55, 56]| true|
+--------------------+------------+
reduce
reduce folds the array into a single value using an initial value and a merge function, then applies an optional finish function to the result (on Spark versions without reduce, the equivalent function is aggregate):
spark.sql(""" SELECT celsius,
reduce(celsius
, (t, acc) -> ((t * 9) div 5) + 32 + acc
, acc -> (acc div size(celsius))) as avgFahrenheit
FROM tC """).show()
+--------------------+-------------+
| celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...| 96|
|[31, 32, 34, 55, 56]| 105|
+--------------------+-------------+
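To trace the second row: the merge step sums the Celsius values (31 + 32 + 34 + 55 + 56 = 208), and the finish step averages and converts: 208 div 5 = 41, (41 * 9) div 5 = 73, and 73 + 32 = 105.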
Other functions
Advanced Spark SQL functions, part 1
Advanced Spark SQL functions, part 2
Advanced Spark SQL functions, part 3
Reference
Learning Spark: Lightning-Fast Data Analytics, 2nd Edition, by Jules S. Damji, Brooke Wenig, Tathagata Das, and Denny Lee (O'Reilly, 2020)