美文网首页
Spark zip算子优雅的实现差分算法

Spark zip算子优雅的实现差分算法

作者: k_wzzc | 来源:发表于2018-11-18 14:12 被阅读0次

    Spark zip算子优雅的实现差分算法

    什么是差分呢?用公式表示就如下图所示:

    差分

    差分计算有什么用呢?它在时间序列建模中重要作用自不必说,单是在日常开发的各个场景中就有许多用的上的地方,比如在广告浏览中,我们监控每个id点击某个广告的间隔时长等指标,判断是否存在恶意点击的行为;再比如我们监控某个传感器传入的数据间隔,判断该传感器是否正常工作。类似的应用场景还有许多。在这里就不做更多介绍。

    虽然这些计算在实时处理的时候可以很简单的就能实现,但是很多情况下我们还是需要进行离线计算,在spark sql中如何能快速的实现这个功能呢,zip算子为我们提供了一个优雅的实现。
    案例:现在我们有一个数据集,记录的是传感器传输消息的时间,为了简单起见,笔者在这里只给出了id,和时间戳,我们要计算传感器传输的时间间隔


    数据查看
    // 数据量
    +---+-----+
    |mid|count|
    +---+-----+
    | m1|84840|
    | m2|35621|
    +---+-----+
    
    
        val spec = Window.partitionBy($"mid").orderBy($"timestamp")
    
        val sensor = spark.read.option("header", true)
          .option("InferSchema", true)
          .csv(inputFile) // 加载数据
          .withColumn("r", row_number().over(spec)) // 利用窗口函数对数据分区排序
    
    
        /**
          * 实现的思路就是将原数据分为分别 “去头掐尾”
          * 然后在利用zip函数 进行差分计算
          */
        val n = 1 // 差分阶数
        val lastnMap = sensor.groupBy($"mid").agg(last("r"))
          .toDF("mid", "r").rdd.map(row => {
          val mid = row.getAs[String]("mid")
          val r = row.getAs[Int]("r")
          val range = (r - n + 1) to r
          range.map(i => (mid, i) -> false)
        }).flatMap(_.toMap).collect().toMap
    
        val lastBroad = spark.sparkContext.broadcast(lastnMap)
    
        val exceptLastUDF = udf((mmsi: String, r: Int) => {
          lastBroad.value.getOrElse((mmsi, r), true)
        })
    
    
        sensor.groupBy($"mid").count().show()
    
        // 去掉每组后n行数据
        val exceptLastRDD: RDD[Row] = sensor.where(exceptLastUDF($"mid", $"r")).rdd
    
        // 去掉每组前n行数据
        val exceptFirstRDD: RDD[Row] = sensor.where($"r" > n).rdd
    
        // RDD[(Row, Row)
        val diffed: DataFrame = exceptFirstRDD.zip(exceptLastRDD)
          .map(drow => {
            //     两个row之间还可以进行更复杂的操作,这里只做时间差计算
            val mid = drow._1.getAs[String]("mid")
            val time = drow._1.getAs[Int]("timestamp")
            val time2 = drow._2.getAs[Int]("timestamp")
            val r = drow._1.getAs[Int]("r")
            val r2 = drow._2.getAs[Int]("r")
            (mid, time, r, r2, time - time2)
          }).toDF("mid", "time", "r", "r2", "timeDiff")
    
    
        // 结果查看
        diffed.show()
     +---+----------+---+---+--------+
    |mid|      time|  r| r2|timeDiff|
    +---+----------+---+---+--------+
    | m1|1519833112|  2|  1|     180|
    | m1|1519833253|  3|  2|     141|
    | m1|1519833472|  4|  3|     219|
    | m1|1519833652|  5|  4|     180|
    | m1|1519833952|  6|  5|     300|
    +---+----------+---+---+--------+
    

    相关文章

      网友评论

          本文标题:Spark zip算子优雅的实现差分算法

          本文链接:https://www.haomeiwen.com/subject/uahgfqtx.html