美文网首页spark
[SparkSQL] 列转行lateral view explo

[SparkSQL] 列转行lateral view explo

作者: 林沐之森 | 来源:发表于2019-08-10 17:31 被阅读0次

    我用了两种方法实现列转行,说一下我平时使用SparkSQL的习惯,我通常
    1)是先读取HDFS中的文件,包括txt、parquet等格式
    2)然后通过createOrReplaceTempView 方法创建临时表格
    3)之后写sql直接处理
    相关代码可以参考我写的另一篇文章:

    一、开发环境

    spark-2.1.0-bin-hadoop2.6

    二、纯SQL实现列转行

    1、因为我在项目中的需求是同时将两个Array字段展开,所以在这里举的例子是同时处理两个字段的情况。
    2、要提醒的是,同时展开多个字段会造成数据膨胀严重,计算时会消耗大量的资源,需要根据实际情况进行考虑,选择比较合适的方法。
    3、语法:使用lateral view explode 函数进行列转行,新字段的别名,不能和以前的字段名相同。

    sparkSession.sql(
      s"""
        |select
        |    user_id,
        |    hobby,
        |    skill
        |from t1
        |lateral view explode(hobbies) tempcol as hobby
        |lateral view explode(skills) tempcol as skill
      """.stripMargin
      )
    

    三、使用DataFrame、Rdd格式数据,通过FlatMap算子实现列转行

    /**
      * 自定义函数,在flatMap算子中调用,将Array类型数据遍历,并合其他字段进行拼接,返回Seq序列
      * 同时实现两列的列转行
      */
    def splitCate(user_id: String,hobbies: Seq[String], skills: Seq[String]):Seq[Row] = {
      val hobbieList = hobbies.toString().split(",")
      val skillList = skills.toString().split(",")
      var resSeq = Seq[Row]()
      
      //如果不为空null或是空,进行遍历,拼接Row类型的序列。由于同时处理两个字段的列转行,
      //所以在判断一个字段是否为空或为null的同时,嵌套判断另一个字段是否为空或为null
      if(hobbieList!=null || !hobbieList.isEmpty){
        for (h <- hobbieList) {
          if(skillList != null || !skillList.isEmpty){
            for(s <- skillList){
              resSeq = resSeq :+ Row(user_id,  h, s)
            }
          }else{
            resSeq = resSeq :+ Row(user_id, h,null)
          }
        }
      }else{
          if(skillList != null || !skillList.isEmpty){
            for(s <- skillList){
              resSeq = resSeq :+ Row(user_id, null,s)
            }
          }else{
            resSeq = resSeq :+ Row(user_id,null,null)
          }
      }
      resSeq
    }
    
    //getAs[]() 中数据类型必须和获取到的字段的数据类型保持一致
    val flatMapRdd = leftJoinDF.rdd.repartition(200).flatMap(row => {
          val user_id = row.getAs[String]("user_id")
          val hobbies = row.getAs[Seq[String]]("cate")
          val skills = row.getAs[Seq[String]]("sub_cate")
        
          splitCate(user_id, hobbies,skills)
        })
        
        //创建schema,用于列转行之后的Rdd转化为DataFrame
        val schema = StructType(List(
          StructField("user_id", StringType, nullable = false),
          StructField("hobby", StringType, nullable = true),
          StructField("skill", StringType, nullable = true),
        )
        )
        val flatMapDF = sparkSession.createDataFrame(flatMapRdd, schema)
    

    相关文章

      网友评论

        本文标题:[SparkSQL] 列转行lateral view explo

        本文链接:https://www.haomeiwen.com/subject/saysjctx.html