Spark zip算子优雅的实现差分算法
什么是差分呢?用公式表示就如下图所示:
差分计算有什么用呢?它在时间序列建模中重要作用自不必说,单是在日常开发的各个场景中就有许多用的上的地方,比如在广告浏览中,我们监控每个id点击某个广告的间隔时长等指标,判断是否存在恶意点击的行为;再比如我们监控某个传感器传入的数据间隔,判断该传感器是否正常工作。类似的应用场景还有许多。在这里就不做更多介绍。
虽然这些计算在实时处理的时候可以很简单的就能实现,但是很多情况下我们还是需要进行离线计算,在spark sql中如何能快速的实现这个功能呢,zip算子为我们提供了一个优雅的实现。
案例:现在我们有一个数据集,记录的是传感器传输消息的时间,为了简单起见,笔者在这里只给出了id,和时间戳,我们要计算传感器传输的时间间隔
数据查看
// 数据量
+---+-----+
|mid|count|
+---+-----+
| m1|84840|
| m2|35621|
+---+-----+
val spec = Window.partitionBy($"mid").orderBy($"timestamp")
val sensor = spark.read.option("header", true)
.option("InferSchema", true)
.csv(inputFile) // 加载数据
.withColumn("r", row_number().over(spec)) // 利用窗口函数对数据分区排序
/**
* 实现的思路就是将原数据分为分别 “去头掐尾”
* 然后在利用zip函数 进行差分计算
*/
val n = 1 // 差分阶数
val lastnMap = sensor.groupBy($"mid").agg(last("r"))
.toDF("mid", "r").rdd.map(row => {
val mid = row.getAs[String]("mid")
val r = row.getAs[Int]("r")
val range = (r - n + 1) to r
range.map(i => (mid, i) -> false)
}).flatMap(_.toMap).collect().toMap
val lastBroad = spark.sparkContext.broadcast(lastnMap)
val exceptLastUDF = udf((mmsi: String, r: Int) => {
lastBroad.value.getOrElse((mmsi, r), true)
})
sensor.groupBy($"mid").count().show()
// 去掉每组后n行数据
val exceptLastRDD: RDD[Row] = sensor.where(exceptLastUDF($"mid", $"r")).rdd
// 去掉每组前n行数据
val exceptFirstRDD: RDD[Row] = sensor.where($"r" > n).rdd
// RDD[(Row, Row)
val diffed: DataFrame = exceptFirstRDD.zip(exceptLastRDD)
.map(drow => {
// 两个row之间还可以进行更复杂的操作,这里只做时间差计算
val mid = drow._1.getAs[String]("mid")
val time = drow._1.getAs[Int]("timestamp")
val time2 = drow._2.getAs[Int]("timestamp")
val r = drow._1.getAs[Int]("r")
val r2 = drow._2.getAs[Int]("r")
(mid, time, r, r2, time - time2)
}).toDF("mid", "time", "r", "r2", "timeDiff")
// 结果查看
diffed.show()
+---+----------+---+---+--------+
|mid| time| r| r2|timeDiff|
+---+----------+---+---+--------+
| m1|1519833112| 2| 1| 180|
| m1|1519833253| 3| 2| 141|
| m1|1519833472| 4| 3| 219|
| m1|1519833652| 5| 4| 180|
| m1|1519833952| 6| 5| 300|
+---+----------+---+---+--------+
网友评论