Some useful small Spark functions


Author: 井底蛙蛙呱呱呱 | Published 2021-11-29 16:36
    Getting the value of a column from the first row of a DataFrame
    data.show(2, false)
    +------+----+------+
    |  name|rank|income|
    +------+----+------+
    |Brooke|   0|    20|
    |Brooke|   0|    25|
    +------+----+------+
    
    data.first().getAs[String]("name")   // Brooke
    data.first().getAs[Int]("rank")    // 0
    data.first().getAs[Int]("income")   // 20
    

    See also: https://stackoverflow.com/questions/33007840/spark-extracting-values-from-a-row
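
    The same values can also be pulled out positionally, or via a single-column select; a minimal sketch, assuming the same column types as above:

    // Positional access (column order: name, rank, income)
    data.first().getString(0)   // Brooke
    data.first().getInt(1)      // 0

    // Select a single column first, then take the first row
    data.select("income").head().getInt(0)   // 20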

    Passing parameters to a UDF

    The usage is similar to a Python closure:

    import org.apache.spark.sql.functions.udf

    def f1(x:Int) = udf((y:Int) => x+y)
    
    val f2 = (x:Int) => udf((y:Int) => x+y)
    val f3 = (bcMap: Map[String, Int]) => udf((x:String) => bcMap(x))
    
    // xMap is an existing Map[String, Int]; broadcast it so executors share one copy
    val bcMap = spark.sparkContext.broadcast(xMap)
    // Invocation
    df.withColumn("f1", f1(1)(colname))
      .withColumn("f2", f2(2)(colname))
      .withColumn("f3", f3(bcMap.value)(colname))
    
    How to transpose a Spark DataFrame?

    Reference: https://nikhil-suthar-bigdata.medium.com/how-to-transpose-spark-dataframe-fa82c079a6b

     import org.apache.spark.sql.DataFrame
     import org.apache.spark.sql.functions.{col, collect_list, concat_ws}

     // Unpivot the given columns with stack, then pivot back on pivotCol
     def TransposeDF(df: DataFrame, columns: Seq[String], pivotCol: String): DataFrame = {
        val columnsValue = columns.map(x => "'" + x + "', " + x)
        val stackCols = columnsValue.mkString(",")
        val df_1 = df.selectExpr(pivotCol, "stack(" + columns.size + "," + stackCols + ")")
          .select(pivotCol, "col0", "col1")
    
        val final_df = df_1.groupBy(col("col0")).pivot(pivotCol).agg(concat_ws("", collect_list(col("col1"))))
          .withColumnRenamed("col0", pivotCol)
        final_df
      }
    
    val DataSeq = Seq(("Shirts", 10, 13, 34, 10), ("Trousers", 11, 2, 30, 20), ("Pants", 70, 43, 24, 60), ("Sweater", 101, 44, 54, 80))
    import spark.implicits._
    val productQtyDF = DataSeq.toDF("Products", "Small", "Medium", "Large", "ExLarge")
    
    productQtyDF.show()
      +--------+-----+------+-----+-------+
      |Products|Small|Medium|Large|ExLarge|
      +--------+-----+------+-----+-------+
      |Shirts  |10   |13    |34   |10     |
      |Trousers|11   |2     |30   |20     |
      |Pants   |70   |43    |24   |60     |
      |Sweater |101  |44    |54   |80     |
      +--------+-----+------+-----+-------+
    
    val productTypeDF = TransposeDF(productQtyDF, Seq("Small", "Medium", "Large", "ExLarge"), "Products")
    productTypeDF.show(false)
      +--------+-----+------+-------+--------+
      |Products|Pants|Shirts|Sweater|Trousers|
      +--------+-----+------+-------+--------+
      |Medium  |43   |13    |44     |2       |
      |Small   |70   |10    |101    |11      |
      |ExLarge |60   |10    |80     |20      |
      |Large   |24   |34    |54     |30      |
      +--------+-----+------+-------+--------+
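
    Internally the function first unpivots with stack (one row per original column/value pair; the stacked output columns default to col0 and col1) and then pivots back on pivotCol. A minimal sketch of just the stack step on the same data:

    productQtyDF.selectExpr("Products",
        "stack(4, 'Small', Small, 'Medium', Medium, 'Large', Large, 'ExLarge', ExLarge)")
      .show()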
    
    Descriptive statistics with Spark

    Reference: https://blog.csdn.net/k_wzzc/article/details/84038233

    // This section uses the iris dataset; for comparison, R's summary(iris) output is shown below
     summary(iris)
    
      Sepal.Length    Sepal.Width     Petal.Length    Petal.Width          Species  
     Min.   :4.300   Min.   :2.000   Min.   :1.000   Min.   :0.100   setosa    :50  
     1st Qu.:5.100   1st Qu.:2.800   1st Qu.:1.600   1st Qu.:0.300   versicolor:50  
     Median :5.800   Median :3.000   Median :4.350   Median :1.300   virginica :50  
     Mean   :5.843   Mean   :3.057   Mean   :3.758   Mean   :1.199                  
     3rd Qu.:6.400   3rd Qu.:3.300   3rd Qu.:5.100   3rd Qu.:1.800                  
     Max.   :7.900   Max.   :4.400   Max.   :6.900   Max.   :2.500 
    
    // Load the iris dataset
    val irisDF = spark.read
      .options(Map("header" -> "true",
        "nullValue" -> "?",
        "inferSchema" -> "true"))
      .csv(inputFile)
    
    // First argument:  the column name(s), either a single name or an array of names
    // Second argument: the array of quantile probabilities to compute
    // Last argument:   the relative error, between 0 and 1; smaller values are more accurate (0 = exact)
    val summary1 = irisDF.stat.approxQuantile(Array("sepalLength", "sepalWidth"),
      Array(0.25, 0.5, 0.75), 0)
    
    val summary2 = irisDF.stat.approxQuantile("sepalLength", Array(0.25, 0.5, 0.75), 0)
    // summary2 returns Array(5.1, 5.8, 6.4), matching the R result; summary1 returns a nested array (one inner array per column)
    
    // Return the statistics as a DataFrame
    // Numeric column types to consider
    import org.apache.spark.sql.types._
    val numType = Array(DoubleType, IntegerType, LongType, DecimalType, FloatType)
    
    val tuples = irisDF.schema.map(sf => {
      (sf.name, sf.dataType)
    }).filter(tp => {
      numType.contains(tp._2)
    }).map(scm => {
      val quantiles = irisDF.stat.approxQuantile(scm._1, Array(0.25, 0.5, 0.75), 0)
      val col = scm._1
      (col, quantiles)
    })
    
    val quantileDF = tuples
      .toDF("attribute", "quantiles")
      .withColumn("q1", $"quantiles".getItem(0))
      .withColumn("median", $"quantiles".getItem(1))
      .withColumn("q3", $"quantiles".getItem(2))
      .withColumn("IQR", $"q3" - $"q1") // interquartile range
      .drop("quantiles")
    
    // describe() gives count, mean, stddev, min and max for the numeric columns
    val describe = irisDF.describe()
    val schema = describe.schema
    
    // Reshape describe()'s wide output into long form: (summary, attribute, value)
    val longForm = describe.flatMap(row => {
      val metric = row.getString(0)
      (1 until row.length).map(i => {
        (metric, schema(i).name, row.getString(i).toDouble)
      })
    }).toDF("summary", "attribute", "values")
    
    // Cross-tabulate: pivot back to wide form, one column per summary statistic
    val dataset: DataFrame = longForm.groupBy($"attribute")
      .pivot("summary").agg(first("values"))
    
    quantileDF.join(dataset, "attribute").show()
    
    +-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
    |  attribute| q1|median| q3|               IQR|count|max|              mean|min|             stddev|
    +-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
    |petalLength|1.6|  4.35|5.1|3.4999999999999996|150.0|6.9|3.7580000000000027|1.0| 1.7652982332594662|
    | sepalWidth|2.8|   3.0|3.3|               0.5|150.0|4.4| 3.057333333333334|2.0|0.43586628493669793|
    |sepalLength|5.1|   5.8|6.4|1.3000000000000007|150.0|7.9| 5.843333333333335|4.3| 0.8280661279778637|
    | petalWidth|0.3|   1.3|1.8|               1.5|150.0|2.5| 1.199333333333334|0.1| 0.7622376689603467|
    +-----------+---+------+---+------------------+-----+---+------------------+---+-------------------+
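
    If this exact layout is not required, Spark 2.3+ also provides Dataset.summary(), which computes count, mean, stddev, min, approximate quartiles and max for the numeric columns in a single call; a minimal sketch:

    irisDF.summary("count", "mean", "stddev", "min", "25%", "50%", "75%", "max").show()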
    
    Converting between DataFrame and Dataset

    DataFrame and Dataset are two closely related data structures in Spark and can be converted into each other. A DataFrame becomes a Dataset via as[RowClass], where RowClass is usually a case class we define ourselves; after conversion each row can be treated as an instance of that class, much like a namedtuple in Python. A DataFrame can be regarded as a special case of Dataset: DataFrame = Dataset[Row]. Converting a Dataset back to a DataFrame is trivial with toDF().

    // Build a sample DataFrame
    val dataDF = spark.createDataFrame(Seq(("Brooke", 0, 20), ("Brooke", 0, 25),
        ("Denny", 0, 31), ("Jules", 1, 30), ("TD", 1, 5), ("TD", 1, 3),("TD", 1, 35),
        ("TD", 1, 35),("Jack", 2, 33),("Jack", 2, 22),("Jack", 2, 11),("Jack", 2, 10))).toDF("name","rank","income")
    
    val db = dataDF.groupBy("name")
        .agg(max("rank").as("rank"), sum("income").as("income"))
    db.show(10, false)
    +------+----+------+
    |name  |rank|income|
    +------+----+------+
    |Brooke|0   |45    |
    |Jules |1   |30    |
    |Jack  |2   |76    |
    |TD    |1   |78    |
    |Denny |0   |31    |
    +------+----+------+
    
    // Convert the DataFrame to a Dataset
    // Note: the case class field names must match the DataFrame column names (case-insensitively), otherwise an error is thrown
    case class Staff(NAME:String, RANK:Int, INCOME:Long)  
    
    val ds = db.as[Staff]    // ds: org.apache.spark.sql.Dataset[Staff] = [name: string, rank: int ... 1 more field]
    ds.show(2, false)
    +------+----+------+
    |name  |rank|income|
    +------+----+------+
    |Brooke|0   |45    |
    |Jules |1   |30    |
    +------+----+------+
    
    // Convert the Dataset back to a DataFrame
    val df = ds.toDF()
    df.show(2)
    +------+----+------+
    |  name|rank|income|
    +------+----+------+
    |Brooke|   0|    45|
    | Jules|   1|    30|
    +------+----+------+
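
    One benefit of the Dataset side is that transformations can be written against the case class fields and are checked at compile time; a minimal sketch on the ds defined above (assuming spark.implicits._ is in scope, as imported earlier):

    // Typed filter/map work directly with Staff fields instead of column expressions
    ds.filter(_.INCOME > 40)
      .map(staff => (staff.NAME, staff.INCOME * 2))
      .show()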
    
    A Spark UDF that returns an n-element array and produces n columns
    // UDF returning a two-element array (lower-cased and upper-cased input)
    def myFunc: (String => Array[String]) = { s =>
      Array(s.toLowerCase, s.toUpperCase)
    }
    
    import org.apache.spark.sql.functions.udf
    val myUDF = udf(myFunc)
    
    val newDF = df.withColumn("newCol", myUDF(df("Feature2")))
      .select($"Feature1", $"Feature2", $"Feature 3", 
        $"newCol"(0).as("Slope"), $"newCol"(1).as("Offset"))
    
    // Result
    +--------+--------+---------+-----+------+
    |Feature1|Feature2|Feature 3|Slope|Offset|
    +--------+--------+---------+-----+------+
    |1.3     |3.4     |4.5      |3.4  |3.4   |
    +--------+--------+---------+-----+------+
    

    Reference: https://stackoverflow.com/questions/48979440/how-to-use-udf-to-return-multiple-columns
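
    An alternative is to have the UDF return a tuple, which Spark exposes as a struct column with fields _1 and _2; the new columns can then be selected without indexing into an array. A minimal sketch (myStructUDF is an illustrative name):

    val myStructUDF = udf((s: String) => (s.toLowerCase, s.toUpperCase))

    df.withColumn("newCol", myStructUDF(df("Feature2")))
      .select($"Feature1", $"newCol._1".as("Slope"), $"newCol._2".as("Offset"))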

    A parameterized UDF (similar in effect to a Python closure)

    // defaultValue is the configurable parameter
      val calUdf = (defaultValue: Double) => udf { (numerator: Long, denominator: Long) =>
        var index = 0.0
        if (denominator > 0) {
          if (numerator > denominator) index = defaultValue
          else index = numerator * 1.0 / denominator
        }
        index
      }
    
    // Invocation
    df.withColumn("new", calUdf(1.0)($"col1", $"col2"))
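
    For simple arithmetic like this, the UDF can often be avoided entirely with when/otherwise column expressions, which the optimizer can see through; a minimal sketch of the equivalent logic, with defaultValue fixed at 1.0:

    import org.apache.spark.sql.functions.{lit, when}

    val viaExpr = df.withColumn("new",
      when($"col2" > 0,
        when($"col1" > $"col2", lit(1.0)).otherwise($"col1" * 1.0 / $"col2"))
      .otherwise(lit(0.0)))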
    
    

    Date conversion

    df.withColumn("lastdate", to_date(from_unixtime($"lastActiveTime" / 1000)))
     .select("lastActiveTime", "lastdate").show(2, false)
    
    +--------------+----------+
    |lastActiveTime|lastdate  |
    +--------------+----------+
    |1621440000000 |2021-05-20|
    |1643784539274 |2022-02-02|
    +--------------+----------+
    only showing top 2 rows
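
    The reverse direction (a date column back to epoch milliseconds) can be sketched with unix_timestamp, which returns seconds; only midnight is recovered, since to_date dropped the time of day (the lastdateMs column name is illustrative):

    import org.apache.spark.sql.functions.unix_timestamp

    df.withColumn("lastdateMs", unix_timestamp($"lastdate") * 1000)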
    

    Merging array columns in a groupBy

    import org.apache.spark.sql.functions.{collect_list, udf}
    
    val flatten_distinct = udf(
      (xs: Seq[Seq[String]]) => xs.flatten.distinct)
    
    df
      .groupBy("category")
      .agg(
        flatten_distinct(collect_list("array_value_1"))
      )
    

    Reference: https://stackoverflow.com/questions/39496517/spark-merge-combine-arrays-in-groupby-aggregate
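
    On Spark 2.4+ the same aggregation can be written with the built-in flatten and array_distinct functions instead of a UDF; a minimal sketch:

    import org.apache.spark.sql.functions.{array_distinct, collect_list, flatten}

    df.groupBy("category")
      .agg(array_distinct(flatten(collect_list("array_value_1"))))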

    How to split a map- or struct-typed column into multiple columns?

    // map type
    val ds = Seq(
      (1, Map("foo" -> (1, "a"), "bar" -> (2, "b"))),
      (2, Map("foo" -> (3, "c"))),
      (3, Map("bar" -> (4, "d")))
    ).toDF("id", "alpha")
    ds.show(10, false)
    
    +---+------------------------------+
    |id |alpha                         |
    +---+------------------------------+
    |1  |{foo -> {1, a}, bar -> {2, b}}|
    |2  |{foo -> {3, c}}               |
    |3  |{bar -> {4, d}}               |
    +---+------------------------------+
    
    ds.select($"id", explode($"alpha")).show(10)
    +---+---+------+
    | id|key| value|
    +---+---+------+
    |  1|foo|{1, a}|
    |  1|bar|{2, b}|
    |  2|foo|{3, c}|
    |  3|bar|{4, d}|
    +---+---+------+
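
    When the map keys are known in advance, specific keys can also be pulled out directly as columns instead of exploding, since applying a map column to a key returns that key's value (or null if absent); a minimal sketch:

    ds.select($"id",
        $"alpha"("foo").as("foo"),
        $"alpha"("bar").as("bar"))
      .show(10, false)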
    
    // struct type
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{StringType, StructType}
    val structureData = Seq(
        Row(Row("James ","","Smith"),Row(Row("CA","Los Angles"),Row("CA","Sandiago"))),
        Row(Row("Michael ","Rose",""),Row(Row("NY","New York"),Row("NJ","Newark"))),
        Row(Row("Robert ","","Williams"),Row(Row("DE","Newark"),Row("CA","Las Vegas"))),
        Row(Row("Maria ","Anne","Jones"),Row(Row("PA","Harrisburg"),Row("CA","Sandiago"))),
        Row(Row("Jen","Mary","Brown"),Row(Row("CA","Los Angles"),Row("NJ","Newark")))
      )
    
    val structureSchema = new StructType()
        .add("name",new StructType()
          .add("firstname",StringType)
          .add("middlename",StringType)
          .add("lastname",StringType))
        .add("address",new StructType()
          .add("current",new StructType()
            .add("state",StringType)
            .add("city",StringType))
          .add("previous",new StructType()
            .add("state",StringType)
            .add("city",StringType)))
    
    val df = spark.createDataFrame(
        spark.sparkContext.parallelize(structureData),structureSchema)
    df.printSchema()
    df.show(10, false)
    
    root
     |-- name: struct (nullable = true)
     |    |-- firstname: string (nullable = true)
     |    |-- middlename: string (nullable = true)
     |    |-- lastname: string (nullable = true)
     |-- address: struct (nullable = true)
     |    |-- current: struct (nullable = true)
     |    |    |-- state: string (nullable = true)
     |    |    |-- city: string (nullable = true)
     |    |-- previous: struct (nullable = true)
     |    |    |-- state: string (nullable = true)
     |    |    |-- city: string (nullable = true)
    
    +---------------------+----------------------------------+
    |name                 |address                           |
    +---------------------+----------------------------------+
    |{James , , Smith}    |{{CA, Los Angles}, {CA, Sandiago}}|
    |{Michael , Rose, }   |{{NY, New York}, {NJ, Newark}}    |
    |{Robert , , Williams}|{{DE, Newark}, {CA, Las Vegas}}   |
    |{Maria , Anne, Jones}|{{PA, Harrisburg}, {CA, Sandiago}}|
    |{Jen, Mary, Brown}   |{{CA, Los Angles}, {NJ, Newark}}  |
    +---------------------+----------------------------------+
    
    val df2 = df.select(col("name.*"),
        col("address.current.*"),
        col("address.previous.*"))
    val df2Flatten = df2.toDF("fname","mename","lname","currAddState",
        "currAddCity","prevAddState","prevAddCity")
    df2Flatten.printSchema()
    df2Flatten.show(false)
    
    root
     |-- fname: string (nullable = true)
     |-- mename: string (nullable = true)
     |-- lname: string (nullable = true)
     |-- currAddState: string (nullable = true)
     |-- currAddCity: string (nullable = true)
     |-- prevAddState: string (nullable = true)
     |-- prevAddCity: string (nullable = true)
    
    +--------+------+--------+------------+-----------+------------+-----------+
    |fname   |mename|lname   |currAddState|currAddCity|prevAddState|prevAddCity|
    +--------+------+--------+------------+-----------+------------+-----------+
    |James   |      |Smith   |CA          |Los Angles |CA          |Sandiago   |
    |Michael |Rose  |        |NY          |New York   |NJ          |Newark     |
    |Robert  |      |Williams|DE          |Newark     |CA          |Las Vegas  |
    |Maria   |Anne  |Jones   |PA          |Harrisburg |CA          |Sandiago   |
    |Jen     |Mary  |Brown   |CA          |Los Angles |NJ          |Newark     |
    +--------+------+--------+------------+-----------+------------+-----------+
    

    References:
    https://sparkbyexamples.com/spark/spark-how-to-convert-struct-type-to-columns/
    https://stackoverflow.com/questions/40602606/how-to-get-keys-and-values-from-maptype-column-in-sparksql-dataframe
