Convert an RDD[Map[String,String]] into a flattened DataFrame, similar to what toDF does with dict records in PySpark.
input
val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(
  Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),
  Map("name" -> "lisi", "age" -> "20", "addr" -> "hz")
))
output
name     age addr
zhangsan 18  bj
lisi     20  hz
1. Fixed number of Map elements
When every Map has exactly three elements:
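import spark.implicits._ // assumes a SparkSession named spark; required for toDF
// Take the column names from the first Map, then look values up by key
// so they stay aligned with the columns whatever each Map's iteration order is.
val columns = mapRDD.take(1).flatMap(_.keys)
val resultantDF = mapRDD.filter(_.nonEmpty).map { m =>
  val seq = columns.map(c => m.get(c).orNull)
  (seq(0), seq(1), seq(2))
}.toDF(columns: _*)
resultantDF.show()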
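When the three keys are known in advance, a case class is another way to get named columns without reading them from the first Map. The following is a minimal sketch, not part of the original approach; `Person`, `FixedKeys`, and `toPersonDF` are hypothetical names introduced for illustration, and a key missing from a Map is assumed to map to null.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical record type for the three known keys.
// It is declared at the top level so Spark can derive an encoder for it.
case class Person(name: String, age: String, addr: String)

object FixedKeys {
  // Hypothetical helper: maps each non-empty Map to a Person by key lookup.
  def toPersonDF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
    import spark.implicits._ // brings toDF into scope
    rdd.filter(_.nonEmpty)
      .map(m => Person(m.get("name").orNull, m.get("age").orNull, m.get("addr").orNull))
      .toDF() // column names are taken from the case class fields
  }
}
```

Calling `FixedKeys.toPersonDF(spark, mapRDD).show()` should then print the columns in the order name, age, addr.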
2. Variable number of Map elements
RDD[Map[String,String]] -> RDD[Row] -> DataFrame
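import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
  // Column names come from the first Map in the RDD.
  val cols = rdd.take(1).flatMap(_.keys)
  // Look each value up by column name so it lines up with the schema; a missing key becomes null.
  val resRDD = rdd.filter(_.nonEmpty).map { m =>
    Row.fromSeq(cols.map(c => m.get(c).orNull).toSeq)
  }
  val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))
  val schema = StructType(fields)
  spark.createDataFrame(resRDD, schema)
}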
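map2DF above takes its column names from the first Map only, so a key that appears only in later Maps never becomes a column. If the Maps can carry different key sets, one option is to build the schema from the union of all keys. The following is a minimal sketch under that assumption; `MapRddToDf` and `mapsToDF` are hypothetical names, the alphabetical column order is an arbitrary choice made for determinism, and collecting the keys costs one extra pass over the RDD.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object MapRddToDf {
  // Hypothetical helper: one column per distinct key found anywhere in the RDD.
  def mapsToDF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {
    // Gather every distinct key on the driver; sorting makes the column order deterministic.
    val cols: Array[String] = rdd.flatMap(_.keys).distinct().collect().sorted
    val schema = StructType(cols.map(c => StructField(c, StringType, nullable = true)))
    // Look values up by column name; a key absent from a Map yields null.
    val rows: RDD[Row] = rdd.filter(_.nonEmpty).map { m =>
      Row.fromSeq(cols.map(c => m.get(c).orNull).toSeq)
    }
    spark.createDataFrame(rows, schema)
  }
}
```

For example, `MapRddToDf.mapsToDF(spark, mapRDD)` on the sample input would give the columns addr, age, name; a record without an addr entry would hold null in that column.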