1. 起因
在我们使用 evictor 算子的时候,官网有这样的一句话:
Flink provides no guarantees about the order of the elements within a window. This implies that although an evictor may remove elements from the beginning of the window, these are not necessarily the ones that arrive first or last
大概的意思就是说,evictor 尽管可以从窗口开始时移除元素但是并不保证,这个元素是第一个或最后一个到达窗口的。 why?
2. 解释
env.addSource(consumer).uid("orderAndRegisterUserIdSource")
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return "a";
}
})
.timeWindow(Time.seconds(1000))
.evictor(new Evictor<String, TimeWindow>() {
@Override
public void evictBefore(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
System.out.println("evictBefore");
}
@Override
public void evictAfter(Iterable<TimestampedValue<String>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
System.out.println("evictAfter");
}
})
.trigger(new CountAndTimeTrigger(2L))
.process(new ProcessWindowFunctionImp()).uid("process");
通常我们会写出这样的代码,但当我们在 evictBefore 或者 evictAfter输出第一个元素时,它竟然保证是第一个或者最后一个进入 window 的。
我们一起来看一下源码,进入 EvictingWindowOperator
public void processElement(StreamRecord<IN> element) throws Exception {
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
boolean isSkippedElement = true;
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
if (windowAssigner instanceof MergingWindowAssigner) {
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window : elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
W actualWindow = mergingWindows.addWindow(window,
new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
throw new UnsupportedOperationException("The end timestamp of an " +
"event-time window cannot become earlier than the current watermark " +
"by merging. Current watermark: " + internalTimerService.currentWatermark() +
" window: " + mergeResult);
} else if (!windowAssigner.isEventTime() && mergeResult.maxTimestamp() <= internalTimerService.currentProcessingTime()) {
throw new UnsupportedOperationException("The end timestamp of a " +
"processing-time window cannot become earlier than the current processing time " +
"by merging. Current processing time: " + internalTimerService.currentProcessingTime() +
" window: " + mergeResult);
}
triggerContext.key = key;
triggerContext.window = mergeResult;
triggerContext.onMerge(mergedWindows);
for (W m : mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
evictingWindowState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
isSkippedElement = false;
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
evictingWindowState.setCurrentNamespace(stateWindow);
evictingWindowState.add(element);
triggerContext.key = key;
triggerContext.window = actualWindow;
evictorContext.key = key;
evictorContext.window = actualWindow;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
emitWindowContents(actualWindow, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
mergingWindows.persist();
} else {
for (W window : elementWindows) {
// check if the window is already inactive
if (isWindowLate(window)) {
continue;
}
isSkippedElement = false;
evictingWindowState.setCurrentNamespace(window);
evictingWindowState.add(element);
triggerContext.key = key;
triggerContext.window = window;
evictorContext.key = key;
evictorContext.window = window;
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
Iterable<StreamRecord<IN>> contents = evictingWindowState.get();
if (contents == null) {
// if we have no state, there is nothing to do
continue;
}
emitWindowContents(window, contents, evictingWindowState);
}
if (triggerResult.isPurge()) {
evictingWindowState.clear();
}
registerCleanupTimer(window);
}
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
这个里的 evictingWindowState 是一个 RocksDBListState。
由 Flink key state 为何仅与 key 有关的 ,我们知道 evictingWindowState.get 时也仅仅会得到当前 key 对应的值 。
当窗口触发时,传递给 emitWindowContents 时,也仅仅是当前 key 的值。( 对于同一个 key 而言是可以保证顺序的 )。故当 evictor 处理数据时也仅仅是当前 key 的值,而非整个 window 的值。故 evictor 处理的第一个数据不一定是 第一个或最后一个到达 window 的。而 window 也不保证元素顺序(进入window 窗口的顺序)
网友评论