美文网首页
spark sql和flink的滑动窗口

spark sql和flink的滑动窗口

作者: 王金松 | 来源:发表于2019-05-16 23:59 被阅读0次

    spark sql的滑动窗口

    • 第一个参数为事件时间的字段列
    • 第二个参数是窗口大小
    • 第三个参数是窗口步长
    • 第四个参数为可选,用来设置时间偏移
      参考:https://www.iteblog.com/archives/1705.html
    val ret = pre_data
          .withWatermark("time", "5 seconds")
          .groupBy(
            window($"time", "5 minutes", "5 minutes").alias("window"),
            $"flowKey.src_ip".alias("src_ip"),
            $"flowKey.src_port".alias("src_port"),
            $"flowKey.dest_ip".alias("dest_ip"),
            $"flowKey.proto".alias("proto"),
            $"flowKey.direction".alias("direction")
          )
    

    flink的滑动窗口

    滑动窗口有三个参数

    • 第一个是窗口大小
    • 第二个窗口是步长
    • 第三个窗口可选,是时间偏移量
      例如,没有偏移参数,按小时的窗口,有30分钟的滑动,将根据时间纪元来对齐,也就是说你将得到如下的窗口1:00:00.001:59:59.999,1:30:00.0002:29:59.999等。而如果你想改变窗口的对齐,你可以给定一个偏移,如果给定一个15分钟的偏移,你将得到如下的窗口:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。时间偏移一个很大的用处是用来调准非0时区的窗口,例如:在中国你需要指定一个8小时的时间偏移
      https://www.jianshu.com/p/a883262241ef
     processedData
          .keyBy(_.flowKey)
          .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(5), Time.minutes(5)))
          .reduce((a, b) => {
            val start = if (a.start_timestamp <= b.start_timestamp)  a.start_timestamp else b.start_timestamp
            val end = if (a.end_timestamp >= b.end_timestamp) a.end_timestamp else b.end_timestamp
            FlowLog(a.flowKey, start, end, a.count + b.count)
          }).map(new RichMapFunction[FlowLog, JSONObject] {
    

    相关文章

      网友评论

          本文标题:spark sql和flink的滑动窗口

          本文链接:https://www.haomeiwen.com/subject/srsgaqtx.html