Some useful small Spark functions

By 井底蛙蛙呱呱呱 | Published 2021-11-29 16:36
Getting a column value from the first row of a DataFrame
data.show(2, false)
+------+----+------+
|  name|rank|income|
+------+----+------+
|Brooke|   0|    20|
|Brooke|   0|    25|
+------+----+------+

data.first().getAs[String]("name")   // Brooke
data.first().getAs[Int]("rank")    // 0
data.first().getAs[Int]("income")   // 20

See also: https://stackoverflow.com/questions/33007840/spark-extracting-values-from-a-row
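
Besides getAs, values can also be pulled out of a Row by position. A minimal sketch, assuming the same data frame as above and that the column order is known:

val row = data.first()
row.getString(0)                          // "Brooke" -- access by position
row.getInt(1)                             // 0
data.select("income").first().getInt(0)   // 20, selecting the column first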

Passing parameters to a UDF

The usage is similar to a Python closure:

import org.apache.spark.sql.functions.udf

def f1(x: Int) = udf((y: Int) => x + y)

val f2 = (x: Int) => udf((y: Int) => x + y)
val f3 = (m: Map[String, Int]) => udf((x: String) => m(x))

// xMap is an existing Map[String, Int]; colname is a Column (e.g. $"some_col")
val bcMap = spark.sparkContext.broadcast(xMap)

// Calling them
df.withColumn("f1", f1(1)(colname))
  .withColumn("f2", f2(2)(colname))
  .withColumn("f3", f3(bcMap.value)(colname))
How to transpose a Spark DataFrame?

Reference: https://nikhil-suthar-bigdata.medium.com/how-to-transpose-spark-dataframe-fa82c079a6b

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

// Unpivot every column in `columns` into (col0, col1) pairs with stack(),
// then pivot on `pivotCol` to swap rows and columns.
def TransposeDF(df: DataFrame, columns: Seq[String], pivotCol: String): DataFrame = {
  val columnsValue = columns.map(x => "'" + x + "', " + x)
  val stackCols = columnsValue.mkString(",")
  val df_1 = df.selectExpr(pivotCol, "stack(" + columns.size + "," + stackCols + ")")
    .select(pivotCol, "col0", "col1")

  val final_df = df_1.groupBy(col("col0")).pivot(pivotCol)
    .agg(concat_ws("", collect_list(col("col1"))))
    .withColumnRenamed("col0", pivotCol)
  final_df
}

val DataSeq = Seq(("Shirts", 10, 13, 34, 10), ("Trousers", 11, 2, 30, 20), ("Pants", 70, 43, 24, 60), ("Sweater", 101, 44, 54, 80))
import spark.implicits._
val productQtyDF = DataSeq.toDF("Products", "Small", "Medium", "Large", "ExLarge")

productQtyDF.show()
  +--------+-----+------+-----+-------+
  |Products|Small|Medium|Large|ExLarge|
  +--------+-----+------+-----+-------+
  |Shirts  |10   |13    |34   |10     |
  |Trousers|11   |2     |30   |20     |
  |Pants   |70   |43    |24   |60     |
  |Sweater |101  |44    |54   |80     |
  +--------+-----+------+-----+-------+

val productTypeDF = TransposeDF(productQtyDF, Seq("Small", "Medium", "Large", "ExLarge"), "Products")
productTypeDF.show(false)
  +--------+-----+------+-------+--------+
  |Products|Pants|Shirts|Sweater|Trousers|
  +--------+-----+------+-------+--------+
  |Medium  |43   |13    |44     |2       |
  |Small   |70   |10    |101    |11      |
  |ExLarge |60   |10    |80     |20      |
  |Large   |24   |34    |54     |30      |
  +--------+-----+------+-------+--------+
Descriptive statistics in Spark

Reference: https://blog.csdn.net/k_wzzc/article/details/84038233

// The examples below use the iris dataset; for comparison, this is R's summary of it:
 summary(iris)

  Sepal.Length    Sepal.Width     Petal.Length    Petal.Width          Species  
 Min.   :4.300   Min.   :2.000   Min.   :1.000   Min.   :0.100   setosa    :50  
 1st Qu.:5.100   1st Qu.:2.800   1st Qu.:1.600   1st Qu.:0.300   versicolor:50  
 Median :5.800   Median :3.000   Median :4.350   Median :1.300   virginica :50  
 Mean   :5.843   Mean   :3.057   Mean   :3.758   Mean   :1.199                  
 3rd Qu.:6.400   3rd Qu.:3.300   3rd Qu.:5.100   3rd Qu.:1.800                  
 Max.   :7.900   Max.   :4.400   Max.   :6.900   Max.   :2.500 

// Load the iris dataset (inputFile points to the iris CSV)
val irisDF = spark.read
  .options(Map("header" -> "true",
    "nullValue" -> "?",
    "inferSchema" -> "true"))
  .csv(inputFile)

// First argument:  the column name(s) -- a single column name or an array of names
// Second argument: the array of quantile probabilities
// Last argument:   the relative error, between 0 and 1; the smaller the value, the higher the accuracy
val summary1 = irisDF.stat.approxQuantile(Array("sepalLength", "sepalWidth"),
  Array(0.25, 0.5, 0.75), 0)

val summary2 = irisDF.stat.approxQuantile("sepalLength", Array(0.25, 0.5, 0.75), 0)
// summary2 returns Array(5.1, 5.8, 6.4), consistent with the R result; summary1 returns a nested array (one per column)

// Return the statistics as a DataFrame
// The common numeric column types
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, DecimalType, FloatType}
val numType = Array(DoubleType, IntegerType, LongType, DecimalType, FloatType)

// Compute the quartiles of every numeric column
val tuples = irisDF.schema.map(sf => {
  (sf.name, sf.dataType)
}).filter(tp => {
  numType.contains(tp._2)
}).map(scm => {
  val quantiles = irisDF.stat.approxQuantile(scm._1, Array(0.25, 0.5, 0.75), 0)
  val col = scm._1
  (col, quantiles)
})

val quantileDF = tuples
  .toDF("attribute", "quantiles")
  .withColumn("q1", $"quantiles".getItem(0))
  .withColumn("median", $"quantiles".getItem(1))
  .withColumn("q3", $"quantiles".getItem(2))
  .withColumn("IQR", $"q3" - $"q1") // interquartile range
  .drop("quantiles")

// Spark's built-in describe() returns count/mean/stddev/min/max for each column;
// restrict it to the numeric columns found above so the toDouble below cannot fail
val numericCols = tuples.map(_._1)
val describe = irisDF.describe(numericCols: _*)
val schema = describe.schema

// Melt the describe() output into long form: (summary, attribute, value)
val longForm = describe.flatMap(row => {
  val metric = row.getString(0)
  (1 until row.length).map(i => {
    (metric, schema(i).name, row.getString(i).toDouble)
  })
}).toDF("summary", "attribute", "values")

// Cross-tabulate: pivot the summary statistics into one column each
val dataset: DataFrame = longForm.groupBy($"attribute")
  .pivot("summary").agg(first("values"))

quantileDF.join(dataset, "attribute").show()

+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
|  attribute| q1|median| q3|               IQR|count|max|              mean|min|             stddev|
+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
|petalLength|1.6|   4.3|5.1|3.4999999999999996|150.0|6.9|3.7580000000000027|1.0| 1.7652982332594662|
| sepalWidth|2.8|   3.0|3.3|               0.5|150.0|4.4| 3.057333333333334|2.0|0.43586628493669793|
|sepalLength|5.1|   5.8|6.4|1.3000000000000007|150.0|7.9| 5.843333333333335|4.3| 0.8280661279778637|
| petalWidth|0.3|   1.3|1.8|               1.5|150.0|2.5| 1.199333333333334|0.1| 0.7622376689603467|
+-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
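
Note that on recent Spark versions (2.3+) DataFrame.summary() already returns the quartiles together with count/mean/stddev/min/max, which covers most of the above in one call; a quick sketch:

// count, mean, stddev, min, 25%, 50%, 75%, max for every numeric column
irisDF.summary().show()

// or only the statistics you care about
irisDF.summary("count", "mean", "50%").show()
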
Converting between DataFrame and Dataset

DataFrame and Dataset are two closely related data structures in Spark and can be converted into each other. A DataFrame is turned into a Dataset with as[RowClass], where RowClass is usually a user-defined case class; after the conversion each row can be treated as an instance of that class, much like a namedtuple in Python. A DataFrame can be seen as a special case of a Dataset: DataFrame = Dataset[Row]. Converting a Dataset back to a DataFrame is as simple as calling toDF().

// Build a sample DataFrame
val dataDF = spark.createDataFrame(Seq(("Brooke", 0, 20), ("Brooke", 0, 25),
    ("Denny", 0, 31), ("Jules", 1, 30), ("TD", 1, 5), ("TD", 1, 3),("TD", 1, 35),
    ("TD", 1, 35),("Jack", 2, 33),("Jack", 2, 22),("Jack", 2, 11),("Jack", 2, 10))).toDF("name","rank","income")

val db = dataDF.groupBy("name")
    .agg(max("rank").as("rank"), sum("income").as("income"))
db.show(10, false)
+------+----+------+
|name  |rank|income|
+------+----+------+
|Brooke|0   |45    |
|Jules |1   |30    |
|Jack  |2   |76    |
|TD    |1   |78    |
|Denny |0   |31    |
+------+----+------+

// Convert the DataFrame to a Dataset
// Note: the case class field names must match the DataFrame column names (case-insensitively),
// otherwise an error is thrown
case class Staff(NAME: String, RANK: Int, INCOME: Long)

val ds = db.as[Staff]    // ds: org.apache.spark.sql.Dataset[Staff] = [name: string, rank: int ... 1 more field]
ds.show(2, false)
+------+----+------+
|name  |rank|income|
+------+----+------+
|Brooke|0   |45    |
|Jules |1   |30    |
+------+----+------+
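
Once it is a Dataset, each row can be handled as a typed Staff object, much like attribute access on a Python namedtuple; a minimal sketch:

ds.filter(_.RANK > 0).map(s => s.NAME + ": " + s.INCOME).show(3, false)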

// Convert the Dataset back to a DataFrame
val df = ds.toDF()
df.show(2)
+------+----+------+
|  name|rank|income|
+------+----+------+
|Brooke|   0|    45|
| Jules|   1|    30|
+------+----+------+
A Spark UDF that returns an n-element array and expands it into n columns

def myFunc: (String => Array[String]) = { s =>
  Array(s.toLowerCase, s.toUpperCase)
}

import org.apache.spark.sql.functions.udf
val myUDF = udf(myFunc)

val newDF = df.withColumn("newCol", myUDF(df("Feature2")))
  .select($"Feature1", $"Feature2", $"Feature 3", 
    $"newCol"(0).as("Slope"), $"newCol"(1).as("Offset"))

// Result
+--------+--------+---------+-----+------+
|Feature1|Feature2|Feature 3|Slope|Offset|
+--------+--------+---------+-----+------+
|1.3     |3.4     |4.5      |3.4  |3.4   |
+--------+--------+---------+-----+------+

Reference: https://stackoverflow.com/questions/48979440/how-to-use-udf-to-return-multiple-columns

A UDF with extra parameters (the effect of a Python closure)

// defaultValue is the parameter passed in from outside the udf
val calUdf = (defaultValue: Double) => udf { (numerator: Long, denominator: Long) =>
  if (denominator > 0) {
    if (numerator > denominator) defaultValue
    else numerator * 1.0 / denominator
  } else 0.0
}

// Calling it
val df2 = df.withColumn("new", calUdf(1.0)($"col1", $"col2"))

Date conversion

df.withColumn("lastdate", to_date(from_unixtime($"lastActiveTime" / 1000)))
 .select("lastActiveTime", "lastdate").show(2, false)

+--------------+----------+
|lastActiveTime|lastdate  |
+--------------+----------+
|1621440000000 |2021-05-20|
|1643784539274 |2022-02-02|
+--------------+----------+
only showing top 2 rows
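
The reverse direction works with unix_timestamp; a small sketch, assuming the lastdate column produced above:

import org.apache.spark.sql.functions.unix_timestamp

// back from a date to an epoch timestamp in milliseconds
df.withColumn("lastActiveTimeMs", unix_timestamp($"lastdate") * 1000)
  .select("lastdate", "lastActiveTimeMs").show(2, false)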

Merging array columns in a groupBy

import org.apache.spark.sql.functions.{collect_list, udf}

val flatten_distinct = udf(
  (xs: Seq[Seq[String]]) => xs.flatten.distinct)

df
  .groupBy("category")
  .agg(
    flatten_distinct(collect_list("array_value_1"))
  )

Reference: https://stackoverflow.com/questions/39496517/spark-merge-combine-arrays-in-groupby-aggregate
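
On Spark 2.4+ the same result can be obtained without a UDF by combining the built-in flatten and array_distinct functions; a minimal sketch:

import org.apache.spark.sql.functions.{array_distinct, collect_list, flatten}

df
  .groupBy("category")
  .agg(array_distinct(flatten(collect_list("array_value_1"))).as("merged"))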

How to split a map- or struct-type column into multiple columns?

// Map type
import org.apache.spark.sql.functions.explode

val ds = Seq(
  (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
  (2, Map("foo" -> (3, "c"))),
  (3, Map("bar" -> (4, "d")))
).toDF("id", "alpha")
ds.show(10, false)

+---+------------------------------+
|id |alpha                         |
+---+------------------------------+
|1  |{foo -> {1, a}, bar -> {2, b}}|
|2  |{foo -> {3, c}}               |
|3  |{bar -> {4, d}}               |
+---+------------------------------+

ds.select($"id", explode($"alpha")).show(10)
+---+---+------+
| id|key| value|
+---+---+------+
|  1|foo|{1, a}|
|  1|bar|{2, b}|
|  2|foo|{3, c}|
|  3|bar|{4, d}|
+---+---+------+
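
If the map keys are known in advance, the values can also be pulled straight into columns with getItem instead of exploding; a small sketch for the same ds:

// missing keys simply become null
ds.select(
  $"id",
  $"alpha".getItem("foo").as("foo"),
  $"alpha".getItem("bar").as("bar")
).show(10, false)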

// Struct type
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StringType}

val structureData = Seq(
    Row(Row("James ","","Smith"),Row(Row("CA","Los Angles"),Row("CA","Sandiago"))),
    Row(Row("Michael ","Rose",""),Row(Row("NY","New York"),Row("NJ","Newark"))),
    Row(Row("Robert ","","Williams"),Row(Row("DE","Newark"),Row("CA","Las Vegas"))),
    Row(Row("Maria ","Anne","Jones"),Row(Row("PA","Harrisburg"),Row("CA","Sandiago"))),
    Row(Row("Jen","Mary","Brown"),Row(Row("CA","Los Angles"),Row("NJ","Newark")))
  )

val structureSchema = new StructType()
    .add("name",new StructType()
      .add("firstname",StringType)
      .add("middlename",StringType)
      .add("lastname",StringType))
    .add("address",new StructType()
      .add("current",new StructType()
        .add("state",StringType)
        .add("city",StringType))
      .add("previous",new StructType()
        .add("state",StringType)
        .add("city",StringType)))

val df = spark.createDataFrame(
    spark.sparkContext.parallelize(structureData),structureSchema)
df.printSchema()
df.show(10, false)

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- address: struct (nullable = true)
 |    |-- current: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |-- previous: struct (nullable = true)
 |    |    |-- state: string (nullable = true)
 |    |    |-- city: string (nullable = true)

+---------------------+----------------------------------+
|name                 |address                           |
+---------------------+----------------------------------+
|{James , , Smith}    |{{CA, Los Angles}, {CA, Sandiago}}|
|{Michael , Rose, }   |{{NY, New York}, {NJ, Newark}}    |
|{Robert , , Williams}|{{DE, Newark}, {CA, Las Vegas}}   |
|{Maria , Anne, Jones}|{{PA, Harrisburg}, {CA, Sandiago}}|
|{Jen, Mary, Brown}   |{{CA, Los Angles}, {NJ, Newark}}  |
+---------------------+----------------------------------+

val df2 = df.select(col("name.*"),
    col("address.current.*"),
    col("address.previous.*"))
val df2Flatten = df2.toDF("fname","mename","lname","currAddState",
    "currAddCity","prevAddState","prevAddCity")
df2Flatten.printSchema()
df2Flatten.show(false)

root
 |-- fname: string (nullable = true)
 |-- mename: string (nullable = true)
 |-- lname: string (nullable = true)
 |-- currAddState: string (nullable = true)
 |-- currAddCity: string (nullable = true)
 |-- prevAddState: string (nullable = true)
 |-- prevAddCity: string (nullable = true)

+--------+------+--------+------------+-----------+------------+-----------+
|fname   |mename|lname   |currAddState|currAddCity|prevAddState|prevAddCity|
+--------+------+--------+------------+-----------+------------+-----------+
|James   |      |Smith   |CA          |Los Angles |CA          |Sandiago   |
|Michael |Rose  |        |NY          |New York   |NJ          |Newark     |
|Robert  |      |Williams|DE          |Newark     |CA          |Las Vegas  |
|Maria   |Anne  |Jones   |PA          |Harrisburg |CA          |Sandiago   |
|Jen     |Mary  |Brown   |CA          |Los Angles |NJ          |Newark     |
+--------+------+--------+------------+-----------+------------+-----------+

References:
https://sparkbyexamples.com/spark/spark-how-to-convert-struct-type-to-columns/
https://stackoverflow.com/questions/40602606/how-to-get-keys-and-values-from-maptype-column-in-sparksql-dataframe
