美文网首页
spark SQL利用窗口函数操作hive

spark SQL利用窗口函数操作hive

作者: 400476cab371 | 来源:发表于2017-12-27 11:22 被阅读704次

    **

    * sparksql支持了hive的窗口函数

    * 罗列一些窗口函数

    * 1.row_number分组排序

    * 2.sum 分组统计

    * 3.avg 分组统计

    * 4.count 分组统计

    * 5.lag 分组

    * 等等

    * 有利用于hive作业向spark-sql来转换。

    * 咱这就用最常用的row_number来做个实验

    */

    //导入一系列的依赖包

    val conf =new SparkConf().setMaster("local").setAppName("SparkSqlWindowFunctionOps")

    val sc =new SparkContext(conf)

    val hiveContext =new HiveContext(sc)

    //如果要创建的表存在就删除,然后创建我们要导入的表

    hiveContext.sql("DROP TABLE IF EXISTS scores")

    hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INTEGER)"

    +"ROW FORMAT DELIMITED FIELDS TERMINATED BY  '  ' LINES TERMINATED BY  ' \n'  " )//数据导入的信息

    //把要处理的数据导入到hive表中

    /**

    * 数据

    * zhangsan 50

    * lisi 90

    * lisi 49

    * wangwu 80

    * wangwu 89

    * maliu 63

    */

    hiveContext.sql("LOAD DATA LOCAL INPATH '/root/zgq/SparkApps/resources/readme.txt' INTO TABLE scores")

    /**

    * 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序

    * partition by :指定窗口函数分组的key

    * ORDER BY :分组后进行排序

    */

    val result =hiveContext.sql("SELECT name,score FROM  " +

    "(SELECT name,score,row_number() OVER " +

    "(PARTITION BY name ORDER BY score DESC) rank FROM scores) " +

    "sub_scores where rank <= 4")

    result.show()

    //最后把数据保存到Hive数据仓库中

    hiveContext.sql("DROP TABLE EXISTS sortedResultScores")

    result.saveAsParquetFile("sortedResultScores")

    对于用sparksql来操作hive是非常的方便

    相关文章

      网友评论

          本文标题:spark SQL利用窗口函数操作hive

          本文链接:https://www.haomeiwen.com/subject/tnvygxtx.html