Base Station Data Analysis Case Study
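Tasks for This Section
Scenario interpretation exercise
Data dimension analysis
Learning Objectives
Practice Spark operators through a case study
Determine a user's work address from base station data
Course Content
I. Scenario Description
Mobile phone users communicate through base stations, and each base station knows which users are currently connected to it. Every connection event is flagged as an entry or a departure, so the time a user spends at a base station can be computed from a pair of events. Because each base station has a known geographic location, the dwell time in turn indicates where the user was.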
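As a minimal sketch of the dwell-time idea, the snippet below measures the time between one entry event and the matching departure. It assumes the `yyyyMMddHHmmss` timestamp layout that appears in the sample data later in this section; the object and method names (`DwellTime`, `secondsBetween`) are hypothetical and not part of the original program.

```scala
import java.time.{Duration, LocalDateTime}
import java.time.format.DateTimeFormatter

object DwellTime {
  // Assumed timestamp layout, e.g. 20160327082400 (taken from the sample data).
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  /** Seconds elapsed between an entry timestamp and a departure timestamp. */
  def secondsBetween(enter: String, leave: String): Long =
    Duration
      .between(LocalDateTime.parse(enter, fmt), LocalDateTime.parse(leave, fmt))
      .getSeconds

  def main(args: Array[String]): Unit = {
    // A user enters a base station at 08:24:00 and leaves at 17:00:00 (8 h 36 min).
    println(secondsBetween("20160327082400", "20160327170000")) // prints 30960
  }
}
```

Parsing the timestamp as a calendar value, rather than treating it as a plain number, is what makes the result a real duration in seconds.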
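II. Data Description
1. Base station data
- Base station ID
- Longitude
- Latitude
- Signal type
2. User data
- Phone number
- Timestamp (yyyyMMddHHmmss)
- Base station ID
- Event type (1 = enter, 0 = leave)
3. Sample data
- Base station data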
9F36407EAD0629FC166F14DDE7970F68,116.304864,40.050645,6
CC0710CC94ECC657A8561DE549D940E0,116.303955,40.041935,6
16030401EAFB68F1E3CDF819735E1C66,116.296302,40.032296,6
- User data
18688888888,20160327082400,16030401EAFB68F1E3CDF819735E1C66,1
18101056888,20160327082500,16030401EAFB68F1E3CDF819735E1C66,1
18688888888,20160327170000,16030401EAFB68F1E3CDF819735E1C66,0
18101056888,20160327180000,16030401EAFB68F1E3CDF819735E1C66,0
18101056888,20160327075000,9F36407EAD8829FC166F14DDE7970F68,1
18688888888,20160327075100,9F36407EAD8829FC166F14DDE7970F68,1
18101056888,20160327081000,9F36407EAD8829FC166F14DDE7970F68,0
18688888888,20160327081300,9F36407EAD8829FC166F14DDE7970F68,0
18688888888,20160327175000,9F36407EAD8829FC166F14DDE7970F68,1
18101056888,20160327182000,9F36407EAD8829FC166F14DDE7970F68,1
18688888888,20160327220000,9F36407EAD8829FC166F14DDE7970F68,0
18101056888,20160327230000,9F36407EAD8829FC166F14DDE7970F68,0
18101056888,20160327081100,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081200,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327081900,CC0710CC94ECC657A8561DE549D940E0,0
18101056888,20160327082000,CC0710CC94ECC657A8561DE549D940E0,0
18688888888,20160327171000,CC0710CC94ECC657A8561DE549D940E0,1
18688888888,20160327171600,CC0710CC94ECC657A8561DE549D940E0,0
18101056888,20160327180500,CC0710CC94ECC657A8561DE549D940E0,1
18101056888,20160327181500,CC0710CC94ECC657A8561DE549D940E0,0
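The two record layouts above can be captured as small types before any Spark work starts. The following is only a sketch: the case classes and parser names are hypothetical, the field order follows the data description, and malformed lines are simply dropped by returning `None`.

```scala
import scala.util.Try

object Records {
  // Hypothetical record types; fields follow the data description above.
  final case class BaseStation(id: String, longitude: Double, latitude: Double, signalType: Int)
  final case class UserEvent(phone: String, timestamp: String, baseId: String, entering: Boolean)

  /** Parses one base station line; returns None if the line is malformed. */
  def parseBase(line: String): Option[BaseStation] =
    line.split(",").map(_.trim) match {
      case Array(id, lon, lat, sig) =>
        Try(BaseStation(id, lon.toDouble, lat.toDouble, sig.toInt)).toOption
      case _ => None
    }

  /** Parses one user event line; event type 1 means enter, 0 means leave. */
  def parseEvent(line: String): Option[UserEvent] =
    line.split(",").map(_.trim) match {
      case Array(phone, ts, base, "1") => Some(UserEvent(phone, ts, base, entering = true))
      case Array(phone, ts, base, "0") => Some(UserEvent(phone, ts, base, entering = false))
      case _                           => None
    }
}
```

With an RDD of lines, a call such as `lines.flatMap(l => Records.parseEvent(l))` would keep only the well-formed records; that usage is an assumption and is not part of the original program.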
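III. Requirements Analysis
Use the event type and the timestamp to compute how long each user stays at each base station. For each day of base station data, take the two base stations where a user stayed longest; these are taken to be the user's workplace and home.
IV. Implementation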
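The requirement can be sketched with ordinary Scala collections before moving to Spark. The example assumes that every entry event has a matching departure in the data and that timestamps use the `yyyyMMddHHmmss` layout; `TopStations`, `epoch`, and `topStations` are hypothetical names introduced only for illustration.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter

object TopStations {
  private val fmt = DateTimeFormatter.ofPattern("yyyyMMddHHmmss")

  // Epoch seconds, reading the timestamp as UTC; only differences matter here.
  private def epoch(ts: String): Long =
    LocalDateTime.parse(ts, fmt).toEpochSecond(ZoneOffset.UTC)

  /** events: (phone, timestamp, baseId, eventType) with 1 = enter, 0 = leave.
    * Returns, per phone, up to n (baseId, dwellSeconds) pairs, longest first. */
  def topStations(events: Seq[(String, String, String, Int)],
                  n: Int = 2): Map[String, List[(String, Long)]] =
    events
      // Negate entry times so that the sum per key is (leave - enter) per visit.
      .map { case (phone, ts, base, ev) => ((phone, base), if (ev == 1) -epoch(ts) else epoch(ts)) }
      .groupBy(_._1)
      .map { case (key, xs) => key -> xs.map(_._2).sum } // total dwell per (phone, base)
      .toList
      .groupBy { case ((phone, _), _) => phone }
      .map { case (phone, xs) =>
        phone -> xs.map { case ((_, base), secs) => (base, secs) }.sortBy(-_._2).take(n)
      }

  // A few rows for one phone, copied from the sample data.
  val sample: Seq[(String, String, String, Int)] = Seq(
    ("18688888888", "20160327082400", "16030401EAFB68F1E3CDF819735E1C66", 1),
    ("18688888888", "20160327170000", "16030401EAFB68F1E3CDF819735E1C66", 0),
    ("18688888888", "20160327081200", "CC0710CC94ECC657A8561DE549D940E0", 1),
    ("18688888888", "20160327081900", "CC0710CC94ECC657A8561DE549D940E0", 0),
    ("18688888888", "20160327171000", "CC0710CC94ECC657A8561DE549D940E0", 1),
    ("18688888888", "20160327171600", "CC0710CC94ECC657A8561DE549D940E0", 0)
  )

  def main(args: Array[String]): Unit =
    topStations(sample).foreach(println)
}
```

For these six rows the first station accumulates 30960 seconds and the second 780 seconds (420 + 360), so the first would be ranked as the longer stay.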
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Mobile {
  def main(args: Array[String]): Unit = {
    // 0. Initialize Spark
    val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)

    // 1. Read the input data
    val userData: RDD[String] = sc.textFile("E://log")
    val baseInfo: RDD[String] = sc.textFile("E://base_info.txt")

    // 2. Clean the data
    // Entry timestamps are negated so that summing per (phone, base) in step 3
    // yields (leave - enter) for every visit.
    // Caveat: the timestamps are yyyyMMddHHmmss digits, so the difference is a
    // digit-wise score, not elapsed seconds (08:24:00 to 17:00:00 gives 87600).
    val phoneBaseWithTime: RDD[((String, String), Long)] = userData.map(line => {
      val fields = line.split(",")
      val phone = fields(0)        // phone number
      val time = fields(1).toLong  // timestamp
      val base = fields(2)         // base station ID
      val event = fields(3).toInt  // event type: 1 = enter, 0 = leave
      val signedTime = if (event == 1) -time else time
      ((phone, base), signedTime)
    })
    val baseWithXY: RDD[(String, (String, String))] = baseInfo.map(line => {
      val fields = line.split(",")
      val base = fields(0) // base station ID
      val x = fields(1)    // longitude
      val y = fields(2)    // latitude
      (base, (x, y))
    })

    // 3. Sum the time each user spent at each base station
    val phoneBaseWithSumTime: RDD[((String, String), Long)] = phoneBaseWithTime.reduceByKey(_ + _)

    // 4. Re-key by base station ID and join in the coordinates
    val baseWithPhoneSumTime: RDD[(String, (String, Long))] = phoneBaseWithSumTime.map(content => {
      val phone = content._1._1
      val base = content._1._2
      val time = content._2
      (base, (phone, time))
    })
    val data: RDD[(String, ((String, Long), (String, String)))] = baseWithPhoneSumTime.join(baseWithXY)

    // 5. Group by phone number, sort by dwell time (descending), keep the top two
    val result: RDD[(String, List[(String, Long, (String, String))])] = data.map(content => {
      val phone = content._2._1._1 // phone number
      val time = content._2._1._2  // dwell time
      val xy = content._2._2       // (longitude, latitude)
      (phone, time, xy)
    }).groupBy(_._1).mapValues(_.toList.sortWith(_._2 > _._2).take(2))

    // 6. Print the result
    result.foreach(println)
    // Sample output:
    // (18101056888,List((18101056888,97500,(116.296302,40.032296)), (18101056888,54000,(116.304864,40.050645))))
    // (18688888888,List((18688888888,87600,(116.296302,40.032296)), (18688888888,51200,(116.304864,40.050645))))

    sc.stop()
  }
}
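Step 5 of the listing above collects every record for a phone and sorts the whole group just to keep two entries. One possible alternative, shown here only as a sketch, keeps at most N entries while folding over the records. The names `TopN`, `Stay`, `add`, and `merge` are hypothetical, and the numbers in `main` are the per-station scores that the original arithmetic produces for phone 18101056888 (station IDs shortened to A, B, C).

```scala
object TopN {
  // (baseId, dwell score); hypothetical alias used only in this sketch.
  type Stay = (String, Long)

  /** Returns a function that adds one stay to an accumulator, keeping the n largest. */
  def add(n: Int): (List[Stay], Stay) => List[Stay] =
    (acc, s) => (s :: acc).sortBy(-_._2).take(n)

  /** Returns a function that merges two partial accumulators, keeping the n largest. */
  def merge(n: Int): (List[Stay], List[Stay]) => List[Stay] =
    (a, b) => (a ++ b).sortBy(-_._2).take(n)

  def main(args: Array[String]): Unit = {
    val stays = List(("A", 97500L), ("B", 54000L), ("C", 1900L))
    // Fold the stays one by one, never holding more than two entries.
    println(stays.foldLeft(List.empty[Stay])(add(2)))
  }
}
```

With an RDD keyed by phone number, for example an `RDD[(String, (String, Long))]`, these two functions have the shapes that `aggregateByKey` expects, so a call such as `rdd.aggregateByKey(List.empty[TopN.Stay])(TopN.add(2), TopN.merge(2))` should give the top two stays per phone without a full `groupBy`. That usage is an assumption and was not run as part of the original program.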