spark sql的滑动窗口
- 第一个参数为事件时间的字段列
- 第二个参数是窗口大小
- 第三个参数是窗口步长
- 第四个参数为可选,用来设置时间偏移
参考:https://www.iteblog.com/archives/1705.html
val ret = pre_data
.withWatermark("time", "5 seconds")
.groupBy(
window($"time", "5 minutes", "5 minutes").alias("window"),
$"flowKey.src_ip".alias("src_ip"),
$"flowKey.src_port".alias("src_port"),
$"flowKey.dest_ip".alias("dest_ip"),
$"flowKey.proto".alias("proto"),
$"flowKey.direction".alias("direction")
)
flink的滑动窗口
滑动窗口有三个参数
- 第一个是窗口大小
- 第二个窗口是步长
- 第三个窗口可选,是时间偏移量
例如,没有偏移参数,按小时的窗口,有30分钟的滑动,将根据时间纪元来对齐,也就是说你将得到如下的窗口1:00:00.001:59:59.999,1:30:00.0002:29:59.999等。而如果你想改变窗口的对齐,你可以给定一个偏移,如果给定一个15分钟的偏移,你将得到如下的窗口:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。时间偏移一个很大的用处是用来调准非0时区的窗口,例如:在中国你需要指定一个8小时的时间偏移
https://www.jianshu.com/p/a883262241ef
processedData
.keyBy(_.flowKey)
.window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(5), Time.minutes(5)))
.reduce((a, b) => {
val start = if (a.start_timestamp <= b.start_timestamp) a.start_timestamp else b.start_timestamp
val end = if (a.end_timestamp >= b.end_timestamp) a.end_timestamp else b.end_timestamp
FlowLog(a.flowKey, start, end, a.count + b.count)
}).map(new RichMapFunction[FlowLog, JSONObject] {
网友评论