我用了两种方法实现列转行,说一下我平时使用SparkSQL的习惯,我通常
1)是先读取HDFS中的文件,包括txt、parquet等格式
2)然后通过createOrReplaceTempView 方法创建临时表格
3)之后写sql直接处理
相关代码可以参考我写的另一篇文章:
一、开发环境
spark-2.1.0-bin-hadoop2.6
二、纯SQL实现列转行
1、因为我在项目中的需求是同时将两个Array字段展开,所以在这里举的例子是同时处理两个字段的情况。
2、要提醒的是,同时展开多个字段会造成数据膨胀严重,计算时会消耗大量的资源,需要根据实际情况进行考虑,选择比较合适的方法。
3、语法:使用lateral view explode 函数进行列转行,新字段的别名,不能和以前的字段名相同。
sparkSession.sql(
s"""
|select
| user_id,
| hobby,
| skill
|from t1
|lateral view explode(hobbies) tempcol as hobby
|lateral view explode(skills) tempcol as skill
""".stripMargin
)
三、使用DataFrame、Rdd格式数据,通过FlatMap算子实现列转行
/**
* 自定义函数,在flatMap算子中调用,将Array类型数据遍历,并合其他字段进行拼接,返回Seq序列
* 同时实现两列的列转行
*/
def splitCate(user_id: String,hobbies: Seq[String], skills: Seq[String]):Seq[Row] = {
val hobbieList = hobbies.toString().split(",")
val skillList = skills.toString().split(",")
var resSeq = Seq[Row]()
//如果不为空null或是空,进行遍历,拼接Row类型的序列。由于同时处理两个字段的列转行,
//所以在判断一个字段是否为空或为null的同时,嵌套判断另一个字段是否为空或为null
if(hobbieList!=null || !hobbieList.isEmpty){
for (h <- hobbieList) {
if(skillList != null || !skillList.isEmpty){
for(s <- skillList){
resSeq = resSeq :+ Row(user_id, h, s)
}
}else{
resSeq = resSeq :+ Row(user_id, h,null)
}
}
}else{
if(skillList != null || !skillList.isEmpty){
for(s <- skillList){
resSeq = resSeq :+ Row(user_id, null,s)
}
}else{
resSeq = resSeq :+ Row(user_id,null,null)
}
}
resSeq
}
//getAs[]() 中数据类型必须和获取到的字段的数据类型保持一致
val flatMapRdd = leftJoinDF.rdd.repartition(200).flatMap(row => {
val user_id = row.getAs[String]("user_id")
val hobbies = row.getAs[Seq[String]]("cate")
val skills = row.getAs[Seq[String]]("sub_cate")
splitCate(user_id, hobbies,skills)
})
//创建schema,用于列转行之后的Rdd转化为DataFrame
val schema = StructType(List(
StructField("user_id", StringType, nullable = false),
StructField("hobby", StringType, nullable = true),
StructField("skill", StringType, nullable = true),
)
)
val flatMapDF = sparkSession.createDataFrame(flatMapRdd, schema)
网友评论