美文网首页
Spark案例练习-UV的统计

Spark案例练习-UV的统计

作者: 那山的狐狸 | 来源:发表于2020-05-08 23:45 被阅读0次

关注公众号:分享电脑学习

回复"百度云盘" 可以免费获取所有学习文档的代码(不定期更新)

云盘目录说明:

tools目录是安装包

res 目录是每一个课件对应的代码和资源等

doc 目录是一些第三方的文档工具

承接上一篇文档《Spark案例练习-PV的统计

参数说明:

继续上面的PV代码编写即可

思路:UV的计算

1.数据进行过滤清洗,获取两个字段(时间、guid)

2.guid非空,时间非空,时间字符串的长度必须大于10

3.将同一天的数据放在一起,根据guid去重,统计去重的结果

代码:

val rdd2 = rdd.map(line=>line.split("\t"))

.filter(arr=>{

//保留正常数据

arr.length >=3 && arr(2).trim.nonEmpty && arr(0).trim.length > 10

})

.map(arr => {

val date = arr(0).trim.substring(0,10)

val guid = arr(2).trim

(date,guid) // (date,url)

})

继续编写代码

有两种方式:

1. 基于groupByKey进行UV的统计

2. 基于reduceByKey实现UV的统计

先看基于groupByKey进行UV的统计

val uvRdd = rdd2.groupByKey()

.map(t=>{

val date = t._1

val guids = t._2

val uv = guids.toSet.size

(date,uv)

})

println("uv------------------"+ uvRdd.collect().mkString(";"))

再看基于reduceByKey实现UV的统计

rdd2.map(t=>{

((t._1,t._2),1)

})

.reduceByKey(_+_)

.map(_._1)

val uvRDD: RDD[(String, Int)] = rdd2.distinct()

.map(t=>(t._1,1))

.reduceByKey(_+_)

println("uv------------------"+ uvRDD.collect().mkString(";"))

最终指标的合并

val pvuvRdd = pvRdd.fullOuterJoin(uvRdd)

.map(t=>{

val date = t._1

val pv = t._2._1.getOrElse(0)//如果有值则返回对应的值,如果无值则返回0

val uv = t._2._2.getOrElse(0)

//返回结果

(date,pv,uv)

})

打印一下,可以看到合并的数据

数据输出(Driver、保存HDFS上,保存到RDBMS中)

数据返回给Driver

valresult = pvuvRdd.collect()

保存到HDFS上

pvuvRdd.saveAsTextFile(s"hdfs://master:9000/data/pv_uv/${System.currentTimeMillis()}")

端口注意下,如果想用域名(master)就要确保在本地hosts文件配置了(win环境下)

运行一下,可以看到hdfs上有了这个文件

保存到RDBMS中、保存到非关系型数据库中

建库建表

CREATEDATABASEspark_test;

USEspark_test;

CREATETABLEpvuv(

`date`DATENOTNULL,

`pv`INT(11)NOTNULL,

`uv`INT(11)NOTNULL

)ENGINE=MYISAMDEFAULTCHARSET=utf8;

编写代码

其中val conn = DriverManager.getConnection("","","")这句话是url、user和password

代码

pvuvRdd.foreachPartition(iter => {

//1.创建数据库连接对象

//2.创建数据输出prepareStatement对象

val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/spark_test","root","root")

val pstmt = conn.prepareStatement("insert into pvuv(date,pv,uv) values(?,?,?);")

//3.数据迭代输出

iter.foreach(t => {

valdate= t._1

val pv = t._2

val uv = t._3

pstmt.setString(1,date)

pstmt.setInt(2,pv)

pstmt.setInt(3,uv)

pstmt.executeUpdate()

})

//4.关闭连接

conn.close()

pstmt.close()

})

运行代码,查看数据库

相关文章

网友评论

      本文标题:Spark案例练习-UV的统计

      本文链接:https://www.haomeiwen.com/subject/pvmcnhtx.html