1.前言
窗口的触发器定义了窗口是何时被触发并同时决定触发行为(对窗口进行清理或者计算)。
注意:窗口的触发在内部是设置定时器来实现的。
2. 触发器相关类
Trigger抽象类:
Trigger抽象类- onElement:每个元素到达触发的回调方法;
- onProcessingTime:基于处理时间定时器触发的回调方法;
- onEventTime:基于事件时间定时器触发的回调方法;
- onMerge:窗口在合并时触发的回调方法(会话窗口分配器assigner);
TriggerContext接口(定义在Trigger类中),用于维持状态,注册定时器等:
TriggerContext- registerXXXTimeTimer:注册(处理/事件)时间定时器;
- deleteXXXTimeTimer:删除(处理/事件)时间定时器;
- getPartitionedState:从Flink状态存储终端获取状态;
TriggerResult枚举类,用于决定窗口在触发后的行为:
- CONTINUE:不作任何处理;
- FIRE_AND_PURGE:触发窗口计算并输出结果同时清理并释放窗口(该值只会被清理触发器PurgingTrigger使用);
- FIRE:触发窗口计算并输出结果,但窗口并没有被释放并且数据仍然保留;
- PURGE:不触发窗口计算,不输出结果,只清除窗口中的所有数据并释放窗口
3.时间窗口触发器
3.1 ProcessingTimeTrigger
onElement方法
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
ctx.registerProcessingTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
先去注册一个ProcessingTime定时器,触发时间点就是当前窗口的最大时间戳;
触发结果就是不做任何操作。
onProcessingTime和onEventTime方法
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE;
}
3.2 EventTimeTrigger
onElement方法
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
当窗口的最大时间戳小于等于水位线立即触发窗口计算;否则的话就去注册EventTime定时器,结果就是不做任何操作。
onProcessingTime和onEventTime方法
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
return time == window.maxTimestamp() ?
TriggerResult.FIRE :
TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
onEventTime:如果时间等于窗口的最大时间戳,则触发对窗口进行计算,否则不做任何操作
这里为什么需要有一层时间的判断呢(time == window.maxTimestamp() )?
参考博客:Flink中EventTimeTrigger的理解
4.持续时间触发器
笔者之前写过 窗口实用触发器
持续触发,顾名思义,在本次触发之后需要更新并且保存下一次触发的时间戳,因此在持续时间触发器中引入了状态保存机制:
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
使用的是ReducingState,这里面调用的是Min函数,选择多个时间戳内最小的。
ContinuousProcessingTimeTrigger
onElement方法:
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
//获得存储触发时间戳的状态容器
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
//获取当前处理时间
timestamp = ctx.getCurrentProcessingTime();
//如果状态对象为空,则初始化;否则直接返回
if (fireTimestamp.get() == null) {
//计算起始时间
long start = timestamp - (timestamp % interval);
//下一次触发时间戳为起始时间加上触发间隔
long nextFireTimestamp = start + interval;
//以下一次触发的时间戳注册处理时间定时器
ctx.registerProcessingTimeTimer(nextFireTimestamp);
//将下一次触发计算的时间戳加入状态进行保存
fireTimestamp.add(nextFireTimestamp);
return TriggerResult.CONTINUE;
}
return TriggerResult.CONTINUE;
}
所以,ContinuousProcessingTimeTrigger的onElement方法主要是完成对存储窗口触发时间状态对象的初始化,并注册了第一次执行的定时器。
ContinuousEventTimeTrigger的onElement方法实现跟ContinuousProcessingTimeTrigger除了获取时间戳的方式不同,基本与此类似。
基于时间的回调方法,onProcessingTime
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
//首先从状态中查找触发时间
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
//跟定时器的注册时间进行对比,只有两者相等时才会触发计算
if (fireTimestamp.get().equals(time)) {
//清空状态并重新初始化值
fireTimestamp.clear();
fireTimestamp.add(time + interval);
//注册下一次触发的定时器
ctx.registerProcessingTimeTimer(time + interval);
//触发窗口计算
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
网友评论