Spark Advanced Functions

Author: 井底蛙蛙呱呱呱 | Published 2021-11-28 17:17

Complex data types are usually built up from simple ones. A common way to operate on them is to explode the collection and then re-aggregate it with collect; another is to write a UDF that manipulates the data directly. Spark also ships many built-in functions for working with complex data types:

Array type functions

| Function | Description | Query | Output |
| --- | --- | --- | --- |
| array_distinct(array<T>): array<T> | Removes duplicates within an array | SELECT array_distinct(array(1, 2, 3, null, 3)); | [1,2,3,null] |
| array_intersect(array<T>, array<T>): array<T> | Returns the intersection of two arrays without duplicates | SELECT array_intersect(array(1, 2, 3), array(1, 3, 5)); | [1,3] |
| array_union(array<T>, array<T>): array<T> | Returns the union of two arrays without duplicates | SELECT array_union(array(1, 2, 3), array(1, 3, 5)); | [1,2,3,5] |
| array_except(array<T>, array<T>): array<T> | Returns elements in array1 but not in array2, without duplicates | SELECT array_except(array(1, 2, 3), array(1, 3, 5)); | [2] |
| array_join(array<String>, String[, String]): String | Concatenates the elements of an array using a delimiter | SELECT array_join(array('hello', 'world'), ' '); | hello world |
| array_max(array<T>): T | Returns the maximum value within the array; null elements are skipped | SELECT array_max(array(1, 20, null, 3)); | 20 |
| array_min(array<T>): T | Returns the minimum value within the array; null elements are skipped | SELECT array_min(array(1, 20, null, 3)); | 1 |
| array_position(array<T>, T): Long | Returns the 1-based index of the first occurrence of the given element, or 0 if it is absent | SELECT array_position(array(3, 2, 1), 1); | 3 |
| array_remove(array<T>, T): array<T> | Removes all elements equal to the given element from the array | SELECT array_remove(array(1, 2, 3, null, 3), 3); | [1,2,null] |
| arrays_overlap(array<T>, array<T>): Boolean | Returns true if array1 contains at least one non-null element also present in array2 | SELECT arrays_overlap(array(1, 2, 3), array(3, 4, 5)); | true |
| array_sort(array<T>): array<T> | Sorts the input array in ascending order, with null elements placed at the end | SELECT array_sort(array('b', 'd', null, 'c', 'a')); | ["a","b","c","d",null] |
| concat(array<T>, ...): array<T> | Concatenates strings, binaries, arrays, etc. | SELECT concat(array(1, 2, 3), array(4, 5), array(6)); | [1,2,3,4,5,6] |
| flatten(array<array<T>>): array<T> | Flattens an array of arrays into a single array | SELECT flatten(array(array(1, 2), array(3, 4))); | [1,2,3,4] |
| array_repeat(T, Int): array<T> | Returns an array containing the specified element the specified number of times | SELECT array_repeat('123', 3); | ["123","123","123"] |
| reverse(array<T>): array<T> | Returns a reversed string or an array with the elements in reverse order | SELECT reverse(array(2, 1, 4, 3)); | [3,4,1,2] |
| sequence(T, T[, T]): array<T> | Generates an array of elements from start to stop (inclusive) by incremental step | SELECT sequence(1, 5); SELECT sequence(5, 1); SELECT sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month); | [1,2,3,4,5]; [5,4,3,2,1]; ["2018-01-01","2018-02-01","2018-03-01"] |
| shuffle(array<T>): array<T> | Returns a random permutation of the given array | SELECT shuffle(array(1, 20, null, 3)); | [null,3,20,1] (nondeterministic) |
| slice(array<T>, Int, Int): array<T> | Returns a subset of the array starting from the given index (counting from the end if negative), of the specified length | SELECT slice(array(1, 2, 3, 4), -2, 2); | [3,4] |
| arrays_zip(array<T>, array<U>, ...): array<struct<T, U, ...>> | Returns a merged array of structs | SELECT arrays_zip(array(1, 2), array(2, 3), array(3, 4)); | [{"0":1,"1":2,"2":3},{"0":2,"1":3,"2":4}] |
| element_at(array<T>, Int): T | Returns the element of the array at the given (1-based) index | SELECT element_at(array(1, 2, 3), 2); | 2 |
| cardinality(array<T>): Int | An alias of size; returns the size of the given array or map | SELECT cardinality(array('b', 'd', 'c', 'a')); | 4 |
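To make the semantics concrete, a few of these functions behave like the following plain Scala collection operations. This is an illustrative sketch only, not Spark code; in Spark these functions operate on Column values inside a query.

```scala
// Plain-Scala analogues of a few Spark array functions.
// Illustration only: in Spark these are SQL/Column functions, not collection methods.
val a = Seq(1, 2, 3)
val b = Seq(1, 3, 5)

// array_distinct: drop duplicates, keeping first occurrences
val distinct = Seq(1, 2, 3, 3).distinct        // Seq(1, 2, 3)

// array_intersect / array_union / array_except
val intersect = a.intersect(b)                 // Seq(1, 3)
val union     = (a ++ b).distinct              // Seq(1, 2, 3, 5)
val except    = a.diff(b).distinct             // Seq(2)

// array_position: 1-based index of the first match, 0 when absent
val pos = Seq(3, 2, 1).indexOf(1) + 1          // 3
```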

Map functions

| Function | Description | Query | Output |
| --- | --- | --- | --- |
| map_from_arrays(array<K>, array<V>): map<K, V> | Creates a map from the given pair of key/value arrays; elements in keys should not be null | SELECT map_from_arrays(array(1.0, 3.0), array('2', '4')); | {"1.0":"2", "3.0":"4"} |
| map_from_entries(array<struct<K, V>>): map<K, V> | Returns a map created from the given array of key/value structs | SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b'))); | {"1":"a", "2":"b"} |
| map_concat(map<K, V>, ...): map<K, V> | Returns the union of the input maps; on duplicate keys the value from the last map wins | SELECT map_concat(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')); | {"1":"a", "2":"c", "3":"d"} |
| element_at(map<K, V>, K): V | Returns the value for the given key, or null if the key is not contained in the map | SELECT element_at(map(1, 'a', 2, 'b'), 2); | b |
| cardinality(map<K, V>): Int | An alias of size; returns the size of the given array or map | SELECT cardinality(map(1, 'a', 2, 'b')); | 2 |
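As with the array functions, the map semantics in the table can be sketched with plain Scala collections. Again this is only an illustration of the behavior, not how the Spark functions are invoked.

```scala
// Plain-Scala analogues of the map functions above (illustration only).
val m1 = Map(1 -> "a", 2 -> "b")
val m2 = Map(2 -> "c", 3 -> "d")

// map_concat: on duplicate keys the last map wins (here key 2 -> "c"),
// matching the output shown in the table
val concat = m1 ++ m2                          // Map(1 -> "a", 2 -> "c", 3 -> "d")

// element_at(map, key): the value, or null when the key is absent
// (Scala's Option plays the role of Spark's null here)
val hit  = m1.get(2)                           // Some("b")
val miss = m1.get(9)                           // None

// map_from_arrays: zip a key array with a value array
val fromArrays = Seq(1.0, 3.0).zip(Seq("2", "4")).toMap
```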

Higher-Order Functions

  • transform(array<T>, function<T, U>): array<U>. Applies the function to every element of the array; similar to a UDF or map, but more efficient.
  • filter(array<T>, function<T, Boolean>): array<T>. Keeps only the elements for which the function returns true.
  • exists(array<T>, function<T, Boolean>): Boolean. Checks whether any element of the array satisfies the predicate.
  • reduce(array<T>, B, function<B, T, B>, function<B, R>): R. function<B, T, B> folds the elements into a single accumulator, like an ordinary reduce; function<B, R> is then applied to that accumulator to produce the final result.
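These four higher-order functions have direct counterparts on ordinary Scala collections, which may help build intuition before the Spark SQL examples below. A sketch (plain Scala only; the Spark versions take a lambda inside SQL or the DataFrame API):

```scala
// Collection analogues of Spark's higher-order array functions (illustration only).
val celsius = Seq(35, 36, 32, 30, 40, 42, 38)

// transform ~ map: apply a function to every element
val fahrenheit = celsius.map(t => (t * 9) / 5 + 32)

// filter: keep elements where the predicate holds
val high = celsius.filter(_ > 38)              // Seq(40, 42)

// exists: is there any element satisfying the predicate?
val has38 = celsius.exists(_ == 38)            // true

// reduce ~ foldLeft into an accumulator, then a finishing function on it
val sum  = celsius.foldLeft(0)((acc, t) => acc + t)
val avgF = (sum / celsius.size) * 9 / 5 + 32   // integer average, converted to F
```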
// Create a DataFrame with two rows, each holding an array of Celsius temperatures (t1, t2)
val t1 = Array(35, 36, 32, 30, 40, 42, 38)
val t2 = Array(31, 32, 34, 55, 56)
val tC = Seq(t1, t2).toDF("celsius")
tC.createOrReplaceTempView("tC")
// Show the DataFrame
tC.show()
+--------------------+
| celsius|
+--------------------+
|[35, 36, 32, 30, ...|
|[31, 32, 34, 55, 56]|
+--------------------+

// Calculate Fahrenheit from Celsius for an array of temperatures
spark.sql("""
SELECT celsius,
 transform(celsius, t -> ((t * 9) div 5) + 32) as fahrenheit 
 FROM tC
""").show()
+--------------------+--------------------+
| celsius| fahrenheit|
+--------------------+--------------------+
|[35, 36, 32, 30, ...|[95, 96, 89, 86, ...|
|[31, 32, 34, 55, 56]|[87, 89, 93, 131,...|
+--------------------+--------------------+

// Filter temperatures > 38C for array of temperatures
spark.sql("""
SELECT celsius, 
 filter(celsius, t -> t > 38) as high 
 FROM tC
""").show()
+--------------------+--------+
| celsius| high|
+--------------------+--------+
|[35, 36, 32, 30, ...|[40, 42]|
|[31, 32, 34, 55, 56]|[55, 56]|
+--------------------+--------+

// Is there a temperature of 38C in the array of temperatures
spark.sql("""
SELECT celsius, 
 exists(celsius, t -> t = 38) as threshold
 FROM tC
""").show()
+--------------------+---------+
| celsius|threshold|
+--------------------+---------+
|[35, 36, 32, 30, ...| true|
|[31, 32, 34, 55, 56]| false|
+--------------------+---------+

// Calculate average temperature and convert to F
spark.sql("""
SELECT celsius, 
 reduce(
 celsius, 
 0, 
 (t, acc) -> t + acc, 
 acc -> (acc div size(celsius) * 9 div 5) + 32
 ) as avgFahrenheit 
 FROM tC
""").show()
+--------------------+-------------+
| celsius|avgFahrenheit|
+--------------------+-------------+
|[35, 36, 32, 30, ...| 96|
|[31, 32, 34, 55, 56]| 105|
+--------------------+-------------+

Window Functions

| Category | SQL | DataFrame API |
| --- | --- | --- |
| Ranking functions | rank() | rank() |
| | dense_rank() | denseRank() |
| | percent_rank() | percentRank() |
| | ntile() | ntile() |
| | row_number() | rowNumber() |
| Analytic functions | cume_dist() | cumeDist() |
| | first_value() | firstValue() |
| | last_value() | lastValue() |
| | lag() | lag() |
| | lead() | lead() |

(The camelCase names come from the Spark 1.x DataFrame API; since Spark 2.x the functions object uses the same snake_case names as SQL, e.g. dense_rank(), row_number().)
// Sample data
SELECT * FROM departureDelaysWindow
+------+-----------+-----------+
|origin|destination|TotalDelays|
+------+-----------+-----------+
| JFK| ORD| 5608| 
| SEA| LAX| 9359| 
| JFK| SFO| 35619| 
| SFO| ORD| 27412|
| JFK| DEN| 4315| 
| SFO| DEN| 18688|
| SFO| SEA| 17080| 
| SEA| SFO| 22293| 
| JFK| ATL| 12141| 
| SFO| ATL| 5091| 
| SEA| DEN| 13645| 
| SEA| ATL| 4535| 
| SEA| ORD| 10041| 
| JFK| SEA| 7856| 
| JFK| LAX| 35755| 
| SFO| JFK| 24100| 
| SFO| LAX| 40798| 
| SEA| JFK| 4667| 
+------+-----------+-----------+

// For each origin, select the three destinations with the largest total delays
spark.sql("""
SELECT origin, destination, TotalDelays, rank 
 FROM ( 
 SELECT origin, destination, TotalDelays, dense_rank() 
 OVER (PARTITION BY origin ORDER BY TotalDelays DESC) as rank 
 FROM departureDelaysWindow
 ) t 
 WHERE rank <= 3
""").show()
+------+-----------+-----------+----+
|origin|destination|TotalDelays|rank|
+------+-----------+-----------+----+
| SEA| SFO| 22293| 1| 
| SEA| DEN| 13645| 2| 
| SEA| ORD| 10041| 3| 
| SFO| LAX| 40798| 1| 
| SFO| ORD| 27412| 2| 
| SFO| JFK| 24100| 3| 
| JFK| LAX| 35755| 1| 
| JFK| SFO| 35619| 2| 
| JFK| ATL| 12141| 3| 
+------+-----------+-----------+----+
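The ranking logic of this query can be sketched in plain Scala: group the rows by origin, sort each group by total delays descending, and keep the first three. This is only an illustration over part of the sample data above (row position stands in for dense_rank, which coincides here because the delay totals are distinct); real Spark code would use Window.partitionBy from org.apache.spark.sql.expressions.

```scala
// Plain-Scala sketch of the dense_rank-per-partition query (illustration only).
val delays = Seq(
  ("SEA", "SFO", 22293), ("SEA", "DEN", 13645), ("SEA", "ORD", 10041),
  ("SEA", "ATL", 4535),  ("SEA", "JFK", 4667),  ("SEA", "LAX", 9359)
)

val top3 = delays
  .groupBy(_._1)                               // PARTITION BY origin
  .map { case (origin, rows) =>
    val ranked = rows
      .sortBy { case (_, _, d) => -d }         // ORDER BY TotalDelays DESC
      .take(3)                                 // WHERE rank <= 3
      .zipWithIndex
      .map { case ((_, dest, d), i) => (dest, d, i + 1) }  // 1-based rank
    origin -> ranked
  }
```

For the SEA partition this yields SFO (1), DEN (2), ORD (3), matching the query output above.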

References:
Learning Spark, 2nd Edition
https://spark.apache.org/docs/latest/configuration.html
https://spark.apache.org/docs/latest/api/sql/
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html

Original: https://www.haomeiwen.com/subject/oxqhxrtx.html