[Hive] Two "less common" functions: posexplode and lag

Author: LZhan | Published 2019-08-09 17:21

    Yesterday I read the latest post on the blog 文哥的学习日记, which shared some Hive knowledge. Since I have been reviewing Hive myself recently, I studied the post and tried everything out.
    The post walks through four Hive SQL interview questions, which really revolve around two functions: posexplode and lag/lead.
    Honestly, I did not know these two functions before, let alone use them at work; the closest I had come was the explode function. After seeing their use cases, though, they are well worth learning.

    1. posexplode

    Before explaining posexplode, a quick look at Lateral View:
    Lateral View is used together with user-defined table-generating functions, i.e. UDTFs, such as explode() (split() itself is a plain UDF that is often used to produce the array input).
    (A UDTF generates zero or more output rows for each input row.)
    Lateral View applies the UDTF to every row of the base table, then joins the output rows to that input row, forming a virtual table with the supplied table alias.
    Basic usage:
    LATERAL VIEW udtf(expression) tableAlias AS columnAlias
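
    A minimal sketch of the syntax on a literal array (assuming a hiveContext is already in scope, as in the demos below):

        // each input row is joined to the rows explode() generates from it
        hiveContext.sql(
          """
            |SELECT t.x, c.col
            |FROM (SELECT 1 AS x, array('a','b') AS arr) t
            |LATERAL VIEW explode(t.arr) c AS col
          """.stripMargin).show(false)
        // => two rows: (1, a) and (1, b)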

    Preparing the sample data:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types
    import org.apache.spark.sql.types.{StringType, StructField}

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("hive context")
          .getOrCreate()

        // HiveContext is the pre-Spark-2.0 entry point, kept as in the original post
        val hiveContext = new HiveContext(spark.sparkContext)

        // define the schema: both columns are comma-separated strings
        val schema = types.StructType(Seq(
          StructField("id", StringType, true),
          StructField("time", StringType, true)
        ))
        // create the sample data: each input line is "id list;time list"
        val dataRdd = spark.sparkContext
          .parallelize(Array("a,b,c,d;2:00,3:00,4:00,5:00", "f,b,c,d;1:10,2:20,3:30,4:40"))
          .map(line => line.split(";"))
        val rowRdd = dataRdd.map(p => Row(p(0).trim, p(1).trim))

        // create the DataFrame and register it as a temp table for SQL
        val data = hiveContext.createDataFrame(rowRdd, schema)
        data.registerTempTable("tempTable")
        data.show(false)
    

    Result:
    +-------+-------------------+
    |id     |time               |
    +-------+-------------------+
    |a,b,c,d|2:00,3:00,4:00,5:00|
    |f,b,c,d|1:10,2:20,3:30,4:40|
    +-------+-------------------+

    The target output pairs each element of id with the element of time at the same position, e.g. (a, 2:00), (b, 3:00); this is the result reached in section 1.3.

    1.1 Demonstrating explode

       // 1. without lateral view: explode directly in the SELECT list
       // (selecting other columns alongside explode() works in Spark SQL,
       // but Hive itself does not allow a UDTF mixed with other expressions)
       val sql1 =
        s"""
           |SELECT
           |  id,
           |  time,
           |  explode(split(time,',')) as single_time
           |FROM tempTable
           """.stripMargin

        hiveContext.sql(sql1).show(false)

      // 2. with explode and lateral view -- same result as 1
      // (the table alias after the UDTF is required by Hive)
        val sql2 =
          s"""
             |SELECT
             |  id,
             |  time,
             |  single_time
             |FROM tempTable
             |lateral view explode(split(time,',')) t as single_time
           """.stripMargin

        hiveContext.sql(sql2).show(false)
    
    

    Result:
    +-------+-------------------+-----------+
    |id     |time               |single_time|
    +-------+-------------------+-----------+
    |a,b,c,d|2:00,3:00,4:00,5:00|2:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|3:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|4:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|5:00       |
    |f,b,c,d|1:10,2:20,3:30,4:40|1:10       |
    |f,b,c,d|1:10,2:20,3:30,4:40|2:20       |
    |f,b,c,d|1:10,2:20,3:30,4:40|3:30       |
    |f,b,c,d|1:10,2:20,3:30,4:40|4:40       |
    +-------+-------------------+-----------+

    After also exploding the id column with a second lateral view (which gives the Cartesian product of the two arrays), the result is:

    +-------+-------------------+---------+-----------+
    |id     |time               |single_id|single_time|
    +-------+-------------------+---------+-----------+
    |a,b,c,d|2:00,3:00,4:00,5:00|a        |2:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|a        |3:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|a        |4:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|a        |5:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|b        |2:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|b        |3:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|b        |4:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|b        |5:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|c        |2:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|c        |3:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|c        |4:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|c        |5:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|d        |2:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|d        |3:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|d        |4:00       |
    |a,b,c,d|2:00,3:00,4:00,5:00|d        |5:00       |
    |f,b,c,d|1:10,2:20,3:30,4:40|f        |1:10       |
    |f,b,c,d|1:10,2:20,3:30,4:40|f        |2:20       |
    |f,b,c,d|1:10,2:20,3:30,4:40|f        |3:30       |
    |f,b,c,d|1:10,2:20,3:30,4:40|f        |4:40       |
    +-------+-------------------+---------+-----------+
    (show() displays only the first 20 rows, so the remaining f,b,c,d combinations are cut off here)
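
    The post does not show the query behind this table (the demos skip from sql2 to sql4); a sketch of the missing double lateral view:

        // 3. two chained lateral views: every single_id is paired with every
        // single_time, i.e. a 4x4 cross product per input row
        val sql3 =
          s"""
             |SELECT
             |  id,
             |  time,
             |  single_id,
             |  single_time
             |FROM tempTable
             |lateral view explode(split(id,',')) t1 as single_id
             |lateral view explode(split(time,',')) t2 as single_time
           """.stripMargin

        hiveContext.sql(sql3).show(false)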

    1.2 Using the posexplode function
    Compared with explode, posexplode not only turns one column into multiple rows but also outputs each element's (0-based) index.
    Example:

        val sql4 =
          s"""
             |SELECT
             |  id,
             |  time,
             |  single_id,
             |  single_id_index
             |FROM tempTable
             |lateral view posexplode(split(id,',')) t as single_id_index,single_id
         """.stripMargin
    
        hiveContext.sql(sql4).show(false)
    

    Note the extra column single_id_index (indices start at 0).
    Result:
    +-------+-------------------+---------+---------------+
    |id     |time               |single_id|single_id_index|
    +-------+-------------------+---------+---------------+
    |a,b,c,d|2:00,3:00,4:00,5:00|a        |0              |
    |a,b,c,d|2:00,3:00,4:00,5:00|b        |1              |
    |a,b,c,d|2:00,3:00,4:00,5:00|c        |2              |
    |a,b,c,d|2:00,3:00,4:00,5:00|d        |3              |
    |f,b,c,d|1:10,2:20,3:30,4:40|f        |0              |
    |f,b,c,d|1:10,2:20,3:30,4:40|b        |1              |
    |f,b,c,d|1:10,2:20,3:30,4:40|c        |2              |
    |f,b,c,d|1:10,2:20,3:30,4:40|d        |3              |
    +-------+-------------------+---------+---------------+

    1.3 Building on this, the target output is obtained by keeping only the rows where the id index equals the time index

       val sql5 =
          s"""
             |SELECT
             |  single_id,
             |  single_time
             |FROM tempTable
             | lateral view posexplode(split(id,',')) t1 as single_id_index,single_id
             | lateral view posexplode(split(time,',')) t2 as single_time_index,single_time
             |WHERE
             |  single_id_index=single_time_index
           """.stripMargin

        hiveContext.sql(sql5).show(false)
    

    Result:
    +---------+-----------+
    |single_id|single_time|
    +---------+-----------+
    |a        |2:00       |
    |b        |3:00       |
    |c        |4:00       |
    |d        |5:00       |
    |f        |1:10       |
    |b        |2:20       |
    |c        |3:30       |
    |d        |4:40       |
    +---------+-----------+

    2. A second application of posexplode

    Scenario:
    For a record with value 00001, output the index of its 1: 5.
    For a record with value 0101, output the indices of its 1s: 2,4.
    (Indices here are 1-based.)
    Full code:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField}

    object PosexplodeDemo2 {
      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("hive context")
          .getOrCreate()

        val hiveContext = new HiveContext(spark.sparkContext)

        // define the schema: an integer id and a 0/1 string value
        val schema = types.StructType(Seq(
          StructField("id", IntegerType, true),
          StructField("value", StringType, true)
        ))
        // create the sample data
        val dataRdd = spark.sparkContext
          .parallelize(Array("1,1011", "2,0101", "3,1111", "4,00001"))
          .map(line => line.split(","))
        val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).trim))

        // create the DataFrame and register it as a temp table
        val data = hiveContext.createDataFrame(rowRdd, schema)
        data.registerTempTable("tempTable")
        data.show(false)
    
        /**
          * +---+-----+------------+------------------+
          * |id |value|single_value|single_value_index|
          * +---+-----+------------+------------------+
          * |1  |1011 |1           |0                 |
          * |1  |1011 |1           |2                 |
          * |1  |1011 |1           |3                 |
          * |2  |0101 |1           |1                 |
          * |2  |0101 |1           |3                 |
          * |3  |1111 |1           |0                 |
          * |3  |1111 |1           |1                 |
          * |3  |1111 |1           |2                 |
          * |3  |1111 |1           |3                 |
          * |4  |00001|1           |4                 |
          * +---+-----+------------+------------------+
          */
        // split(value,'') breaks the string into single characters;
        // posexplode then yields (index, character) pairs
        val sql=
          s"""
             |SELECT
             |  id,
             |  value,
             |  single_value,
             |  single_value_index
             |FROM tempTable
             | lateral view posexplode(split(value,'')) t as single_value_index,single_value
             |WHERE single_value='1'
           """.stripMargin
        hiveContext.sql(sql).show(false)
    
    
        /**
          * +---+-----+-------+
          * |id |value|indices|
          * +---+-----+-------+
          * |4  |00001|5      |
          * |2  |0101 |2,4    |
          * |1  |1011 |1,3,4  |
          * |3  |1111 |1,2,3,4|
          * +---+-----+-------+
          */
        val sql1=
          s"""
             |SELECT
             |  id,
             |  value,
             |  concat_ws(',',collect_list(single_value_index)) as indices
             |FROM
             |(
             |  SELECT
             |    id,
             |    value,
             |    single_value,
             |    cast(single_value_index+1 as string) as single_value_index
             |  FROM tempTable
             |   lateral view posexplode(split(value,'')) t as single_value_index,single_value
             |  WHERE single_value='1'
             |) tmp
             |GROUP BY id,value
           """.stripMargin

        // note: collect_list does not guarantee element order in general;
        // on this small local dataset the indices happen to come out sorted
        hiveContext.sql(sql1).show(false)
    
      }
    }
    
    

    3. The lag and lead functions

    On top of a partition-and-sort, lag and lead fetch the value of a field from the record some number of positions before or after the current record.
    Basic syntax:
    lag(column, N) over(partition by partition_cols order by sort_cols asc|desc)
    lead(column, N) over(partition by partition_cols order by sort_cols asc|desc)

    lag's arguments are a column name and an offset N: after partitioning and sorting, it returns the given column's value from the record N positions before the current one.
    With column ts and N=1, that is the previous record's ts.

    lead's arguments are a column name and an offset N: after partitioning and sorting, it returns the given column's value from the record N positions after the current one.
    With column ts and N=1, that is the next record's ts.

    If there is no such preceding or following record, the value is null (an optional third argument supplies a default instead, e.g. lag(ts,1,0)).
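
    A quick illustration on a hypothetical user_events table (the table and its user_id/ts columns are made up for this sketch):

        val sqlDemo =
          s"""
             |SELECT
             |  user_id,
             |  ts,
             |  lag(ts,1) over(partition by user_id order by ts asc) as prev_ts,
             |  lead(ts,1) over(partition by user_id order by ts asc) as next_ts
             |FROM user_events
           """.stripMargin

        // per user: the first event gets prev_ts = null,
        // the last event gets next_ts = null
        hiveContext.sql(sqlDemo).show(false)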

    Scenario: per year (id), ordered by quarter (time), report an average score as of each quarter. As implemented below this is the average of the current and the previous quarter's scores; a true running average is sketched after the code.
    +----+----+-----+
    |id  |time|score|
    +----+----+-----+
    |2014|A   |3    |
    |2014|C   |1    |
    |2014|B   |2    |
    |2015|A   |4    |
    |2015|C   |3    |
    +----+----+-----+

    Full code:
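    The data preparation is omitted in the original; a sketch mirroring the earlier demos (same imports, plus IntegerType):

        // sketch of the omitted data prep: year id, quarter time, score
        val schema = types.StructType(Seq(
          StructField("id", IntegerType, true),
          StructField("time", StringType, true),
          StructField("score", IntegerType, true)
        ))
        val rowRdd = spark.sparkContext
          .parallelize(Array("2014,A,3", "2014,C,1", "2014,B,2", "2015,A,4", "2015,C,3"))
          .map(_.split(","))
          .map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
        val data = hiveContext.createDataFrame(rowRdd, schema)
        data.registerTempTable("tempTable")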

     /**
          * +----+----+-----+---------+
          * |id  |time|score|pre_score|
          * +----+----+-----+---------+
          * |2014|A   |3    |null     |
          * |2014|B   |2    |3        |
          * |2014|C   |1    |2        |
          * |2015|A   |4    |null     |
          * |2015|C   |3    |4        |
          * +----+----+-----+---------+
          */
        val sql=
          s"""
             |SELECT
             |  id,
             |  time,
             |  score,
             |  lag(score,1) over(partition by id order by time asc) as pre_score
             |FROM tempTable
           """.stripMargin
        hiveContext.sql(sql).show(false)
    
    
        /**
          * +----+----+---------+
          * |id  |time|avg_score|
          * +----+----+---------+
          * |2014|A   |3.0      |
          * |2014|B   |2.5      |
          * |2014|C   |1.5      |
          * |2015|A   |4.0      |
          * |2015|C   |3.5      |
          * +----+----+---------+
          */
        val sql1=
          s"""
             |SELECT
             |  id,
             |  time,
             |  CASE WHEN pre_score IS NULL THEN score
             |  ELSE (score+pre_score)/2
             |  END AS avg_score
             |FROM
             |(
             |   SELECT
             |     id,
             |     time,
             |     score,
             |     lag(score,1) over(partition by id order by time asc) as pre_score
             |   FROM tempTable
             |) tmp
           """.stripMargin
    
        hiveContext.sql(sql1).show(false)
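
    If a true running average up to the current quarter is wanted instead, a windowed avg computes it directly; a sketch against the same table:

        val sql2 =
          s"""
             |SELECT
             |  id,
             |  time,
             |  avg(score) over(partition by id order by time asc
             |    rows between unbounded preceding and current row) as run_avg
             |FROM tempTable
           """.stripMargin

        // e.g. for 2014: A -> 3.0, B -> (3+2)/2 = 2.5, C -> (3+2+1)/3 = 2.0
        hiveContext.sql(sql2).show(false)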
    

    4. Combining posexplode and lag (block-wise ranking)

    Goal: rank the rows by id within each run of consecutive rows sharing the same value, restarting the rank whenever value changes (see the input and final-result tables below). The solution pairs lag with row_number and a self-join.

    Full code:

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types
    import org.apache.spark.sql.types.{IntegerType, StructField}

    object Posexplode_LagDemo {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("hive context")
          .getOrCreate()

        val hiveContext = new HiveContext(spark.sparkContext)

        // define the schema: a year id and a 0/1 value, both integers
        val schema = types.StructType(Seq(
          StructField("id", IntegerType, true),
          StructField("value", IntegerType, true)
        ))
        // create the sample data
        val dataRdd = spark.sparkContext
          .parallelize(Array("2014,1", "2015,1", "2017,0", "2018,0", "2019,1",
            "2020,1", "2021,1", "2022,0", "2023,0"))
          .map(line => line.split(","))
        val rowRdd = dataRdd.map(p => Row(p(0).toInt, p(1).toInt))

        // create the DataFrame and register it as a temp table
        val data = hiveContext.createDataFrame(rowRdd, schema)
        data.registerTempTable("tempTable")
    
        /**
          * +----+-----+
          * |id  |value|
          * +----+-----+
          * |2014|1    |
          * |2015|1    |
          * |2017|0    |
          * |2018|0    |
          * |2019|1    |
          * |2020|1    |
          * |2021|1    |
          * |2022|0    |
          * |2023|0    |
          * +----+-----+
          */
        data.show(false)
    
    
        /**
          * +----+-----+---------+
          * |id  |value|pre_value|
          * +----+-----+---------+
          * |2014|1    |null     |
          * |2015|1    |1        |
          * |2017|0    |1        |
          * |2018|0    |0        |
          * |2019|1    |0        |
          * |2020|1    |1        |
          * |2021|1    |1        |
          * |2022|0    |1        |
          * |2023|0    |0        |
          * +----+-----+---------+
          * ------>
          * +------------+-----+
          * |min_block_id|value|
          * +------------+-----+
          * |2014        |1    |
          * |2017        |0    |
          * |2019        |1    |
          * |2022        |0    |
          * +------------+-----+
          * ------>
          * join the base table with the table above (on value, with t.id >= m.min_block_id)
          * +----+-----+------------+----+
          * |id  |value|min_block_id|rank|
          * +----+-----+------------+----+
          * |2014|1    |2014        |1   |
          * |2015|1    |2014        |1   |
          * |2017|0    |2017        |1   |
          * |2018|0    |2017        |1   |
          * |2019|1    |2019        |1   |
          * |2019|1    |2014        |2   |
          * |2020|1    |2019        |1   |
          * |2020|1    |2014        |2   |
          * |2021|1    |2019        |1   |
          * |2021|1    |2014        |2   |
          * |2022|0    |2022        |1   |
          * |2022|0    |2017        |2   |
          * |2023|0    |2022        |1   |
          * |2023|0    |2017        |2   |
          * +----+-----+------------+----+
          *
          * ------>
          * keep only the rows with rank=1
          * +----+-----+------------+----+
          * |id  |value|min_block_id|rank|
          * +----+-----+------------+----+
          * |2014|1    |2014        |1   |
          * |2015|1    |2014        |1   |
          * |2017|0    |2017        |1   |
          * |2018|0    |2017        |1   |
          * |2019|1    |2019        |1   |
          * |2020|1    |2019        |1   |
          * |2021|1    |2019        |1   |
          * |2022|0    |2022        |1   |
          * |2023|0    |2022        |1   |
          * +----+-----+------------+----+
          *
          * ------>
          * final result
          * +----+-----+--------+
          * |id  |value|new_rank|
          * +----+-----+--------+
          * |2014|1    |1       |
          * |2015|1    |2       |
          * |2017|0    |1       |
          * |2018|0    |2       |
          * |2019|1    |1       |
          * |2020|1    |2       |
          * |2021|1    |3       |
          * |2022|0    |1       |
          * |2023|0    |2       |
          * +----+-----+--------+
          *
          */
        val sql=
          s"""
             |SELECT id,
             |  value,
             |  row_number() over(partition by min_block_id order by id asc) as new_rank
             |FROM
             |(
             |  SELECT
             |    t.id,
             |    t.value,
             |    m.min_block_id,
             |    row_number() over(partition by t.id order by min_block_id desc) as rank
             |  FROM tempTable t
             |  INNER JOIN
             |  (
             |     SELECT id as min_block_id,
             |       value
             |     FROM
             |      (
             |       SELECT
             |         id,
             |         value,
             |         lag(value,1) over(partition by 1 order by id asc) as pre_value
             |       FROM tempTable
             |       ) w
             |     WHERE pre_value is null
             |      or value!=pre_value
             |  ) m
             |  ON t.value=m.value
             |  AND t.id>=m.min_block_id
             |) r
             |WHERE rank=1
             |ORDER BY id
           """.stripMargin
    
        hiveContext.sql(sql).show(false)
      }
    }
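
    As an aside, ranking within runs of consecutive equal values is the classic "gaps and islands" problem, which can also be solved with the difference of two row_number sequences; a sketch that produces the same result on this data:

        // the global row_number and the per-value row_number advance together
        // inside a run, so their difference (grp) is constant within a run
        // and changes whenever the run breaks
        val sqlAlt =
          s"""
             |SELECT
             |  id,
             |  value,
             |  row_number() over(partition by value, grp order by id asc) as new_rank
             |FROM
             |(
             |  SELECT
             |    id,
             |    value,
             |    row_number() over(order by id asc)
             |      - row_number() over(partition by value order by id asc) as grp
             |  FROM tempTable
             |) g
             |ORDER BY id
           """.stripMargin

        hiveContext.sql(sqlAlt).show(false)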
    
