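1. Requirements
For each day, compute the top ten categories ranked by [click], [order], and [pay] counts.
Note: this is a secondary sort. When two categories have the same click count, compare their order counts; when the order counts are also equal, compare their pay counts.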
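The comparison rule above can be sketched with plain Scala collections, without Spark. This is a minimal illustration, not the code used later in this post: `CategoryStat`, `SecondarySortDemo`, and `topN` are hypothetical names, and the four rows are the counts from the expected result in section 4.

```scala
// Hypothetical record type used only for this illustration.
final case class CategoryStat(categoryId: Long, click: Long, order: Long, pay: Long)

object SecondarySortDemo {
  // Tuples compare element by element, so (click, order, pay) means
  // "click first, then order, then pay". reverse makes the order descending.
  val byCountsDesc: Ordering[CategoryStat] =
    Ordering.by((s: CategoryStat) => (s.click, s.order, s.pay)).reverse

  // Sorts by the three counts and keeps at most n rows.
  def topN(stats: Seq[CategoryStat], n: Int): Seq[CategoryStat] =
    stats.sorted(byCountsDesc).take(n)

  def main(args: Array[String]): Unit = {
    val stats = Seq(
      CategoryStat(3L, 0L, 4L, 4L),
      CategoryStat(2L, 2L, 5L, 4L),
      CategoryStat(4L, 1L, 0L, 1L),
      CategoryStat(1L, 2L, 5L, 5L)
    )
    topN(stats, 10).foreach(println)
  }
}
```

Categories 1 and 2 tie on clicks and on orders, so the pay count decides between them.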
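2. Data format
Fields in [brackets] are the ones this job reads.
date: the date, e.g. 2020-03-12
user_id: user id, e.g. user123
session_id: session id, e.g. XXXXXYYYYY
page_id: page id, e.g. 1
action: timestamp of the visit, e.g. 1520769809971
city_id: city the user is in, e.g. 1
search_keywords: search keywords, e.g. "麻辣小龙虾|火锅鱼"
[click_category_id]: id of the category the user clicked, e.g. 1
click_product_id: id of the product the user clicked, e.g. 1
[order_category_id]: ids of the categories the user ordered, e.g. 1^A2 (categories 1 and 2)
order_product_id: ids of the products the user ordered, e.g. 1^A2^A3 (products 1, 2 and 3)
[pay_category_id]: ids of the categories the user paid for, e.g. 1^A2
pay_product_id: ids of the products the user paid for, e.g. 1^A2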
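3. Data survey
Records look like the following. Fields are separated by commas; when a field holds several ids, the ids are separated by ^A.
2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,"麻辣小龙虾|火锅鱼",1,1,1^A2,1^A2^A3,1^A2,1^A2
2020-03-11,user1234,XX55YYYYY,1,1520769809972,1,"小龙虾|火锅鱼",1,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809973,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809974,1,"小龙虾|火锅鱼",2,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3
2020-03-11,user1234,XX55YYYYY,1,1520769809975,1,"小龙虾|火锅鱼",4,1,1^A2^A3,1^A2^A3,1^A2^A3^A4,1^A2^A3
The data is simulated. Each record is exactly one of a click log, an order log, or a pay log. In a click log the order and pay category ids are empty and only the click category id is set; order logs and pay logs follow the same rule for their own fields. A click always targets a single category, so a click log contains exactly one category id, whereas an order log or a pay log may contain several. Note that the sample rows above do not follow this rule: they fill in all three fields at once.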
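To make the field layout concrete, here is a small sketch of how one record could be split into its click, order, and pay category ids. It rests on a few assumptions: the separator is the two literal characters `^A` as printed in the samples, the search keywords never contain a comma, and the names `LogLineDemo`, `CategoryIds`, and `parse` are hypothetical. The click-only row used in `main` is made up to match the description above.

```scala
object LogLineDemo {
  // Assumption: the id separator is the literal text "^A", as shown in the sample rows.
  val IdSeparator = "^A"

  // Category ids carried by one record; an empty field yields None or an empty list.
  final case class CategoryIds(click: Option[Long], order: List[Long], pay: List[Long])

  // Splits a multi-valued field into ids and drops blanks.
  private def splitIds(field: String): List[Long] =
    field.split(java.util.regex.Pattern.quote(IdSeparator)).toList
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.toLong)

  def parse(line: String): CategoryIds = {
    // A limit of -1 keeps trailing empty fields, so index 11 exists even in a click-only log.
    val fields = line.split(",", -1)
    CategoryIds(
      click = splitIds(fields(7)).headOption,
      order = splitIds(fields(9)),
      pay = splitIds(fields(11))
    )
  }

  def main(args: Array[String]): Unit = {
    // First sample row from this section.
    println(parse("2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,\"麻辣小龙虾|火锅鱼\",1,1,1^A2,1^A2^A3,1^A2,1^A2"))
    // Made-up click-only row: the order and pay fields are empty.
    println(parse("2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,,1,1,,,,"))
  }
}
```

Passing `-1` as the split limit matters for click-only rows: without it, Java's `split` drops trailing empty strings and the array would be too short to index.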
4. Result
The expected output for the sample data is:
category_id=1|click_category_count=2|order_category_count=5|pay_category_count=5
category_id=2|click_category_count=2|order_category_count=5|pay_category_count=4
category_id=4|click_category_count=1|order_category_count=0|pay_category_count=1
category_id=3|click_category_count=0|order_category_count=4|pay_category_count=4
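These numbers can be checked by hand against the five sample rows. The sketch below does the same check with plain Scala collections instead of Spark. It assumes the five rows from section 3 with well-formed `^A` separators throughout; `SampleCountsDemo`, `countColumn`, and `report` are hypothetical names.

```scala
object SampleCountsDemo {
  // The five sample rows from section 3.
  val lines: List[String] = List(
    "2020-03-11,user123,XXXXXYYYYY,1,1520769809971,1,\"麻辣小龙虾|火锅鱼\",1,1,1^A2,1^A2^A3,1^A2,1^A2",
    "2020-03-11,user1234,XX55YYYYY,1,1520769809972,1,\"小龙虾|火锅鱼\",1,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3",
    "2020-03-11,user1234,XX55YYYYY,1,1520769809973,1,\"小龙虾|火锅鱼\",2,1,1^A2^A3,1^A2^A3,1^A3,1^A2^A3",
    "2020-03-11,user1234,XX55YYYYY,1,1520769809974,1,\"小龙虾|火锅鱼\",2,1,1^A2^A3,1^A2^A3,1^A2^A3,1^A2^A3",
    "2020-03-11,user1234,XX55YYYYY,1,1520769809975,1,\"小龙虾|火锅鱼\",4,1,1^A2^A3,1^A2^A3,1^A2^A3^A4,1^A2^A3"
  )

  // Counts how often each category id occurs in the given column.
  def countColumn(index: Int): Map[Long, Long] =
    lines
      .flatMap(line => line.split(",", -1)(index).split("\\^A").toList)
      .map(_.trim)
      .filter(_.nonEmpty)
      .groupBy(_.toLong)
      .map { case (id, occurrences) => id -> occurrences.size.toLong }

  def report: List[String] = {
    val click = countColumn(7)
    val order = countColumn(9)
    val pay = countColumn(11)
    // A category missing from one of the maps has a count of 0 there.
    val rows = (click.keySet ++ order.keySet ++ pay.keySet).toList
      .map(id => (id, click.getOrElse(id, 0L), order.getOrElse(id, 0L), pay.getOrElse(id, 0L)))
    rows
      .sortBy(r => (r._2, r._3, r._4))(Ordering[(Long, Long, Long)].reverse)
      .map { case (id, c, o, p) =>
        s"category_id=$id|click_category_count=$c|order_category_count=$o|pay_category_count=$p"
      }
  }

  def main(args: Array[String]): Unit = report.foreach(println)
}
```

Category 3 is never clicked, which is why it needs a default of 0 and ends up last.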
5. Code
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutable

// Constants and Sortkey are project classes that are not shown in this post.
object SparkTopN {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(s"${Constants.SPARK_APP_NAME}").setMaster("local")
    val sc = new SparkContext(conf)
    val rdd: RDD[String] = sc.textFile("D:\\Users\\newhopedata\\IdeaProjects\\SparkScalaWork\\src\\main\\scala\\nx\\core\\log.txt")
    /**
     * Step 1:
     * collect every category id
     */
    val allCategoryid = getAllCategoryID(rdd).distinct()
    /**
     * Step 2:
     * for each category, count the
     * clicks, orders, and payments
     */
    val clickCategoryCount = getClickCategoryCount(rdd)
    val orderCategoryCount = getOrderCategoryCount(rdd)
    val payCategoryCount = getPayCategoryCount(rdd)
    /**
     * Step 3:
     * left-join the result of step 1 with the results of step 2
     */
    val resultRDD: RDD[(Long, String)] = leftJoinData(allCategoryid, clickCategoryCount, orderCategoryCount, payCategoryCount)
    /**
     * Step 4:
     * apply the secondary sort
     */
    getTopN(resultRDD)
    sc.stop()
  }
  /**
   * Collects every category id found in the click, order, or pay field.
   * Each id is emitted as an (id, id) pair so it can serve as the left side of the joins.
   * @param rdd raw log lines
   * @return (category id, category id) pairs, possibly with duplicates
   */
  def getAllCategoryID(rdd: RDD[String]): RDD[(Long, Long)] = {
    rdd.flatMap(line => {
      // Create the set per line: a set shared across lines keeps growing
      // and re-emits the ids of earlier lines.
      val ids = new mutable.HashSet[(Long, Long)]
      // A limit of -1 keeps trailing empty fields, so the indices below always exist.
      val fields = line.split(",", -1)
      val click_category_id = fields(7)
      val order_category_id = fields(9)
      val pay_category_id = fields(11)
      if (click_category_id.trim.nonEmpty) {
        ids += ((click_category_id.toLong, click_category_id.toLong))
      }
      if (order_category_id.trim.nonEmpty) {
        val categoryIds = order_category_id.split(s"\\${Constants.SPLIT_FIELDS}")
        for (categoryid <- categoryIds) {
          ids += ((categoryid.toLong, categoryid.toLong))
        }
      }
      if (pay_category_id.trim.nonEmpty) {
        val categoryIds = pay_category_id.split(s"\\${Constants.SPLIT_FIELDS}")
        for (categoryid <- categoryIds) {
          ids += ((categoryid.toLong, categoryid.toLong))
        }
      }
      ids
    })
  }
  /**
   * Counts the clicks per category.
   * @param rdd raw log lines
   * @return (category id, click count) pairs
   */
  def getClickCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    // A limit of -1 keeps trailing empty fields, so index 7 always exists.
    rdd.map(line => line.split(",", -1)(7).trim)
      .filter(_.nonEmpty)
      .map(click_category_id => (click_category_id.toLong, 1L))
      .reduceByKey(_ + _)
  }
  /**
   * Counts the orders per category.
   * @param rdd raw log lines
   * @return (category id, order count) pairs
   */
  def getOrderCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    // A limit of -1 keeps trailing empty fields, so index 9 exists even in a click log.
    rdd.map(line => line.split(",", -1)(9).trim)
      .filter(_.nonEmpty)
      .flatMap(ids => ids.split(s"\\${Constants.SPLIT_FIELDS}"))
      .map(order_category_id => (order_category_id.toLong, 1L))
      .reduceByKey(_ + _)
  }
  /**
   * Counts the payments per category.
   * @param rdd raw log lines
   * @return (category id, pay count) pairs
   */
  def getPayCategoryCount(rdd: RDD[String]): RDD[(Long, Long)] = {
    // A limit of -1 keeps trailing empty fields, so index 11 exists even in a click log.
    rdd.map(line => line.split(",", -1)(11).trim)
      .filter(_.nonEmpty)
      .flatMap(ids => ids.split(s"\\${Constants.SPLIT_FIELDS}"))
      .map(pay_category_id => (pay_category_id.toLong, 1L))
      .reduceByKey(_ + _)
  }
  /**
   * Left-joins the full category list with the click, order, and pay counts.
   * A category that has no entry on the right side gets a count of 0.
   */
  def leftJoinData(
      allCategoryID2ID: RDD[(Long, Long)],
      clickCategoryidAndCount: RDD[(Long, Long)],
      orderCategoryidAndCount: RDD[(Long, Long)],
      payCategoryidAndCount: RDD[(Long, Long)]
  ): RDD[(Long, String)] = {
    /**
     * Key:   Long, the category id
     * Value: (Long, Option[Long]) = (category id, click count);
     *        Some(n) when the category has clicks, None otherwise
     */
    val resultRDD: RDD[(Long, (Long, Option[Long]))] =
      allCategoryID2ID.leftOuterJoin(clickCategoryidAndCount)
    val result2RDD: RDD[(Long, (String, Option[Long]))] = resultRDD.map(tuple => {
      val category_id = tuple._1
      // 0L keeps the default the same type as the count (Long).
      val click_category_count = tuple._2._2.getOrElse(0L)
      val value = s"${Constants.CATEGORY_ID}=" + category_id + "|" + s"${Constants.CLICK_CATEGORY_COUNT}=" + click_category_count
      (category_id, value)
    }).leftOuterJoin(orderCategoryidAndCount)
    val result3RDD = result2RDD.map(tuple => {
      val category_id = tuple._1
      val count = tuple._2._2.getOrElse(0L)
      // category_id=1|click_category_count=5|order_category_count=4
      val value = tuple._2._1 + "|" + s"${Constants.ORDER_CATEGORY_COUNT}=" + count
      (category_id, value)
    }).leftOuterJoin(payCategoryidAndCount)
    result3RDD.map(tuple => {
      val category_id = tuple._1
      val count = tuple._2._2.getOrElse(0L)
      // category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1
      val value = tuple._2._1 + "|" + s"${Constants.PAY_CATEGORY_COUNT}=" + count
      (category_id, value)
    })
  }
  /**
   * Applies the secondary sort:
   * order by click count, then order count, then pay count (all descending),
   * and print the top ten.
   *
   * @param resultRDD (category id, formatted counts) pairs
   */
  def getTopN(resultRDD: RDD[(Long, String)]): Unit = {
    resultRDD.map(tuple => {
      val value = tuple._2
      // category_id=1|click_category_count=5|order_category_count=4|pay_category_count=1
      val parts = value.split("\\|")
      val click_count = parts(1).split("=")(1).toLong
      val order_count = parts(2).split("=")(1).toLong
      val pay_count = parts(3).split("=")(1).toLong
      val key = new Sortkey(click_count, order_count, pay_count)
      // In a real project, returning category_id alone would be enough here.
      // The full value string is returned only to make the output easy to read.
      (key, value)
    }).sortByKey(false)
      // take(10) brings the first ten rows to the driver in sorted order;
      // foreach on the RDD itself runs on the executors, where print order is not guaranteed.
      .take(10)
      .foreach(tuple => println(tuple._2))
    /*
     * Output for the sample data:
     * category_id=1|click_category_count=2|order_category_count=5|pay_category_count=5
     * category_id=2|click_category_count=2|order_category_count=5|pay_category_count=4
     * category_id=4|click_category_count=1|order_category_count=0|pay_category_count=1
     * category_id=3|click_category_count=0|order_category_count=4|pay_category_count=4
     */
  }
}
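The listing refers to `Constants` and `Sortkey`, neither of which is shown in this post. The following is one possible sketch of both, not the original definitions. The label strings and the `^A` separator are inferred from the output and sample data above, `SPARK_APP_NAME` is a placeholder value, and the field names of `Sortkey` are assumptions.

```scala
// Assumed values; only the names appear in the listing above.
object Constants {
  val SPARK_APP_NAME = "SparkTopN" // placeholder, the real value is not shown
  val SPLIT_FIELDS = "^A"          // inferred from the sample data
  val CATEGORY_ID = "category_id"
  val CLICK_CATEGORY_COUNT = "click_category_count"
  val ORDER_CATEGORY_COUNT = "order_category_count"
  val PAY_CATEGORY_COUNT = "pay_category_count"
}

// Sort key for the secondary sort. It extends Ordered so that an Ordering is
// available for sorting, and Serializable so that Spark can ship it between nodes.
class Sortkey(val clickCount: Long, val orderCount: Long, val payCount: Long)
    extends Ordered[Sortkey] with Serializable {

  // Compare clicks first, then orders, then payments.
  override def compare(that: Sortkey): Int = {
    val byClick = java.lang.Long.compare(clickCount, that.clickCount)
    if (byClick != 0) byClick
    else {
      val byOrder = java.lang.Long.compare(orderCount, that.orderCount)
      if (byOrder != 0) byOrder
      else java.lang.Long.compare(payCount, that.payCount)
    }
  }
}
```

With an ascending `compare` like this, `sortByKey(false)` yields the descending order the requirement asks for.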