package com.atguigu.sparkmall.offline
/**
* 页面单跳转化率
*/
import java.sql.{Connection, DriverManager, PreparedStatement}
import java.util.UUID
import com.atguigu.sparkmall.common.model.{CategoryTop10, CategoryTop10SessionTop10, UserVisitAction}
import com.atguigu.sparkmall.common.util.StringUtil
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.{immutable, mutable}
/**
 * Offline Spark job that prints the single-hop page conversion rate.
 *
 * For every consecutive pair in the target page list (1-2, 2-3, ..., 6-7) the rate is
 *
 *   (number of times a session visited `from` immediately followed by `to`)
 *   / (total number of visits to `from`)
 *
 * Input is read from `input/user_visit_action.csv`; each result is printed to
 * standard output as `from-to=rate`.
 */
object Req3PageFlowApplication {

  /**
   * Entry point: builds a local SparkContext, computes the conversion rates and prints them.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Req3PageFlowApplication")
    val sc = new SparkContext(conf)

    // Always release the context, even when a stage fails.
    try {
      // 1. Load the raw lines and turn each one into a UserVisitAction.
      val lineDataRDD: RDD[String] = sc.textFile("input/user_visit_action.csv")
      val actionRDD: RDD[UserVisitAction] = lineDataRDD.map(parseAction)

      // Cached because the same RDD feeds two independent jobs below
      // (the denominator count and the numerator count).
      actionRDD.cache()

      // Only these pages, and the hops between neighbouring ones, are of interest.
      val pageIds: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L)
      val targetPages: Set[Long] = pageIds.toSet
      val targetFlows: Set[String] =
        pageIds.zip(pageIds.drop(1)).map { case (from, to) => s"$from-$to" }.toSet

      // 2. Denominator: total visits per target page, (pageId -> count).
      // page_id is compared as a Long; narrowing it to Int first could make an
      // out-of-range id alias one of the target pages.
      val pageIdToSums: Map[Long, Long] = actionRDD
        .filter(action => targetPages.contains(action.page_id))
        .map(action => (action.page_id, 1L))
        .reduceByKey(_ + _)
        .collect()
        .toMap

      // 3. Numerator: number of direct hops "from-to" inside a session.
      // Only (action_time, page_id) is shuffled instead of the whole action record.
      val pageFlowReduceRDD: RDD[(String, Long)] = actionRDD
        .map(action => (action.session_id, (action.action_time, action.page_id)))
        .groupByKey()
        .flatMap { case (_, visits) =>
          // Order the session's visits by action time (ascending), then pair each
          // page with its successor: [a, b, c] => [(a, b), (b, c)].
          val ids: List[Long] = visits.toList.sortBy(_._1).map(_._2)
          ids.zip(ids.drop(1)).map { case (from, to) => (s"$from-$to", 1L) }
        }
        // Keep only the hops between neighbouring target pages.
        .filter { case (flow, _) => targetFlows.contains(flow) }
        .reduceByKey(_ + _)

      // 4. Conversion rate = hops(from-to) / visits(from).
      // The result holds at most one row per target hop, so it is collected and
      // printed on the driver in a stable order rather than from executor tasks.
      pageFlowReduceRDD.collect().sortBy(_._1).foreach { case (pageFlow, sum) =>
        val fromPage: Long = pageFlow.split("-")(0).toLong
        // A hop can only exist if its source page was visited, so the fallback of 1
        // is not expected to be used; it merely guards against division by zero.
        println(pageFlow + "=" + sum.toDouble / pageIdToSums.getOrElse(fromPage, 1L))
      }
    } finally {
      sc.stop()
    }
  }

  /**
   * Parses one comma-separated input line into a [[UserVisitAction]].
   *
   * Columns 3, 6 and 7 are converted to Long; all other columns are passed through
   * as strings, in file order.
   *
   * @param line a raw CSV line with at least 13 columns
   * @return the parsed action
   */
  private def parseAction(line: String): UserVisitAction = {
    // limit = -1 keeps trailing empty columns; the default split would drop them and
    // make the positional access below fail on rows whose last fields are empty.
    val datas: Array[String] = line.split(",", -1)
    UserVisitAction(
      datas(0),
      datas(1),
      datas(2),
      datas(3).toLong,
      datas(4),
      datas(5),
      datas(6).toLong,
      datas(7).toLong,
      datas(8),
      datas(9),
      datas(10),
      datas(11),
      datas(12)
    )
  }
}
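// "Reader comments" - leftover heading from the web page this file was copied from; not part of the program.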