**
* sparksql支持了hive的窗口函数
* 罗列一些窗口函数
* 1.row_number分组排序
* 2.sum 分组统计
* 3.avg 分组统计
* 4.count 分组统计
* 5.lag 分组
* 等等
* 有利用于hive作业向spark-sql来转换。
* 咱这就用最常用的row_number来做个实验
*/
//导入一系列的依赖包
val conf =new SparkConf().setMaster("local").setAppName("SparkSqlWindowFunctionOps")
val sc =new SparkContext(conf)
val hiveContext =new HiveContext(sc)
//如果要创建的表存在就删除,然后创建我们要导入的表
hiveContext.sql("DROP TABLE IF EXISTS scores")
hiveContext.sql("CREATE TABLE IF NOT EXISTS scores(name STRING,score INTEGER)"
+"ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LINES TERMINATED BY ' \n' " )//数据导入的信息
//把要处理的数据导入到hive表中
/**
* 数据
* zhangsan 50
* lisi 90
* lisi 49
* wangwu 80
* wangwu 89
* maliu 63
*/
hiveContext.sql("LOAD DATA LOCAL INPATH '/root/zgq/SparkApps/resources/readme.txt' INTO TABLE scores")
/**
* 使用子查询的方式完成目标数据的提取,在目标数据内部使用窗口函数row_number来进行分组排序
* partition by :指定窗口函数分组的key
* ORDER BY :分组后进行排序
*/
val result =hiveContext.sql("SELECT name,score FROM " +
"(SELECT name,score,row_number() OVER " +
"(PARTITION BY name ORDER BY score DESC) rank FROM scores) " +
"sub_scores where rank <= 4")
result.show()
//最后把数据保存到Hive数据仓库中
hiveContext.sql("DROP TABLE EXISTS sortedResultScores")
result.saveAsParquetFile("sortedResultScores")
对于用sparksql来操作hive是非常的方便
网友评论