Flink SQL 支持三种窗口类型, 分别为 Tumble Windows / HOP Windows 和 Session Windows. 其中 HOP windows 对应 Table API 中的 Sliding Window, 同时每种窗口分别有相应的使用场景和方法.
Tumble Windows | HOP Window | Session Windows | |
---|---|---|---|
TUMBLE(time_attr, interval) | HOP(time_attr, interval1,interval2) | HOP(time_attr, interval) |
下面用几段代码演示如何使用上面 3组 API. 完整的代码见 Github
首先填充一点测试数据
// 初始数据 字段解释 -> (timeStamp , name , value)
DataStream<Tuple3<Long, String,Integer>> log = env.fromCollection(Arrays.asList(
//时间 14:53:00
new Tuple3<>(1572591180_000L,"xiao_ming",300),
//时间 14:53:09
new Tuple3<>(1572591189_000L,"zhang_san",303),
//时间 14:53:12
new Tuple3<>(1572591192_000L, "xiao_li",204),
//时间 14:53:21
new Tuple3<>(1572591201_000L,"li_si", 208)));
然后是转换为 Table
//这里需要注意的是 如果采用了EventTime, 那么 对应字段后面加 .rowtime, 否则加 .proctime
Table logT = tEnv.fromDataStream(logWithTime, "t.rowtime, name, v");
Tumble Windows
// GROUP BY TUMBLE(t, INTERVAL '10' SECOND) 相当于根据10s的时间来划分窗口
// TUMBLE_START(t, INTERVAL '10' SECOND) 获取窗口的开始时间
// TUMBLE_END(t, INTERVAL '10' SECOND) 获取窗口的结束时间
tEnv.sqlQuery("SELECT TUMBLE_START(t, INTERVAL '10' SECOND) AS window_start," +
"TUMBLE_END(t, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY TUMBLE(t, INTERVAL '10' SECOND)");
HOP Windows
// HOP(time_attr, interval1, interval2)
// interval1 滑动长度
// interval2 窗口长度
// HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) 表示窗口开始时间
// HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) 表示窗口结束时间
Table result = tEnv.sqlQuery("SELECT HOP_START(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_start,"
+ "HOP_END(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY HOP(t, INTERVAL '5' SECOND, INTERVAL '10' SECOND)");
Session Windows
// SESSION(time_attr, interval)
// interval 表示两条数据触发session的最大间隔
Table result = tEnv.sqlQuery("SELECT SESSION_START(t, INTERVAL '5' SECOND) AS window_start,"
+"SESSION_END(t, INTERVAL '5' SECOND) AS window_end, SUM(v) FROM "
+ logT + " GROUP BY SESSION(t, INTERVAL '5' SECOND)");
网友评论