WindowOperator
Context
定义了各种 registerEventTimeTimer,registerEventTimeTimer还有deleteXXX, 其本质是调用internalTimerService的同名方法来增删timer
提供onElement,onProcessingTime,onEventTime给上层(WindowOperator call), 内部就是包装了trigger的同名方法, trigger的方法由业务实现
HeapInternalTimerService
HeapInternalTimerService :> InternalTimerService, 主要就是维护两个queue,eventTimeTimersQueue和eventTimeTimersQueue
这两个queue的内部元素是timer
windowOperator implement了OneInputStreamOperator
-
processElement()
, 主要做了把element存到partitioned state里(给onEventTime和onProcessTime或自己去消费),再根据其中callcontext.onElement
的结果来决定fire来消费
首先用windowAssigner
找到element对应的windows
Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
mergeWindow逻辑跳过。。。直接看如何处理
for (W window: elementWindows) {
// drop if the window is already late, 处理迟到数据
if (isLate(window)) {
continue;
}
// 找到每个窗口对应的partitionedState并存入
AppendingState<IN, ACC> windowState =
getPartitionedState(window, windowSerializer, windowStateDescriptor);
windowState.add(element.getValue());
// 更新context
context.key = key;
context.window = window;
// call onElement
TriggerResult triggerResult = context.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
//执行window function
emitWindowContents(window, contents);
}
if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册cleanup timer, 会在onEventTime和onProcessTime中读到这个timer并clean
registerCleanupTimer(window);
}
-
processWatermark()
这个方法在一个父类的父类 AbstractUdfStreamOperator中实现了,自己没实现。
这个方法call了timerService的advanceWatermark
, 其中timerService call了WindowOperator
的onEventTime()
public void advanceWatermark(long time) throws Exception {
currentWatermark = time;
InternalTimer<K, N> timer;
//不大于,水印用于表示小于该时间戳的元素都已到达,所以所有不大于水印的触发时间戳都该被触发
while ((timer = eventTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
Set<InternalTimer<K, N>> timerSet = getEventTimeTimerSetForTimer(timer);
timerSet.remove(timer);
eventTimeTimersQueue.remove();
keyContext.setCurrentKey(timer.getKey());
//这边调用了onEventTime
triggerTarget.onEventTime(timer);
}
processWatermark在OperatorChain.ChainningOutput.emitWatermark中被call。
windowOperator implement了triggerable
,2方法onEventTime()
和onProcessingTime()
。都是处理timer的
1.onEventTime(InternalTimer timer)
在HeapInternalTimerService。advanceWatermark中调用(触发watermark以下的timer时)。
onEventTime内部 callcontext.onEventTime()
(触发trigger.onEventTime) 来决定触发window计算, 和执行cleanup window的逻辑cleanAllState()
2.onProcessingTime(InternalTimer timer)
call context.onProcessingTime
和onEventTime类似, 触发trigger.onProcessTime和cleanAllState
上层HeapInternalTimerService.onProcessTime call windowOperator.onProcessTime()
//不大于表示小于该时间戳的元素都已到达,所以所有不大于此times的触发时间戳timer都该被触发
while ((timer = processingTimeTimersQueue.peek()) != null && timer.getTimestamp() <= time) {
Set<InternalTimer<K, N>> timerSet = getProcessingTimeTimerSetForTimer(timer);
timerSet.remove(timer);
processingTimeTimersQueue.remove();
keyContext.setCurrentKey(timer.getKey());
// triggerTarget 就是 windowOperator
triggerTarget.onProcessingTime(timer);
}
EvictingWindowOperator
EvictingWindowOperator :> WindowOperator
在计算前写call evictor.evictorBefore
AbstractKeyedTimePanes
优化window到pane里, 但目前deprecate, 未来会重做
WindowedStream reduce()
和apply()
(fold不支持)中 createFastTimeOperatorIfValid()
根据
if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class(or TumblingAlignedProcessingTimeWindows) && trigger == null && evictor == null)
来判断是否可以使用带pane优化的operator (AccumulatingProcessingTimeWindowOperator
和AggregatingProcessingTimeWindowOperator
)
否则, 使用简单的 windowOperator和EvitorWindowOperator
网友评论