Getting a column value from the first row of a DataFrame
data.show(2, false)
+------+----+------+
| name|rank|income|
+------+----+------+
|Brooke| 0| 20|
|Brooke| 0| 25|
+------+----+------+
data.first().getAs[String]("name") // Brooke
data.first().getAs[Int]("rank") // 0
data.first().getAs[Int]("income") // 20
See also: https://stackoverflow.com/questions/33007840/spark-extracting-values-from-a-row
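If the DataFrame might be empty, first() throws a NoSuchElementException. A small sketch of a safer variant on the same data as above:
// take(1) returns an Array[Row]; headOption turns it into an Option[Row]
val firstIncome: Option[Int] = data.take(1).headOption.map(_.getAs[Int]("income"))
firstIncome.getOrElse(0) // 20 here, 0 if data were empty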
Passing arguments into a UDF
Usage is similar to a Python closure:
import org.apache.spark.sql.functions.udf
import org.apache.spark.broadcast.Broadcast
def f1(x: Int) = udf((y: Int) => x + y)
val f2 = (x: Int) => udf((y: Int) => x + y)
// For lookup tables, broadcast the Map and read it via .value inside the UDF
val f3 = (bc: Broadcast[Map[String, Int]]) => udf((x: String) => bc.value(x))
val bcMap = spark.sparkContext.broadcast(xMap) // xMap: a Map[String, Int] built beforehand
// Invocation (colname is a Column, e.g. $"some_col")
df.withColumn("f1", f1(1)(colname))
  .withColumn("f2", f2(2)(colname))
  .withColumn("f3", f3(bcMap)(colname))
How to transpose a Spark DataFrame?
Reference: https://nikhil-suthar-bigdata.medium.com/how-to-transpose-spark-dataframe-fa82c079a6b
def TransposeDF(df: DataFrame, columns: Seq[String], pivotCol: String): DataFrame = {
val columnsValue = columns.map(x => "'" + x + "', " + x)
val stackCols = columnsValue.mkString(",")
val df_1 = df.selectExpr(pivotCol, "stack(" + columns.size + "," + stackCols + ")")
.select(pivotCol, "col0", "col1")
val final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))
.withColumnRenamed("col0", pivotCol)
final_df
}
val DataSeq = Seq(("Shirts", 10, 13, 34, 10), ("Trousers", 11, 2, 30, 20), ("Pants", 70, 43, 24, 60), ("Sweater", 101, 44, 54, 80))
import spark.implicits._
val productQtyDF = DataSeq.toDF("Products", "Small", "Medium", "Large", "ExLarge")
productQtyDF.show()
+--------+-----+------+-----+-------+
|Products|Small|Medium|Large|ExLarge|
+--------+-----+------+-----+-------+
|Shirts |10 |13 |34 |10 |
|Trousers|11 |2 |30 |20 |
|Pants |70 |43 |24 |60 |
|Sweater |101 |44 |54 |80 |
+--------+-----+------+-----+-------+
val productTypeDF = TransposeDF(productQtyDF, Seq("Small", "Medium", "Large", "ExLarge"), "Products")
productTypeDF.show(false)
+--------+-----+------+-------+--------+
|Products|Pants|Shirts|Sweater|Trousers|
+--------+-----+------+-------+--------+
|Medium |43 |13 |44 |2 |
|Small |70 |10 |101 |11 |
|ExLarge |60 |10 |80 |20 |
|Large |24 |34 |54 |30 |
+--------+-----+------+-------+--------+
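To see what the stack() step inside TransposeDF produces before the pivot, the long-form intermediate (df_1 in the function) can be inspected directly; a sketch on the same productQtyDF:
// Each product row becomes four (label, quantity) rows; pivot() then spreads them back out by Products
productQtyDF.selectExpr("Products",
  "stack(4, 'Small', Small, 'Medium', Medium, 'Large', Large, 'ExLarge', ExLarge)")
  .show(false)
// The generated columns are named col0 (the size label) and col1 (the quantity) by default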
Descriptive statistics with Spark
Reference: https://blog.csdn.net/k_wzzc/article/details/84038233
// The iris dataset is used throughout this example; for comparison, R's summary(iris) gives:
summary(iris)
Sepal.Length Sepal.Width Petal.Length Petal.Width Species
Min. :4.300 Min. :2.000 Min. :1.000 Min. :0.100 setosa :50
1st Qu.:5.100 1st Qu.:2.800 1st Qu.:1.600 1st Qu.:0.300 versicolor:50
Median :5.800 Median :3.000 Median :4.350 Median :1.300 virginica :50
Mean :5.843 Mean :3.057 Mean :3.758 Mean :1.199
3rd Qu.:6.400 3rd Qu.:3.300 3rd Qu.:5.100 3rd Qu.:1.800
Max. :7.900 Max. :4.400 Max. :6.900 Max. :2.500
// Load the iris dataset
val irisDF = spark.read
.options(Map("header" -> "true",
"nullValue" -> "?",
"inferSchema" -> "true"))
.csv(inputFile)
// First argument: the column(s), either a single column name or an array of column names
// Second argument: the array of quantile probabilities
// Last argument: the relative error, between 0 and 1; the smaller it is, the more exact the result
val summary1 = irisDF.stat.approxQuantile(Array("sepalLength", "sepalWidth"),
Array(0.25, 0.5, 0.75), 0)
val summary2 = irisDF.stat.approxQuantile("sepalLength", Array(0.25, 0.5, 0.75), 0)
// summary2 returns Array(5.1, 5.8, 6.4), matching the R output above; summary1 returns one such array per column (nested arrays)
// Return the statistics as a DataFrame
// A few common numeric column types
val numType = Array(DoubleType, IntegerType, LongType, DecimalType, FloatType)
val tuples = irisDF.schema.map(sf => {
(sf.name, sf.dataType)
}).filter(tp => {
numType.contains(tp._2)
}).map(scm => {
val quantiles = irisDF.stat.approxQuantile(scm._1, Array(0.25, 0.5, 0.75), 0)
val col = scm._1
(col, quantiles)
})
val quantileDF = tuples
  .toDF("attribute", "quantiles")
  .withColumn("q1", $"quantiles".getItem(0))
  .withColumn("median", $"quantiles".getItem(1))
  .withColumn("q3", $"quantiles".getItem(2))
  .withColumn("IQR", $"q3" - $"q1") // interquartile range
  .drop("quantiles")
// describe() returns count/mean/stddev/min/max as strings; reshape that output into long form
val describe = irisDF.describe("sepalLength", "sepalWidth", "petalLength", "petalWidth")
val schema = describe.schema
val longForm = describe.flatMap(row => {
  val metric = row.getString(0)
  (1 until row.length).map(i => {
    (metric, schema(i).name, row.getString(i).toDouble)
  })
}).toDF("summary", "attribute", "values")
// Pivot back to wide form (one row per attribute)
val dataset: DataFrame = longForm.groupBy($"attribute")
.pivot("summary").agg(first("values"))
quantileDF.join(dataset, "attribute").show()
+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
| attribute| q1|median| q3|               IQR|count|max|              mean|min|             stddev|
+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
|petalLength|1.6| 1.6|5.1|3.4999999999999996|150.0|6.9|3.7580000000000027|1.0| 1.7652982332594662|
| sepalWidth|2.8| 2.8|3.3| 0.5|150.0|4.4| 3.057333333333334|2.0|0.43586628493669793|
|sepalLength|5.1| 5.1|6.4|1.3000000000000007|150.0|7.9| 5.843333333333335|4.3| 0.8280661279778637|
| petalWidth|0.3| 0.3|1.8| 1.5|150.0|2.5| 1.199333333333334|0.1| 0.7622376689603467|
+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
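Since Spark 2.3, most of the above is also available in a single call via Dataset.summary(), which accepts percentile strings in addition to the describe() statistics; a quick sketch on the same irisDF:
// count/mean/stddev/min/max plus (approximate) percentiles in one pass
irisDF.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()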
Converting between DataFrame and Dataset
DataFrame and Dataset are two closely related data structures in Spark, and they can be converted into each other. A DataFrame becomes a Dataset via as[rowClass], where rowClass is usually a user-defined case class; after the conversion, every row can be treated as an instance of that class, much like a namedtuple in Python. A DataFrame can be regarded as a special case of Dataset: DataFrame = Dataset[Row]. Converting a Dataset back to a DataFrame is as simple as calling toDF().
// Build a sample DataFrame
val dataDF = spark.createDataFrame(Seq(("Brooke", 0, 20), ("Brooke", 0, 25),
("Denny", 0, 31), ("Jules", 1, 30), ("TD", 1, 5), ("TD", 1, 3),("TD", 1, 35),
("TD", 1, 35),("Jack", 2, 33),("Jack", 2, 22),("Jack", 2, 11),("Jack", 2, 10))).toDF("name","rank","income")
val db = dataDF.groupBy("name")
.agg(max("rank").as("rank"), sum("income").as("income"))
db.show(10, false)
+------+----+------+
|name |rank|income|
+------+----+------+
|Brooke|0 |45 |
|Jules |1 |30 |
|Jack |2 |76 |
|TD |1 |78 |
|Denny |0 |31 |
+------+----+------+
// Convert the DataFrame to a Dataset
// Note: the case class field names must match the DataFrame column names (matching is case-insensitive by default); otherwise an error is thrown
case class Staff(NAME:String, RANK:Int, INCOME:Long)
val ds = db.as[Staff] // ds: org.apache.spark.sql.Dataset[Staff] = [name: string, rank: int ... 1 more field]
ds.show(2, false)
+------+----+------+
|name |rank|income|
+------+----+------+
|Brooke|0 |45 |
|Jules |1 |30 |
+------+----+------+
// Convert the Dataset back to a DataFrame
val df = ds.toDF()
df.show(2)
+------+----+------+
| name|rank|income|
+------+----+------+
|Brooke| 0| 45|
| Jules| 1| 30|
+------+----+------+
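Once db is viewed as a Dataset[Staff], rows can be processed with ordinary typed Scala functions, much like working with namedtuples; a small sketch (assuming spark.implicits._ is in scope):
// Compile-checked field access via the Staff case class
ds.filter(staff => staff.INCOME > 40)
  .map(staff => (staff.NAME, staff.INCOME * 2))
  .toDF("name", "doubled_income")
  .show()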
Spark UDF that returns an n-element array and produces n columns
def myFunc: (String => Array[String]) = { s =>
  Array(s.toLowerCase, s.toUpperCase)
}
import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)
val newDF = df.withColumn("newCol", myUDF(df("Feature2")))
.select($"Feature1", $"Feature2", $"Feature 3",
$"newCol"(0).as("Slope"), $"newCol"(1).as("Offset"))
// Result
+--------+--------+---------+-----+------+
|Feature1|Feature2|Feature 3|Slope|Offset|
+--------+--------+---------+-----+------+
|1.3     |3.4     |4.5      |3.4  |3.4   |
+--------+--------+---------+-----+------+
参考:https://stackoverflow.com/questions/48979440/how-to-use-udf-to-return-multiple-columns
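An alternative sketch (not taken verbatim from the linked answer): have the UDF return a case class so the generated struct already carries field names, then pull them out by name:
// Spark encodes the returned case class as a struct column with fields slope and offset
case class Parts(slope: String, offset: String)
val myStructUDF = udf((s: String) => Parts(s.toLowerCase, s.toUpperCase))
df.withColumn("newCol", myStructUDF(df("Feature2")))
  .select($"Feature1", $"Feature2", $"Feature 3",
    $"newCol.slope".as("Slope"), $"newCol.offset".as("Offset"))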
Parameterized UDF (similar effect to a Python closure)
// defaultValue is the parameter passed in from outside
val calUdf = (defaultValue: Double) => udf { (numerator: Long, denominator: Long) =>
var index = 0.0
if (denominator > 0) {
if (numerator > denominator) index = defaultValue
else index = numerator * 1.0 / denominator
}
index
}
// Invocation
val result = df.withColumn("new", calUdf(1.0)($"col1", $"col2"))
Date conversion
df.withColumn("lastdate", to_date(from_unixtime($"lastActiveTime" / 1000)))
.select("lastActiveTime", "lastdate").show(2, false)
+--------------+----------+
|lastActiveTime|lastdate |
+--------------+----------+
|1621440000000 |2021-05-20|
|1643784539274 |2022-02-02|
+--------------+----------+
only showing top 2 rows
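Going the other way (a sketch): unix_timestamp converts a date/timestamp back to epoch seconds, so multiplying by 1000 recovers milliseconds (midnight of that date in the session time zone):
import org.apache.spark.sql.functions.unix_timestamp
df.withColumn("lastdate", to_date(from_unixtime($"lastActiveTime" / 1000)))
  .withColumn("backToMillis", unix_timestamp($"lastdate".cast("timestamp")) * 1000)
  .select("lastActiveTime", "lastdate", "backToMillis")
  .show(2, false)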
groupBy: merging list (array) columns
import org.apache.spark.sql.functions.{collect_list, udf}
val flatten_distinct = udf(
(xs: Seq[Seq[String]]) => xs.flatten.distinct)
df
.groupBy("category")
.agg(
flatten_distinct(collect_list("array_value_1"))
)
Reference: https://stackoverflow.com/questions/39496517/spark-merge-combine-arrays-in-groupby-aggregate
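On Spark 2.4+ the same merge can be done without a UDF, using the built-in flatten and array_distinct functions (same hypothetical column names as above):
import org.apache.spark.sql.functions.{array_distinct, collect_list, flatten}
df.groupBy("category")
  .agg(array_distinct(flatten(collect_list("array_value_1"))).as("merged"))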
How to split a map- or struct-typed column into multiple columns?
// map type
val ds = Seq(
(1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
(2, Map("foo" -> (3, "c"))),
(3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")
ds.show(10, false)
+---+------------------------------+
|id |alpha |
+---+------------------------------+
|1 |{foo -> {1, a}, bar -> {2, b}}|
|2 |{foo -> {3, c}} |
|3 |{bar -> {4, d}} |
+---+------------------------------+
ds.select($"id", explode($"alpha")).show(10)
+---+---+------+
| id|key| value|
+---+---+------+
| 1|foo|{1, a}|
| 1|bar|{2, b}|
| 2|foo|{3, c}|
| 3|bar|{4, d}|
+---+---+------+
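The exploded value column is itself a struct (built from the tuple), so its fields _1 and _2 can be selected out in the same step:
ds.select($"id", explode($"alpha"))
  .select($"id", $"key", $"value._1".as("num"), $"value._2".as("label"))
  .show(10)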
// struct type
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
val structureData = Seq(
Row(Row("James ","","Smith"),Row(Row("CA","Los Angles"),Row("CA","Sandiago"))),
Row(Row("Michael ","Rose",""),Row(Row("NY","New York"),Row("NJ","Newark"))),
Row(Row("Robert ","","Williams"),Row(Row("DE","Newark"),Row("CA","Las Vegas"))),
Row(Row("Maria ","Anne","Jones"),Row(Row("PA","Harrisburg"),Row("CA","Sandiago"))),
Row(Row("Jen","Mary","Brown"),Row(Row("CA","Los Angles"),Row("NJ","Newark")))
)
val structureSchema = new StructType()
.add("name",new StructType()
.add("firstname",StringType)
.add("middlename",StringType)
.add("lastname",StringType))
.add("address",new StructType()
.add("current",new StructType()
.add("state",StringType)
.add("city",StringType))
.add("previous",new StructType()
.add("state",StringType)
.add("city",StringType)))
val df = spark.createDataFrame(
spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
df.show(10, false)
root
|-- name: struct (nullable = true)
| |-- firstname: string (nullable = true)
| |-- middlename: string (nullable = true)
| |-- lastname: string (nullable = true)
|-- address: struct (nullable = true)
| |-- current: struct (nullable = true)
| | |-- state: string (nullable = true)
| | |-- city: string (nullable = true)
| |-- previous: struct (nullable = true)
| | |-- state: string (nullable = true)
| | |-- city: string (nullable = true)
+---------------------+----------------------------------+
|name |address |
+---------------------+----------------------------------+
|{James , , Smith} |{{CA, Los Angles}, {CA, Sandiago}}|
|{Michael , Rose, } |{{NY, New York}, {NJ, Newark}} |
|{Robert , , Williams}|{{DE, Newark}, {CA, Las Vegas}} |
|{Maria , Anne, Jones}|{{PA, Harrisburg}, {CA, Sandiago}}|
|{Jen, Mary, Brown} |{{CA, Los Angles}, {NJ, Newark}} |
+---------------------+----------------------------------+
val df2 = df.select(col("name.*"),
col("address.current.*"),
col("address.previous.*"))
val df2Flatten = df2.toDF("fname","mename","lname","currAddState",
"currAddCity","prevAddState","prevAddCity")
df2Flatten.printSchema()
df2Flatten.show(false)
root
|-- fname: string (nullable = true)
|-- mename: string (nullable = true)
|-- lname: string (nullable = true)
|-- currAddState: string (nullable = true)
|-- currAddCity: string (nullable = true)
|-- prevAddState: string (nullable = true)
|-- prevAddCity: string (nullable = true)
+--------+------+--------+------------+-----------+------------+-----------+
|fname |mename|lname |currAddState|currAddCity|prevAddState|prevAddCity|
+--------+------+--------+------------+-----------+------------+-----------+
|James | |Smith |CA |Los Angles |CA |Sandiago |
|Michael |Rose | |NY |New York |NJ |Newark |
|Robert | |Williams|DE |Newark |CA |Las Vegas |
|Maria |Anne |Jones |PA |Harrisburg |CA |Sandiago |
|Jen |Mary |Brown |CA |Los Angles |NJ |Newark |
+--------+------+--------+------------+-----------+------------+-----------+
References:
https://sparkbyexamples.com/spark/spark-how-to-convert-struct-type-to-columns/
https://stackoverflow.com/questions/40602606/how-to-get-keys-and-values-from-maptype-column-in-sparksql-dataframe