Complex data types are usually built out of simple ones. A common way to manipulate them is the explode-and-collect pattern; another is to operate on the data directly with a UDF. Spark also ships many built-in functions for working with complex data types:
Array type functions
Function | Description | Query | Output |
---|---|---|---|
array_distinct(array<T>): array<T> | Removes duplicates within an array | SELECT array_distinct(array(1, 2, 3, null, 3)); | [1,2,3,null] |
array_intersect(array<T>, array<T>): array<T> | Returns the intersection of two arrays without duplicates | SELECT array_intersect(array(1, 2, 3), array(1, 3, 5)); | [1,3] |
array_union(array<T>, array<T>): array<T> | Returns the union of two arrays without duplicates | SELECT array_union(array(1, 2, 3), array(1, 3, 5)); | [1,2,3,5] |
array_except(array<T>, array<T>): array<T> | Returns elements in array1 but not in array2, without duplicates | SELECT array_except(array(1, 2, 3), array(1, 3, 5)); | [2] |
array_join(array<String>, String[, String]): String | Concatenates the elements of an array using a delimiter | SELECT array_join(array('hello', 'world'), ' '); | hello world |
array_max(array<T>): T | Returns the maximum value within the array; null elements are skipped | SELECT array_max(array(1, 20, null, 3)); | 20 |
array_min(array<T>): T | Returns the minimum value within the array; null elements are skipped | SELECT array_min(array(1, 20, null, 3)); | 1 |
array_position(array<T>, T): Long | Returns the 1-based index of the first occurrence of the given element in the array as a Long, or 0 if it is not found | SELECT array_position(array(3, 2, 1), 1); | 3 |
array_remove(array<T>, T): array<T> | Removes all elements that are equal to the given element from the given array | SELECT array_remove(array(1, 2, 3, null, 3), 3); | [1,2,null] |
arrays_overlap(array<T>, array<T>): Boolean | Returns true if array1 contains at least one non-null element also present in array2 | SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)); | true |
array_sort(array<T>): array<T> | Sorts the input array in ascending order, with null elements placed at the end of the array | SELECT array_sort(array('b', 'd', null, 'c', 'a')); | ["a","b","c","d",null] |
concat(array<T>, ...): array<T> | Concatenates strings, binaries, arrays, etc. | SELECT concat(array(1, 2, 3), array(4, 5), array(6)); | [1,2,3,4,5,6] |
flatten(array<array<T>>): array<T> | Flattens an array of arrays into a single array | SELECT flatten(array(array(1, 2), array(3, 4))); | [1,2,3,4] |
array_repeat(T, Int): array<T> | Returns an array containing the specified element the specified number of times | SELECT array_repeat('123', 3); | ["123","123","123"] |
reverse(array<T>): array<T> | Returns a reversed string or an array with the reverse order of elements | SELECT reverse(array(2, 1, 4, 3)); | [3,4,1,2] |
sequence(T, T[, T]): array<T> | Generates an array of elements from start to stop (inclusive) by incremental step | SELECT sequence(1, 5);SELECT sequence(5, 1); SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month); | [1,2,3,4,5] [5,4,3,2,1] ["2018-01-01", "2018-02-01", "2018-03-01"] |
shuffle(array<T>): array<T> | Returns a random permutation of the given array | SELECT shuffle(array(1, 20, null, 3)); | [null,3,20,1] |
slice(array<T>, Int, Int): array<T> | Returns a subset of the given array starting from the given index (counting from the end if the index is negative), of the specified length | SELECT slice(array(1, 2, 3, 4), -2, 2); | [3,4] |
arrays_zip(array<T>, array<U>, ...): array<struct<T, U, ...>> | Returns a merged array of structs | SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4)); | [{"0":1,"1":2,"2":3},{"0":2,"1":3,"2":4}] |
element_at(array<T>, Int): T | Returns the element of the given array at the given (1-based) index | SELECT element_at(array(1, 2, 3), 2); | 2 |
cardinality(array<T>): Int | An alias of size; returns the size of the given array or a map | SELECT cardinality(array('b', 'd', 'c', 'a')); | 4 |
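A few of the subtler semantics in the table above (the 1-based index and 0-if-absent behavior of array_position, negative start indices in slice, null handling in array_distinct) can be mimicked in plain Python. These helper names are hypothetical stand-ins for illustration, not Spark APIs:

```python
def array_distinct(arr):
    """Remove duplicates, keeping first-seen order (null/None is kept)."""
    seen, out = set(), []
    for x in arr:
        if x not in seen:
            seen.add(x)
            out.append(x)
    return out

def array_position(arr, elem):
    """1-based index of the first occurrence, or 0 if absent (as in Spark)."""
    for i, x in enumerate(arr, start=1):
        if x == elem:
            return i
    return 0

def slice_array(arr, start, length):
    """Spark-style slice: 1-based start; a negative start counts from the end."""
    idx = start - 1 if start > 0 else len(arr) + start
    return arr[idx:idx + length]

print(array_distinct([1, 2, 3, None, 3]))   # [1, 2, 3, None]
print(array_position([3, 2, 1], 1))         # 3
print(slice_array([1, 2, 3, 4], -2, 2))     # [3, 4]
```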
Map functions
Function | Description | Query | Output |
---|---|---|---|
map_from_arrays(array<K>, array<V>): map<K, V> | Creates a map from the given pair of key/value arrays; elements in keys should not be null | SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')); | {"1.0":"2", "3.0":"4"} |
map_from_entries(array<struct<K, V>>): map<K, V> | Returns a map created from the given array | SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'))); | {"1":"a", "2":"b"} |
map_concat(map<K, V>, ...): map<K, V> | Returns the union of the input maps | SELECT map_concat(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); | {"1":"a", "2":"c","3":"d"} |
element_at(map<K, V>, K): V | Returns the value of the given key, or null if the key is not contained in the map | SELECT element_at(map(1, 'a', 2, 'b'), 2); | b |
cardinality(map<K, V>): Int | An alias of size; returns the size of the given array or map | SELECT cardinality(map(1, 'a', 2, 'b')); | 2 |
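In the map_concat example above, the later map's value wins for a duplicate key (in recent Spark versions this behavior is governed by spark.sql.mapKeyDedupPolicy), and element_at returns null for a missing key. A plain-Python sketch of those two semantics, with hypothetical helper names:

```python
def map_concat(*maps):
    """Merge maps left to right; for a duplicate key, a later value
    overwrites an earlier one (the last-win behavior shown above)."""
    out = {}
    for m in maps:
        out.update(m)
    return out

def element_at(m, key):
    """Return the value for key, or None (Spark: null) if absent."""
    return m.get(key)

print(map_concat({1: 'a', 2: 'b'}, {2: 'c', 3: 'd'}))  # {1: 'a', 2: 'c', 3: 'd'}
print(element_at({1: 'a', 2: 'b'}, 2))                 # b
```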
Higher-order functions
- transform, `transform(array<T>, function<T, U>): array<U>`, applies the function to every element of the array. It is similar to a UDF or map, but more efficient;
- filter, `filter(array<T>, function<T, Boolean>): array<T>`, returns only the elements for which the function evaluates to true;
- exists, `exists(array<T>, function<T, Boolean>): Boolean`, checks whether the array contains at least one element that satisfies the predicate;
- reduce, `reduce(array<T>, B, function<B, T, B>, function<B, R>): R`, where `function<B, T, B>` works like an ordinary reduce, folding the array into a single accumulator, and `function<B, R>` then transforms that final accumulator into the result.
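The two-lambda shape of reduce is the least obvious of the four, so here is a plain-Python sketch of its semantics (the `spark_reduce` name is hypothetical, not a Spark API): fold with the merge function from the zero value, then apply the finishing function once at the end.

```python
from functools import reduce as py_reduce

def spark_reduce(arr, zero, merge, finish=lambda acc: acc):
    """Sketch of Spark SQL's reduce: fold `arr` with `merge` starting from
    `zero`, then apply `finish` to the final accumulator."""
    return finish(py_reduce(merge, arr, zero))

# Sum the array, then turn the total into an average in the finishing step.
print(spark_reduce([1, 2, 3, 4], 0, lambda acc, t: acc + t,
                   lambda acc: acc / 4))  # 2.5
```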
// Create a DataFrame with two rows, each holding one array of Celsius readings
import spark.implicits._  // needed for toDF outside the spark-shell
val t1 = Array(35, 36, 32, 30, 40, 42, 38)
val t2 = Array(31, 32, 34, 55, 56)
val tC = Seq(t1, t2).toDF("celsius")
tC.createOrReplaceTempView("tC")
// Show the DataFrame
tC.show()
+--------------------+
| celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+
// Calculate Fahrenheit from Celsius for an array of temperatures
spark.sql("""
SELECT celsius,
transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit
FROM tC
""").show()
+--------------------+--------------------+
| celsius| fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+
// Filter temperatures > 38C for array of temperatures
spark.sql("""
SELECT celsius,
filter(celsius, t -> t > 38) as high
FROM tC
""").show()
+--------------------+--------+
| celsius| high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+
// Is there a temperature of 38C in the array of temperatures
spark.sql("""
SELECT celsius,
exists(celsius, t -> t = 38) as threshold
FROM tC
""").show()
+--------------------+---------+
| celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...| true|
|[31, 32, 34, 55, 56]| false|
+--------------------+---------+
// Calculate average temperature and convert to F
spark.sql("""
SELECT celsius,
reduce(
celsius,
0,
(t, acc) -> t + acc,
acc -> (acc div size(celsius) * 9 div 5) + 32
) as avgFahrenheit
FROM tC
""").show()
+--------------------+-------------+
| celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...|           96|
|[31, 32, 34, 55, 56]|          105|
+--------------------+-------------+
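The avgFahrenheit values above can be checked with the same integer arithmetic in plain Python (SQL's `div` corresponds to Python's `//`); the helper name is hypothetical:

```python
def avg_fahrenheit(celsius):
    """Reproduce the reduce() above: sum the readings, integer-divide by the
    array length, then convert the integer Celsius average to Fahrenheit."""
    acc = sum(celsius)                           # (t, acc) -> t + acc
    return (acc // len(celsius)) * 9 // 5 + 32   # finishing lambda

print(avg_fahrenheit([35, 36, 32, 30, 40, 42, 38]))  # 96
print(avg_fahrenheit([31, 32, 34, 55, 56]))          # 105
```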
Window functions
 | SQL | DataFrame API |
---|---|---|
Ranking functions | rank() | rank() |
 | dense_rank() | denseRank() |
 | percent_rank() | percentRank() |
 | ntile() | ntile() |
 | row_number() | rowNumber() |
Analytic functions | cume_dist() | cumeDist() |
 | first_value() | firstValue() |
 | last_value() | lastValue() |
 | lag() | lag() |
 | lead() | lead() |
// Sample data
SELECT * FROM departureDelaysWindow
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
| JFK| ORD| 5608|
| SEA| LAX| 9359|
| JFK| SFO| 35619|
| SFO| ORD| 27412|
| JFK| DEN| 4315|
| SFO| DEN| 18688|
| SFO| SEA| 17080|
| SEA| SFO| 22293|
| JFK| ATL| 12141|
| SFO| ATL| 5091|
| SEA| DEN| 13645|
| SEA| ATL| 4535|
| SEA| ORD| 10041|
| JFK| SEA| 7856|
| JFK| LAX| 35755|
| SFO| JFK| 24100|
| SFO| LAX| 40798|
| SEA| JFK| 4667|
+------+-----------+-----------+
// For each origin, select the three destinations with the most total delays
spark.sql("""
SELECT origin, destination, TotalDelays, rank
FROM (
SELECT origin, destination, TotalDelays, dense_rank()
OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank
FROM departureDelaysWindow
) t
WHERE rank <= 3
""").show()
+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
| SEA| SFO| 22293| 1|
| SEA| DEN| 13645| 2|
| SEA| ORD| 10041| 3|
| SFO| LAX| 40798| 1|
| SFO| ORD| 27412| 2|
| SFO| JFK| 24100| 3|
| JFK| LAX| 35755| 1|
| JFK| SFO| 35619| 2|
| JFK| ATL| 12141| 3|
+------+-----------+-----------+----+
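The query uses dense_rank(), which, unlike rank(), leaves no gaps in the numbering after ties. The distinction can be sketched in plain Python (the helper is hypothetical, not the Spark API):

```python
def rank_and_dense_rank(values):
    """Rank values in descending order: rank() assigns the row position, so
    it skips numbers after ties; dense_rank() assigns consecutive numbers."""
    ordered = sorted(values, reverse=True)
    ranks, dense = {}, {}
    d = 0
    for i, v in enumerate(ordered, start=1):
        if v not in ranks:
            d += 1
            ranks[v] = i   # row position -> gaps after ties
            dense[v] = d   # consecutive  -> no gaps
    return [(v, ranks[v], dense[v]) for v in values]

print(rank_and_dense_rank([40798, 27412, 27412, 24100]))
# [(40798, 1, 1), (27412, 2, 2), (27412, 2, 2), (24100, 4, 3)]
```

With a tie at 27412, rank() jumps from 2 to 4 while dense_rank() continues with 3, so a `rank <= 3` filter can return different rows under the two functions.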
References:
Learning Spark, 2nd Edition
https://spark.apache.org/docs/latest/configuration.html
https://spark.apache.org/docs/latest/api/sql/
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html