Definition of TimeWindow
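- timeColumn: the event-time column;
- windowDuration: the width of the window;
- slideDuration: the step by which the window slides each time;
- startTime: the offset, relative to `1970-01-01 00:00:00 UTC`, at which the window schedule starts. For example, for hourly windows that start at 15 minutes past the hour (12:15-13:15, 13:15-14:15, ...), startTime is 15 minutes;

All three durations are stored as `Long` microseconds, the same unit Spark uses internally for `TimestampType`.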
case class TimeWindow(
    timeColumn: Expression,
    windowDuration: Long,
    slideDuration: Long,
    startTime: Long)
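The following minimal plain-Scala sketch (no Spark dependency) makes the parameter semantics concrete for the `startTime` example above: hourly tumbling windows offset by 15 minutes. `WindowParams`, `tumblingWindow` and the conversion helpers are hypothetical names used only for illustration. The sketch aligns a timestamp down onto the grid `startTime + k * slideDuration` with `floorMod`. This is not the `Ceil`/`CaseWhen` formulation Spark uses, although for tumbling windows it selects the same window.

```scala
import java.time.{Duration, Instant}

// Hypothetical stand-in for TimeWindow's three Long parameters (not Spark's class).
// Every value is in microseconds, matching how Spark stores TimestampType internally.
final case class WindowParams(windowDuration: Long, slideDuration: Long, startTime: Long) {
  // Sanity checks chosen for this sketch only
  require(windowDuration > 0 && slideDuration > 0, "durations must be positive")
  require(slideDuration <= windowDuration, "slide must not exceed the window width")
}

object WindowParams {
  def micros(d: Duration): Long = d.toNanos / 1000L

  def toMicros(ts: Instant): Long = ts.getEpochSecond * 1000000L + ts.getNano / 1000L

  def fromMicros(us: Long): Instant =
    Instant.ofEpochSecond(Math.floorDiv(us, 1000000L), Math.floorMod(us, 1000000L) * 1000L)

  // Hourly windows that start 15 minutes past the hour: 12:15-13:15, 13:15-14:15, ...
  val hourlyAtQuarterPast: WindowParams =
    WindowParams(micros(Duration.ofHours(1)), micros(Duration.ofHours(1)), micros(Duration.ofMinutes(15)))

  // Tumbling case only: align ts down onto the grid startTime + k * slideDuration
  def tumblingWindow(p: WindowParams, ts: Instant): (Instant, Instant) = {
    val t = toMicros(ts)
    val start = t - Math.floorMod(t - p.startTime, p.slideDuration)
    (fromMicros(start), fromMicros(start + p.windowDuration))
  }
}
```

Using these parameters, an event at 12:40 UTC lands in the window from 12:15 to 13:15. An event at exactly 13:15 starts the next window, because the window start is inclusive and the end is exclusive.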
TimeWindowing Rule
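Purpose: with sliding windows, one event time belongs to several windows. The rule therefore uses the `Expand` operator to map each time-column value to multiple time windows;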
Implementation steps:
* maxNumOverlapping <- ceil(windowDuration / slideDuration)
* for (i <- 0 until maxNumOverlapping)
  * windowId <- ceil((timestamp - startTime) / slideDuration)
  * windowStart <- windowId * slideDuration + (i - maxNumOverlapping) * slideDuration + startTime
  * windowEnd <- windowStart + windowDuration
  * return windowStart, windowEnd
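- Compute the total number of overlapping windows: maxNumOverlapping = ceil(windowDuration / slideDuration);
- Iterate i over `0 until maxNumOverlapping`, computing one window per iteration. The actual code also adds 1 to windowId when the division is exact, that is, when the timestamp falls exactly on a window start (see `getWindow` below);
- The real implementation distinguishes tumbling windows from sliding windows:
  - for a tumbling window, maxNumOverlapping is 1, so the single window is converted directly into a window and returned;
  - for a sliding window, maxNumOverlapping > 1, so the `Expand` operator combines the multiple windows and maps one input row to several windows;
- Note that the final result is a struct with `WINDOW_START` and `WINDOW_END` fields, that is, an expression such as `13:15-14:15`. It is exposed as a new column named `WINDOW_COL_NAME`, which inherits the event-time column's metadata. It is not a `WindowExpression`;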
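The following plain-Scala sketch transcribes the pseudocode, together with the boundary adjustment that `getWindow` makes with `CaseWhen`. `WindowMath`, `Window` and `candidateWindows` are hypothetical names. The arithmetic works on bare `Long` values in arbitrary units (think minutes) rather than on Catalyst expressions, and it mirrors Spark's use of a floating-point division before `ceil`.

```scala
// Plain-Scala transcription of the pseudocode plus getWindow's "exact division" fix.
object WindowMath {
  final case class Window(start: Long, end: Long) // start inclusive, end exclusive

  def maxNumOverlapping(windowDuration: Long, slideDuration: Long): Int =
    math.ceil(windowDuration.toDouble / slideDuration).toInt

  // All candidate windows for one timestamp, for i = 0 until maxNumOverlapping
  def candidateWindows(ts: Long, windowDuration: Long, slideDuration: Long, startTime: Long): Seq[Window] = {
    val n = maxNumOverlapping(windowDuration, slideDuration)
    val division = (ts - startTime).toDouble / slideDuration
    val ceil = math.ceil(division).toLong
    // If ts lies exactly on a window start, plain ceil would be one slide too early
    val windowId = if (ceil.toDouble == division) ceil + 1 else ceil
    Seq.tabulate(n) { i =>
      // windowId * slide + (i - n) * slide + startTime, as in the pseudocode
      val start = (windowId + i - n) * slideDuration + startTime
      Window(start, start + windowDuration)
    }
  }
}
```

With `windowDuration = 10`, `slideDuration = 5` and `startTime = 0`, a timestamp of 7 gets `[0, 10)` and `[5, 15)`. A timestamp of exactly 5 gets the same two windows. Plain `ceil` would instead produce `[-5, 5)` and `[0, 10)`, and the first of these excludes 5. With `slideDuration = 3`, the four candidates for 7 are `[-3, 7)`, `[0, 10)`, `[3, 13)` and `[6, 16)`. Because the end is exclusive, `[-3, 7)` does not contain 7. This is why the sliding branch places a `Filter` above `Expand`. When `windowDuration` is a multiple of `slideDuration`, every candidate contains the timestamp.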
Code walkthrough:
// Excerpt from inside the TimeWindowing rule: `p` (the plan node being rewritten), `child`,
// `windowExpressions` (the TimeWindow expressions collected from `p`) and the WINDOW_*
// constants are defined by the enclosing code, which is not shown here
val window = windowExpressions.head
val metadata = window.timeColumn match {
  case a: Attribute => a.metadata
  case _ => Metadata.empty
}
// Returns the i-th window for the given index i:
// - for a tumbling window, i is always 0;
// - for a sliding window, i ranges over `0 until maxNumOverlapping`;
def getWindow(i: Int, overlappingWindows: Int): Expression = {
  val division = (PreciseTimestampConversion(
    window.timeColumn, TimestampType, LongType) - window.startTime) / window.slideDuration
  val ceil = Ceil(division)
  // if the division is equal to the ceiling, our record is the start of a window
  val windowId = CaseWhen(Seq((ceil === division, ceil + 1)), Some(ceil))
  val windowStart = (windowId + i - overlappingWindows) *
    window.slideDuration + window.startTime
  val windowEnd = windowStart + window.windowDuration
  // Note the result: a struct with `WINDOW_START` and `WINDOW_END` fields,
  // i.e. an expression such as `13:15-14:15`
  CreateNamedStruct(
    Literal(WINDOW_START) ::
      PreciseTimestampConversion(windowStart, LongType, TimestampType) ::
      Literal(WINDOW_END) ::
      PreciseTimestampConversion(windowEnd, LongType, TimestampType) ::
      Nil)
}
val windowAttr = AttributeReference(
  WINDOW_COL_NAME, window.dataType, metadata = metadata)()
if (window.windowDuration == window.slideDuration) {
  // Tumbling window: alias the struct returned by getWindow as WINDOW_COL_NAME
  val windowStruct = Alias(getWindow(0, 1), WINDOW_COL_NAME)(
    exprId = windowAttr.exprId, explicitMetadata = Some(metadata))
  // Replace every TimeWindow in p with a reference to the new window attribute
  val replacedPlan = p transformExpressions {
    case t: TimeWindow => windowAttr
  }
  // For backwards compatibility we add a filter to filter out nulls
  val filterExpr = IsNotNull(window.timeColumn)
  replacedPlan.withNewChildren(
    Filter(filterExpr,
      Project(windowStruct +: child.output, child)) :: Nil)
} else {
  // Sliding window
  val overlappingWindows =
    math.ceil(window.windowDuration * 1.0 / window.slideDuration).toInt
  // `tabulate` plays the role of the loop in the pseudocode
  val windows =
    Seq.tabulate(overlappingWindows)(i => getWindow(i, overlappingWindows))
  val projections = windows.map(_ +: child.output)
  // Drop candidate windows that do not actually contain the event time
  val filterExpr =
    window.timeColumn >= windowAttr.getField(WINDOW_START) &&
      window.timeColumn < windowAttr.getField(WINDOW_END)
  // Finally, Expand maps each input row to multiple windows
  val substitutedPlan = Filter(filterExpr,
    Expand(projections, windowAttr +: child.output, child))
  val renamedPlan = p transformExpressions {
    case t: TimeWindow => windowAttr
  }
  renamedPlan.withNewChildren(substitutedPlan :: Nil)
}
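The sliding branch can be modeled in memory to see what `Expand` followed by `Filter` does to a batch of rows. This is a toy sketch. `Event`, `Window`, `expand` and `countPerWindow` are hypothetical stand-ins rather than Spark classes, and the final `groupBy` stands in for whatever aggregation a query would run over the window column.

```scala
// Toy model of the sliding branch: Expand emits one output row per projection for every
// input row, then Filter keeps only rows whose window really contains the event time.
object ExpandModel {
  final case class Event(ts: Long, key: String)
  final case class Window(start: Long, end: Long)

  // Same arithmetic as getWindow above, on plain Longs instead of Catalyst expressions
  def getWindow(ts: Long, i: Int, n: Int, windowDuration: Long, slideDuration: Long, startTime: Long): Window = {
    val division = (ts - startTime).toDouble / slideDuration
    val ceil = math.ceil(division).toLong
    val windowId = if (ceil.toDouble == division) ceil + 1 else ceil
    val start = (windowId + i - n) * slideDuration + startTime
    Window(start, start + windowDuration)
  }

  // Expand: every input row passes through every projection
  def expand[A, B](rows: Seq[A], projections: Seq[A => B]): Seq[B] =
    for (row <- rows; project <- projections) yield project(row)

  def countPerWindow(events: Seq[Event], windowDuration: Long, slideDuration: Long, startTime: Long): Map[Window, Int] = {
    val n = math.ceil(windowDuration * 1.0 / slideDuration).toInt
    // One projection per overlapping window, like Seq.tabulate(overlappingWindows)(...)
    val projections: Seq[Event => (Window, Event)] =
      Seq.tabulate(n)(i => (e: Event) => (getWindow(e.ts, i, n, windowDuration, slideDuration, startTime), e))
    expand(events, projections)
      .filter { case (w, e) => w.start <= e.ts && e.ts < w.end } // the Filter above Expand
      .groupBy(_._1)
      .map { case (w, rows) => w -> rows.size }
  }
}
```

For events at 1, 7 and 12 with `windowDuration = 10`, `slideDuration = 5` and `startTime = 0`, each event is expanded into two rows. The resulting counts are `[-5, 5) -> 1`, `[0, 10) -> 2`, `[5, 15) -> 2` and `[10, 20) -> 1`. The intermediate result always has `maxNumOverlapping` times as many rows as the input, so `Filter` only trims rows after that blow-up has happened.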