Flink 系列博客
Flink QuickStart
Flink双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
WindowOperator
WindowOperator负责window中元素存储和计算流程。
WindowOperator包含如下几个重要方法:
- open:operator初始化的逻辑
- processElement: 新元素进入window的时候调用
- onEventTime: event time计算触发时候的逻辑
- onProcessingTime:processing time计算触发时候的逻辑
open方法
open方法的调用栈如图所示:

由图可知,open方法在Task Manager中的Task -> StreamTask的openAllOperator得到调用。
在分析open方法之前,我们需要先分析下WindowOperator类的继承结构。

WindowOperator继承自AbstractUdfStreamOperator,AbstractStreamOperator继承自AbstractStreamOperator。
AbstractStreamOperator
的open方法:
/**
* This method is called immediately before any elements are processed, it should contain the
* operator's initialization logic, e.g. state initialization.
*
* <p>The default implementation does nothing.
*
* @throws Exception An exception in this method causes the operator to fail.
*/
@Override
public void open() throws Exception {}
根据英文注释,可以总结open方法的功能如下:
- open方法在operator处理任何元素之前调用
- open方法负责operator的初始化工作,例如状态初始化
- 如果该方法抛出异常,operator会失败,无法进行后续处理数据的流程
AbstractUdfStreamOperator的open代码如下所示:
@Override
public void open() throws Exception {
super.open();
FunctionUtils.openFunction(userFunction, new Configuration());
}
FunctionUtils这一行对userFunction进行了处理,稍后分析。userFunction为用户自定义处理方法的封装。比如apply或者process函数中传入的自定义处理逻辑,会被封装为userFunction。
FunctionUtils.openFunction方法:
public static void openFunction(Function function, Configuration parameters) throws Exception{
if (function instanceof RichFunction) {
RichFunction richFunction = (RichFunction) function;
richFunction.open(parameters);
}
}
如果userFunction使用的是RichFunction类型的话,会调用richFunction的open方法。
接下来分析下WindowOperator的open方法。代码如下:
public void open() throws Exception {
super.open();
// 打开迟到被丢弃数据条数统计的监控
this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
// 创建一个基于时间戳的数据收集器,用于输出窗口数据到下游
timestampedCollector = new TimestampedCollector<>(output);
// 获取时间服务,用于向windowAssignerContext传递当前的processing time
internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
// 创建triggerContext和processContext,后面用到时候分析
triggerContext = new Context(null, null);
processContext = new WindowContext(null);
// 创建WindowAssignerContext,主要用于获取processing time
windowAssignerContext = new WindowAssigner.WindowAssignerContext() {
@Override
public long getCurrentProcessingTime() {
return internalTimerService.currentProcessingTime();
}
};
// create (or restore) the state that hold the actual window contents
// NOTE - the state may be null in the case of the overriding evicting window operator
// 创建windowState,用于储存窗口中的数据
if (windowStateDescriptor != null) {
windowState = (InternalAppendingState<K, W, IN, ACC, ACC>) getOrCreateKeyedState(windowSerializer, windowStateDescriptor);
}
// create the typed and helper states for merging windows
// 如果windowAssigner是MergingWindowAssigner子类,即使用的是SessionWindow的话,执行if内的内容
if (windowAssigner instanceof MergingWindowAssigner) {
// store a typed reference for the state of merging windows - sanity check
if (windowState instanceof InternalMergingState) {
windowMergingState = (InternalMergingState<K, W, IN, ACC, ACC>) windowState;
}
// TODO this sanity check should be here, but is prevented by an incorrect test (pending validation)
// TODO see WindowOperatorTest.testCleanupTimerWithEmptyFoldingStateForSessionWindows()
// TODO activate the sanity check once resolved
// else if (windowState != null) {
// throw new IllegalStateException(
// "The window uses a merging assigner, but the window state is not mergeable.");
// }
// 以下逻辑为创建储存window合并的状态变量mergingSetsState
@SuppressWarnings("unchecked")
final Class<Tuple2<W, W>> typedTuple = (Class<Tuple2<W, W>>) (Class<?>) Tuple2.class;
final TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>(
typedTuple,
new TypeSerializer[] {windowSerializer, windowSerializer});
final ListStateDescriptor<Tuple2<W, W>> mergingSetsStateDescriptor =
new ListStateDescriptor<>("merging-window-set", tupleSerializer);
// get the state that stores the merging sets
mergingSetsState = (InternalListState<K, VoidNamespace, Tuple2<W, W>>)
getOrCreateKeyedState(VoidNamespaceSerializer.INSTANCE, mergingSetsStateDescriptor);
mergingSetsState.setCurrentNamespace(VoidNamespace.INSTANCE);
}
}
processElement方法
有数据到达window的时候,系统会调用processElement方法。代码如下所示:
public void processElement(StreamRecord<IN> element) throws Exception {
// 获取需要分配给该元素的window。以eventtime时间窗口为例,element的eventtime位于window的起止时间之中,则该窗口需要指派给此元素
final Collection<W> elementWindows = windowAssigner.assignWindows(
element.getValue(), element.getTimestamp(), windowAssignerContext);
//if element is handled by none of assigned elementWindows
// 返回元素是否被处理,如果元素经过处理,返回false
boolean isSkippedElement = true;
// 此处获取到key的值,即keyBy方法字段的值
// 该KeyedStateBackend在StreamTaskStateInitializerImpl中创建
final K key = this.<K>getKeyedStateBackend().getCurrentKey();
// 判断Window是否是MergingWindowAssigner(合并窗口)的子类。比如SessionWindow属于MergingWindowAssigner
if (windowAssigner instanceof MergingWindowAssigner) {
// MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
} else {
// 非MergingWindowAssigner部分的处理逻辑,代码在后面分析,此处省略
}
// side output input event if
// element not handled by any window
// late arriving tag has been set
// windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
// 如果元素没有被window处理,并且元素来迟,会加入到旁路输出
// 否则此数据被丢弃,迟到被丢弃数据条数监控会增加1
if (isSkippedElement && isElementLate(element)) {
if (lateDataOutputTag != null){
sideOutput(element);
} else {
this.numLateRecordsDropped.inc();
}
}
}
MergingWindowAssigner部分的处理逻辑
在分析MergingWindowAssigner之前我们需要提前看下MergingWindowSet类。该类负责处理整个合并窗口的过程,以及存放合并窗口的结果。
MergingWindowSet中用一个重要的变量为mapping,定义为:
private final Map<W, W> mapping;
该类负责存放一个window到window的映射。具体这个映射有什么作用呢?
这里先提前介绍下windowState,windowState是一个HeapListState。它的内部具有三个值:key,namespace和value。对于给定的key和namespace,可以获取到属于他们的一组值。在windowOperator中,key为keyBy函数对应的元素字段的值,namspace为当前的window。
可以认为HeapListState是一个Map<K, Map<NS, V>>
的数据结构,对于给定的key,每个window和它里面的数据以Map<NS, V>
形式储存。Window对象本身作为Namespace,相当于Map<NS, V>
这个map中的key,维护了window和数据之间的关系。
但是问题来了,对于MergingWindowAssigner(SessionWindow),窗口是一直被合并的,合并之后的窗口和原先作为key保存数据对应关系的窗口肯定是不同的。该窗口对应的数据无法被合并之后的窗口获取到。
为了解决这个问题,有如下两种方案:
- 每次window合并之后,将原先window下的数据转移到合并后window下。这样涉及到很频繁的HashMap操作,性能开销较大。
- 选择一个固定的window(这里叫做状态window),来代表以后每次合并后的window,作为key来维护合并后window内的数据。同时再建立一个映射,维护状态window和合并后窗口之间的关系。Flink采用了这种做法。上述的
mapping
变量正是用于维护这个对应关系的。mapping
的key为合并后的窗口,value为状态窗口。
Flink使用一组被合并窗口中的第一个窗口,作为他们合并之后窗口的状态窗口。每次合并窗口操作发生之后,将mapping
中被合并的窗口的映射关系移除,同时将合并后窗口和状态窗口的对应关系写入mapping
。
还有一个变量是initialMapping
,它和mapping
相同,只不过是在创建MergingWindowSet的时候被赋值,和mapping
的值对比可用于判断mapping
是否发生了变更,是否需要持久化。
接下来分析下它的addWindow
方法,如下所示:
public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
List<W> windows = new ArrayList<>();
// mapping 在创建MergingWindowSet对象的时候会读取ListState。该State保存了已合并的window
windows.addAll(this.mapping.keySet());
// 将新window增加进来
windows.add(newWindow);
// 此变量保存窗口合并的结果
final Map<W, Collection<W>> mergeResults = new HashMap<>();
// 调用windowAssigner的mergeWindows方法,合并后的结果被放入了mergeResults中
windowAssigner.mergeWindows(windows,
new MergingWindowAssigner.MergeCallback<W>() {
@Override
public void merge(Collection<W> toBeMerged, W mergeResult) {
if (LOG.isDebugEnabled()) {
LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
}
mergeResults.put(mergeResult, toBeMerged);
}
});
W resultWindow = newWindow;
boolean mergedNewWindow = false;
// perform the merge
for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
// 合并之后的窗口
W mergeResult = c.getKey();
// 被合并的窗口
Collection<W> mergedWindows = c.getValue();
// if our new window is in the merged windows make the merge result the
// result window
// 如果被合并的window中包含有当前需要加入set的window(newWindow),那么窗口合并的结果就是mergeResult,将它赋给resultWindow
if (mergedWindows.remove(newWindow)) {
mergedNewWindow = true;
resultWindow = mergeResult;
}
// pick any of the merged windows and choose that window's state window
// as the state window for the merge result
// 获取第一个被合并的窗口作为合并后窗口的状态窗口
W mergedStateWindow = this.mapping.get(mergedWindows.iterator().next());
// figure out the state windows that we are merging
// 逐个寻找mapping的key中是否有本次操作已合并的window。
// 这里将他们转移到mergedStateWindows变量中
List<W> mergedStateWindows = new ArrayList<>();
for (W mergedWindow: mergedWindows) {
W res = this.mapping.remove(mergedWindow);
if (res != null) {
mergedStateWindows.add(res);
}
}
// 在mapping中设置合并后的窗口和它的状态窗口的对应关系
this.mapping.put(mergeResult, mergedStateWindow);
// don't put the target state window into the merged windows
// mergedStateWindows去掉状态窗口
mergedStateWindows.remove(mergedStateWindow);
// don't merge the new window itself, it never had any state associated with it
// i.e. if we are only merging one pre-existing window into itself
// without extending the pre-existing window
// 如果被合并的窗口不包含已合并窗口,或者被合并窗口列表大小不为1的时候
// 此处条件一定会满足,因为TimeWindow的mergeWindows方法保证了mergedWindows的size大于1的时候才会调用MergeCallback回调函数
if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
mergeFunction.merge(mergeResult,
mergedWindows,
this.mapping.get(mergeResult),
mergedStateWindows);
}
}
// the new window created a new, self-contained window without merging
// 如果一个window合并过后还是他自己(第一次调用addWindow,或者是开始了一个新的session),会进入这个if分支
if (mergeResults.isEmpty() || (resultWindow.equals(newWindow) && !mergedNewWindow)) {
this.mapping.put(resultWindow, resultWindow);
}
// 返回合并后的window
return resultWindow;
}
让我们回到processElement
方法处理MergingWindowAssigner的分支部分。代码如下:
// 获取MergingWindowSet对象,负责存放合并的窗口
MergingWindowSet<W> mergingWindows = getMergingWindowSet();
for (W window: elementWindows) {
// adding the new window might result in a merge, in that case the actualWindow
// is the merged window and we work with that. If we don't merge then
// actualWindow == window
// 调用addWindow方法,返回被合并的window
W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {
@Override
public void merge(W mergeResult,
Collection<W> mergedWindows, W stateWindowResult,
Collection<W> mergedStateWindows) throws Exception {
// 检测两种异常情况,在新元素加入window的时候,合并后窗口的最大时间戳不可能比当前waterwark或者处理时间还小的
// 如果是event time类型,并且合并后的窗口的maxTimestamp+allowedLateness小于等于当前的watermark会报错
if ((windowAssigner.isEventTime() && mergeResult.maxTimestamp() + allowedLateness <= internalTimerService.currentWatermark())) {
throw new UnsupportedOperationException("The end timestamp of an " +
"event-time window cannot become earlier than the current watermark " +
"by merging. Current watermark: " + internalTimerService.currentWatermark() +
" window: " + mergeResult);
} else if (!windowAssigner.isEventTime()) {
long currentProcessingTime = internalTimerService.currentProcessingTime();
// 如果是基于processing time的窗口,合并后窗口的maxTimestamp小于等于当前处理时间的话,程序会报错
if (mergeResult.maxTimestamp() <= currentProcessingTime) {
throw new UnsupportedOperationException("The end timestamp of a " +
"processing-time window cannot become earlier than the current processing time " +
"by merging. Current processing time: " + currentProcessingTime +
" window: " + mergeResult);
}
}
// 设置triggerContext的key和window
triggerContext.key = key;
triggerContext.window = mergeResult;
// 调用具体trigger实现类的注册定时器方法
// 注册的timer触发事件为mergeResult窗口的maxTimestamp
triggerContext.onMerge(mergedWindows);
// 移除所有触发时间为被合并window的maxTimestamp的定时器
for (W m: mergedWindows) {
triggerContext.window = m;
triggerContext.clear();
deleteCleanupTimer(m);
}
// merge the merged state windows into the newly resulting state window
// 将所有被合并的状态window对应的数据写入到合并后的window对应的状态window中
windowMergingState.mergeNamespaces(stateWindowResult, mergedStateWindows);
}
});
// drop if the window is already late
// 如果window迟到,则清除迟到的window。isWindowLate方法后面会分析
if (isWindowLate(actualWindow)) {
mergingWindows.retireWindow(actualWindow);
continue;
}
// 接下来将处理此元素,需要设置为false
isSkippedElement = false;
// 获取状态window,状态window作为key,标记存储合并窗口内的数据
W stateWindow = mergingWindows.getStateWindow(actualWindow);
if (stateWindow == null) {
throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
}
// 储存元素的值
windowState.setCurrentNamespace(stateWindow);
windowState.add(element.getValue());
triggerContext.key = key;
triggerContext.window = actualWindow;
// 调用trigger的onElement
TriggerResult triggerResult = triggerContext.onElement(element);
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 如果触发计算,则在此提取出合并后窗口的内容,传入userFunction进行计算
emitWindowContents(actualWindow, contents);
}
// 如果触发了purge操作,则需要清空window状态
if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册window的cleanup timer 稍后分析
registerCleanupTimer(actualWindow);
}
// need to make sure to update the merging state in state
// 将已合并的window持久化到ListState中
mergingWindows.persist();
非MergingWindowAssigner部分的处理逻辑
for (W window: elementWindows) {
// drop if the window is already late
// 判断window是否迟到,后面分析isWindowLate
if (isWindowLate(window)) {
continue;
}
// 标记该元素得到了处理
isSkippedElement = false;
// windowState为HeapListState
// HeapListState为内存中存储的分区化的链表状态(ListState),使用namespace区分不同窗口的数据。可以理解为一个Map,key为window对象,value为元素的值
windowState.setCurrentNamespace(window);
windowState.add(element.getValue());
// 设置trigger上下文对象的key和window
triggerContext.key = key;
triggerContext.window = window;
// 调用trigger的onElement方法,询问trigger新element到来的时候需要作出什么动作。
TriggerResult triggerResult = triggerContext.onElement(element);
// isFire表示需要触发计算
if (triggerResult.isFire()) {
// 取出windowState当前namespace下所有的元素。即当前window下所有的元素。
ACC contents = windowState.get();
if (contents == null) {
continue;
}
// 使用用户传入的处理函数来计算window内数据,稍后分析
emitWindowContents(window, contents);
}
// 如果触发器返回需要清空数据,则删除window中所有的数据
if (triggerResult.isPurge()) {
windowState.clear();
}
// 注册timer,当前时间已经过了window的cleanup时间(后面有cleanup time的含义),会根据窗口的类型调用对应的onProcessingTime方法或者是onEventTime方法
registerCleanupTimer(window);
}
isWindowLate函数,负责判断window是否迟到。迟到的window会被忽略。
protected boolean isWindowLate(W window) {
// 如果window类型为eventtime并且window的cleanup时间比当前watermark早,说明window已经迟到
return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}
cleanupTime方法,用来获取window的cleanup time
private long cleanupTime(W window) {
if (windowAssigner.isEventTime()) {
// cleanup time为window的end时间 + 允许迟到的时间 - 1
long cleanupTime = window.maxTimestamp() + allowedLateness;
// 如果cleanupTime溢出(返回负值),则使用Long.MAX_VALUE
return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
} else {
return window.maxTimestamp();
}
}
registerCleanupTimer方法,清除window在cleanup time时刻注册的定时器
protected void registerCleanupTimer(W window) {
long cleanupTime = cleanupTime(window);
if (cleanupTime == Long.MAX_VALUE) {
// don't set a GC timer for "end of time"
return;
}
// 增加对应onProcessingTime或onEventTime定时器
if (windowAssigner.isEventTime()) {
// 当前的watermark如果大于cleanupTime之时,会调用trigger的onElementTime方法
triggerContext.registerEventTimeTimer(cleanupTime);
} else {
// 当前的系统时间大于cleanupTime之时,会调用trigger的onProcessingTime方法
triggerContext.registerProcessingTimeTimer(cleanupTime);
}
}
emitWindowContents,收集window中的元素,调用用户编写的处理函数。代码如下:
private void emitWindowContents(W window, ACC contents) throws Exception {
timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
processContext.window = window;
// 此处调用用户编写的处理函数
// 此处用户的函数,比如ProcessWindowFunction被InternalIterableProcessWindowFunction包装了
userFunction.process(triggerContext.key, window, processContext, contents, timestampedCollector);
}
onEventTime
下面是onEventTime方法的代码。当event time触发计算的时候会调用该方法。
public void onEventTime(InternalTimer<K, W> timer) throws Exception {
// 获取key和window
triggerContext.key = timer.getKey();
triggerContext.window = timer.getNamespace();
MergingWindowSet<W> mergingWindows;
// 如果是MergingWindowAssigner(Session Window)
if (windowAssigner instanceof MergingWindowAssigner) {
mergingWindows = getMergingWindowSet();
// 获取状态window
W stateWindow = mergingWindows.getStateWindow(triggerContext.window);
if (stateWindow == null) {
// Timer firing for non-existent window, this can only happen if a
// trigger did not clean up timers. We have already cleared the merging
// window and therefore the Trigger state, however, so nothing to do.
return;
} else {
// 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
windowState.setCurrentNamespace(stateWindow);
}
} else {
// 设置当前window为triggerContext.window,方便以后调用windowState.get()方法获取window中的元素
windowState.setCurrentNamespace(triggerContext.window);
mergingWindows = null;
}
// 调用trigger的onEventTime方法
TriggerResult triggerResult = triggerContext.onEventTime(timer.getTimestamp());
// 此处为触发计算的逻辑
if (triggerResult.isFire()) {
ACC contents = windowState.get();
if (contents != null) {
emitWindowContents(triggerContext.window, contents);
}
}
// 如果触发了purge操作,则清空window中的内容
if (triggerResult.isPurge()) {
windowState.clear();
}
// 如果是event time类型,并且定时器触发时间是window的cleanup时间的时候,意味着该窗口的数据已经处理完毕,需要清除该窗口的所有状态
if (windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
clearAllState(triggerContext.window, windowState, mergingWindows);
}
// 持久化window合并状态
if (mergingWindows != null) {
// need to make sure to update the merging state in state
mergingWindows.persist();
}
}
onProcessingTime
onProcessingTime
方法和onEventTime
方法相比,除了调用trigger
的onProcessingTime
方法这一处不同外,其他的的逻辑基本类似,不再赘述。
网友评论