spark convert RDD[Map] to DataFr

Author: breeze_lsw | Published 2017-10-12 00:17 | 642 views

    Convert an RDD[Map[String,String]] into a flattened DataFrame, similar to what toDF does for an RDD of dicts in pyspark.

    input

    val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
       Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
       Map("name" -> "lisi", "age" -> "20", "addr" -> "hz")
    ))
    

    output

    name     age addr
    zhangsan 18  bj
    lisi     20  hz
    

    1. Maps with a fixed set of elements

    When every Map has exactly three elements:

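    // toDF needs the implicits of a SparkSession (assumed here to be named `spark`, as in spark-shell)
    import spark.implicits._

    // Column names come from the keys of the first record
    val columns = mapRDD.take(1).flatMap(_.keys)

    val resultantDF = mapRDD.filter(_.nonEmpty).map { m =>
      // Look values up by column name so the result does not depend on Map iteration order
      (m(columns(0)), m(columns(1)), m(columns(2)))
    }.toDF(columns: _*)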
    
    resultantDF.show()
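
When the keys are known in advance, a case class is an alternative to building tuples by position. The following is only a sketch under that assumption: `Person` and `fixedMapsToDF` are hypothetical names that do not appear in the original post, and the field names simply mirror the keys of the sample input.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type; field names mirror the keys in the sample input.
// Define it at top level (not inside a method) so Spark can derive an encoder.
case class Person(name: String, age: String, addr: String)

def fixedMapsToDF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
  import spark.implicits._ // brings toDF and the case-class encoder into scope
  rdd.filter(_.nonEmpty)
    // m("key") throws NoSuchElementException if a record lacks that key
    .map(m => Person(m("name"), m("age"), m("addr")))
    .toDF()
}
```

Here the column names and their order come from the case class fields, so they no longer depend on the first record of the RDD.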
    

    2. Maps with a variable set of elements
    RDD[Map[String,String]] -> RDD[Row] -> DataFrame

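      import org.apache.spark.rdd.RDD
      import org.apache.spark.sql.{DataFrame, Row, SparkSession}
      import org.apache.spark.sql.types.{StringType, StructField, StructType}

      def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
        // Column names come from the first record only
        val cols = rdd.take(1).flatMap(_.keys).toSeq
        val resRDD = rdd.filter(_.nonEmpty).map { m =>
          // Align values to the column list by name; a missing key becomes null,
          // so every Row has the same length as the schema
          Row.fromSeq(cols.map(c => m.getOrElse(c, null)))
        }

        // Every column is a nullable string
        val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
        val schema = StructType(fields)

        spark.createDataFrame(resRDD, schema)
      }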
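
Because map2DF reads its column names from the first record, any key that appears only in later records never becomes a column. One possible way around this, sketched below, is to build the schema from the union of all keys at the cost of one extra pass over the data. `map2DFAllKeys` is a hypothetical name, and sorting the columns alphabetically is an assumption made here only to get a deterministic column order.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical variant of map2DF: the schema is the union of all keys,
// not just the keys of the first record.
def map2DFAllKeys(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
  // Extra pass over the data to collect every distinct key; sorted for a stable order
  val cols: Seq[String] = rdd.flatMap(_.keys).distinct().collect().sorted.toSeq

  // Align each record to the column list; keys a record lacks become null
  val rows: RDD[Row] = rdd.filter(_.nonEmpty).map { m =>
    Row.fromSeq(cols.map(c => m.getOrElse(c, null)))
  }

  val schema = StructType(cols.map(c => StructField(c, StringType, nullable = true)))
  spark.createDataFrame(rows, schema)
}
```

With this variant, a record such as Map("name" -> "lisi", "addr" -> "hz") yields null in the age column instead of a row whose length does not match the schema.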
    


    Article link: https://www.haomeiwen.com/subject/lragyxtx.html