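Background
At work I had a requirement to split windows by day and trigger a window computation at a fixed interval, using ProcessingTime semantics. While testing, I found a problem with ContinuousProcessingTimeTrigger: it does not fire when the window reaches its end time.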
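Test
For local testing I used self-generated records of the form (category, quantity, time). The job computes the total per minute, triggers a window computation every 10 seconds, and right after each firing evicts every element that has already been computed; the running total is kept in state.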
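```java
import java.util.Random;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.CountEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// dateTime.transferLongToDate(pattern, millis) is the author's own date-formatting helper (not shown).
public class demo2 {
    private static class DataSource extends RichParallelSourceFunction<Tuple3<String, Integer, String>> {
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple3<String, Integer, String>> ctx) throws Exception {
            Random random = new Random();
            while (isRunning) {
                // subtask 0 emits every 8 s, subtask 1 every 16 s
                Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 8);
                // random.nextInt(1) is always 0, so every record gets the key "CategoryA"
                String key = "Category" + (char) ('A' + random.nextInt(1));
                int value = random.nextInt(10) + 1;
                String dt = dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", System.currentTimeMillis());
                System.out.println(String.format("Emits\t(%s,%d,%s)", key, value, dt));
                ctx.collect(new Tuple3<>(key, value, dt));
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<Tuple3<String, Integer, String>> ds = env.addSource(new DataSource());
        SingleOutputStreamOperator<String> res = ds
                .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) in -> in.f0)
                // 1-minute tumbling windows for the test
                .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                // fire every 10 s of processing time
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))
                // keep 0 elements, evicting after the window function has run
                .evictor(CountEvictor.of(0, true))
                .process(new ProcessWindowFunction<Tuple3<String, Integer, String>, String, String, TimeWindow>() {
                    private static final long serialVersionUID = 3091075666113786631L;
                    private ValueState<Integer> valueState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        ValueStateDescriptor<Integer> desc = new ValueStateDescriptor<>("value_state", Integer.class);
                        valueState = getRuntimeContext().getState(desc);
                        super.open(parameters);
                    }

                    @Override
                    public void process(String tuple, Context context, Iterable<Tuple3<String, Integer, String>> iterable, Collector<String> collector) throws Exception {
                        // debug output: the time of each window firing
                        System.out.println("trigger:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", context.currentProcessingTime()));
                        int res = 0;
                        if (valueState.value() != null) {
                            res = valueState.value();
                        }
                        for (Tuple3<String, Integer, String> val : iterable) {
                            res += val.f1;
                        }
                        valueState.update(res);
                        String out = dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss", context.window().getStart())
                                + "," + tuple.toString() + ":" + valueState.value();
                        collector.collect(out);
                    }

                    @Override
                    public void clear(Context context) throws Exception {
                        // debug output: when the window state is cleaned up
                        System.out.println("Start Clear:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", System.currentTimeMillis()));
                        valueState.clear();
                        super.clear(context);
                    }
                });
        res.process(new ProcessFunction<String, Object>() {
            @Override
            public void processElement(String s, Context context, Collector<Object> collector) throws Exception {
                System.out.println(s);
            }
        });
        env.execute();
    }
}
```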
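The accumulation in process() relies on CountEvictor.of(0, true) removing every element after each evaluation, so each firing sees only the records that arrived since the previous firing, while the ValueState carries the running total. Below is a minimal plain-Java sketch of that pattern with no Flink dependency; the class is hypothetical and its nullable Integer field merely stands in for the ValueState.

```java
import java.util.Arrays;
import java.util.List;

final class RunningTotal {
    // Stands in for ValueState<Integer>: null means the state was never updated.
    private Integer state;

    /** One firing: newValues are only the elements left after the previous eviction. */
    int onFire(List<Integer> newValues) {
        int res = (state == null) ? 0 : state;
        for (int v : newValues) {
            res += v;
        }
        state = res; // like valueState.update(res)
        return res;
    }

    public static void main(String[] args) {
        RunningTotal window = new RunningTotal();
        System.out.println(window.onFire(Arrays.asList(3, 5))); // 8
        System.out.println(window.onFire(Arrays.asList(2)));    // 10
    }
}
```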
The program prints the following:
[Screenshot: console output of the test job]
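The figure above shows that the window fires at the :30, :40 and :50 marks and outputs the correct cumulative total each time, but no computation is triggered at the window's end time.
Locating the Problem
Reading the source
- Field declarations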
```java
public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long interval;

    /** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
```
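interval is the firing interval passed in. stateDesc is the descriptor of a ReducingState whose ReduceFunction, Min(), keeps the smallest of several timestamps; as the comment says, when windows are merged the earliest pending fire time wins.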
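To make the Min() behavior concrete, here is a hypothetical plain-Java stand-in for a ReducingState&lt;Long&gt; reduced with min. It only mimics the add/get/clear semantics described above and is not Flink's implementation.

```java
final class MinFireTimeState {
    // null means "no fire time stored", like an empty ReducingState
    private Long value;

    /** Like ReducingState.add with a Min reducer: keep the smaller timestamp. */
    void add(long timestamp) {
        value = (value == null) ? timestamp : Math.min(value, timestamp);
    }

    /** Like ReducingState.get: null when nothing was added since the last clear. */
    Long get() {
        return value;
    }

    void clear() {
        value = null;
    }

    public static void main(String[] args) {
        MinFireTimeState state = new MinFireTimeState();
        state.add(40_000L);
        state.add(30_000L); // e.g. a merged window contributes an earlier fire time
        System.out.println(state.get()); // 30000
        state.clear();
        System.out.println(state.get()); // null
    }
}
```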
- onElement method
```java
@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

    timestamp = ctx.getCurrentProcessingTime();

    if (fireTimestamp.get() == null) {
        long start = timestamp - (timestamp % interval);
        long nextFireTimestamp = start + interval;

        ctx.registerProcessingTimeTimer(nextFireTimestamp);

        fireTimestamp.add(nextFireTimestamp);
        return TriggerResult.CONTINUE;
    }
    return TriggerResult.CONTINUE;
}
```
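onElement initializes the window's first fire time: it rounds the current processing time down to a multiple of interval and registers a timer at the next multiple. Once that fire time is stored, the state is no longer null, so later elements do not register further timers.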
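The rounding in onElement can be checked on its own. The sketch below copies that arithmetic into a static method (the class and method names are hypothetical); timestamps are epoch milliseconds, so fire times land on the 10-second grid measured from the epoch.

```java
final class FirstFireTime {
    /** Same arithmetic as ContinuousProcessingTimeTrigger#onElement. */
    static long firstFireTime(long now, long interval) {
        long start = now - (now % interval); // round down to a multiple of interval
        return start + interval;             // the next multiple of interval
    }

    public static void main(String[] args) {
        long interval = 10_000L;            // 10 s, as in the test job
        long now = 1_000_000_003_456L;      // hypothetical processing time in ms
        System.out.println(firstFireTime(now, interval)); // 1000000010000
    }
}
```

Note that a time exactly on the grid (for example 30 000) yields the following grid point (40 000), never the current instant.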
- onProcessingTime method
```java
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);

    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
```
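onProcessingTime is the processing-time callback. It first reads the pending fire time from state and compares it with the timestamp of the timer that just fired. If they are equal, it clears the state, stores the next fire time (time + interval), registers a timer for it, and finally returns FIRE to trigger the window computation.
From this code I suspected that the last fireTimestamp, and the timer registered through ctx.registerProcessingTimeTimer, lie beyond the window's end time, so the final computation is never triggered when the window ends.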
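The suspicion can be checked by replaying the timer chain by hand. The sketch below is a hypothetical simulation (not Flink code) of the timestamps the original trigger registers for one window, with times in ms relative to the window start. A 60 s tumbling window without offset starts on a multiple of 60 000 ms since the epoch, so the 10 s grid lines up with these relative times. The first element is assumed to arrive at 23.456 s.

```java
import java.util.ArrayList;
import java.util.List;

final class OriginalTriggerTimeline {
    /** Timestamps registered by onElement and each subsequent onProcessingTime firing. */
    static List<Long> registeredTimers(long firstElement, long interval, long windowEnd) {
        List<Long> timers = new ArrayList<>();
        long next = firstElement - (firstElement % interval) + interval; // onElement
        timers.add(next);
        long maxTimestamp = windowEnd - 1;   // TimeWindow#maxTimestamp()
        while (next <= maxTimestamp) {       // a timer inside the window fires and registers time + interval
            next += interval;
            timers.add(next);
        }
        return timers;
    }

    public static void main(String[] args) {
        // Window [0, 60000), 10 s interval
        System.out.println(registeredTimers(23_456L, 10_000L, 60_000L)); // [30000, 40000, 50000, 60000]
    }
}
```

The last registered timer equals the window end (60 000), which is one millisecond past the window's maxTimestamp (59 999).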
- Verifying with test code
Based on the source of ContinuousProcessingTimeTrigger, I created a class MyContinuousProcessingTimeTrigger and modified its onProcessingTime method:
```java
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        // debug output: the next fire time this firing registers
        System.out.println("nextFireTime:" + dateTime.transferLongToDate("yyyy-MM-dd HH:mm:ss.SSS", time + this.interval));
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
```
Then I used MyContinuousProcessingTimeTrigger in the test code. The output:
[Screenshot: console output with the nextFireTime debug lines]
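The timers registered for :40 and :50 both fire correctly, but the one registered for 17:00:00 never fires, because by then the window has already been closed (its last timestamp, maxTimestamp, is 16:59:59.999).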
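The boundary can be reproduced with the same arithmetic Flink's TimeWindow.getWindowStartWithOffset uses for non-negative timestamps. In this sketch the class is hypothetical, and times are expressed as milliseconds since midnight rather than epoch milliseconds; minute boundaries line up the same way, so 16:59:23.456 is 61 163 456.

```java
final class WindowBounds {
    /** Window start for a tumbling window; matches TimeWindow.getWindowStartWithOffset for timestamp >= 0. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    /** TimeWindow#maxTimestamp() is end - 1: the window covers [start, end). */
    static long maxTimestamp(long start, long windowSize) {
        return start + windowSize - 1;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long t = 61_163_456L;                   // 16:59:23.456
        long start = windowStart(t, 0L, size);  // 61140000 = 16:59:00.000
        long maxTs = maxTimestamp(start, size); // 61199999 = 16:59:59.999
        long lastTimer = 61_200_000L;           // 17:00:00.000, registered by the :50 firing
        System.out.println(start + " " + maxTs + " " + (lastTimer > maxTs)); // 61140000 61199999 true
    }
}
```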
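The source of the problem is now confirmed; the next step is to fix it.
Solution
The fix also comes from the source. In the window operator's processing-time timer callback (here EvictingWindowOperator#onProcessingTime, since the job uses an evictor) we find the following code: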
```java
// Excerpt from the operator's onProcessingTime(...)
if (!windowAssigner.isEventTime() && isCleanupTime(triggerContext.window, timer.getTimestamp())) {
    clearAllState(triggerContext.window, evictingWindowState, mergingWindows);
}

private void clearAllState(
        W window,
        ListState<StreamRecord<IN>> windowState,
        MergingWindowSet<W> mergingWindows) throws Exception {
    windowState.clear();
    triggerContext.clear();
    processContext.window = window;
    processContext.clear();
    if (mergingWindows != null) {
        mergingWindows.retireWindow(window);
        mergingWindows.persist();
    }
}
```
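So there is a cleanup time: when a timer fires exactly at that time, all of the window's state is cleared (the window contents, the trigger state, and the window function's per-window state). Next, look at isCleanupTime: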
```java
/**
 * Returns {@code true} if the given time is the cleanup time for the given window.
 */
protected final boolean isCleanupTime(W window, long time) {
    return time == cleanupTime(window);
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}
```
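For anything other than event time, then, cleanupTime is simply window.maxTimestamp(), the window end minus 1 ms, with no allowed lateness. That is also where the fix comes from: for a processing-time window the operator registers its own timer at this cleanup time, and when that timer fires it is passed to the trigger's onProcessingTime before the cleanup check runs. The original trigger returns CONTINUE there, because its stored fire time (the next interval boundary) is not equal to maxTimestamp.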
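The cleanupTime rule can be checked in isolation. This sketch copies its arithmetic into a static method (the class and method names are hypothetical) so the processing-time and event-time branches, including the overflow guard, can be compared directly. Times are ms since midnight, as before.

```java
final class CleanupTimeSketch {
    /** Same logic as the operator's cleanupTime(window), with maxTimestamp passed in directly. */
    static long cleanupTime(long maxTimestamp, long allowedLateness, boolean isEventTime) {
        if (isEventTime) {
            long cleanupTime = maxTimestamp + allowedLateness;
            // if the addition overflowed, the sum wraps below maxTimestamp
            return cleanupTime >= maxTimestamp ? cleanupTime : Long.MAX_VALUE;
        }
        return maxTimestamp; // processing time: lateness is ignored
    }

    public static void main(String[] args) {
        long maxTs = 61_199_999L; // 16:59:59.999
        System.out.println(cleanupTime(maxTs, 5_000L, false)); // 61199999
        System.out.println(cleanupTime(maxTs, 5_000L, true));  // 61204999
        System.out.println(cleanupTime(Long.MAX_VALUE - 1, 5_000L, true)); // 9223372036854775807
    }
}
```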
The fix is therefore to change onProcessingTime in MyContinuousProcessingTimeTrigger so that it also fires at window.maxTimestamp():
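```java
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    // The operator's cleanup timer for a processing-time window fires at window.maxTimestamp().
    // Fire here so the data since the last interval boundary is evaluated before the state is cleared.
    if (time == window.maxTimestamp()) {
        return TriggerResult.FIRE;
    }
    // Regular interval firing: schedule the next boundary and fire.
    if (fireTimestamp.get().equals(time)) {
        fireTimestamp.clear();
        fireTimestamp.add(time + interval);
        ctx.registerProcessingTimeTimer(time + interval);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
```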
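Returning FIRE at maxTimestamp helps only because of the order in which the window operator handles each processing-time timer. As far as I can tell from the Flink 1.x WindowOperator/EvictingWindowOperator source, it asks the trigger first, emits the window contents on FIRE, and only then runs the isCleanupTime check and clearAllState. The model below is a hypothetical, heavily simplified version of that order: the class, the string actions and the predicate-based "trigger" are illustrative, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongPredicate;

final class OperatorTimerModel {
    // One processing-time timer: 1) ask the trigger, 2) emit on FIRE,
    // 3) clear all window state if this timer is the cleanup time.
    static List<String> onTimer(long time, long cleanupTime, LongPredicate triggerFires) {
        List<String> actions = new ArrayList<>();
        if (triggerFires.test(time)) {
            actions.add("emit");
        }
        if (time == cleanupTime) {
            actions.add("clearAllState");
        }
        return actions;
    }

    public static void main(String[] args) {
        long maxTimestamp = 59_999L; // window [0, 60000): the processing-time cleanup time
        // Original trigger: fires only on its own 10 s timers (approximated as multiples of 10 000)
        LongPredicate original = t -> t % 10_000L == 0;
        // Patched trigger: additionally fires when time == window.maxTimestamp()
        LongPredicate patched = t -> t == maxTimestamp || t % 10_000L == 0;
        System.out.println(onTimer(maxTimestamp, maxTimestamp, original)); // [clearAllState]
        System.out.println(onTimer(maxTimestamp, maxTimestamp, patched));  // [emit, clearAllState]
    }
}
```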
Test result:
[Screenshot: console output after the fix]
The window now also fires at its end time and emits the correct total.
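Putting the pieces together, this hypothetical simulation (plain Java, not Flink code) lists the firings one window produces before and after the patch, under the same assumptions as earlier: times in ms relative to a window [0, 60000), a 10 s interval, and a first element at 23.456 s.

```java
import java.util.ArrayList;
import java.util.List;

final class FiringTimeline {
    /** Firings for one processing-time window [windowStart, windowStart + size). */
    static List<Long> firings(long firstElement, long interval, long windowStart, long size,
                              boolean fireAtMaxTimestamp) {
        long maxTs = windowStart + size - 1;
        List<Long> fired = new ArrayList<>();
        long next = firstElement - (firstElement % interval) + interval; // onElement
        while (next <= maxTs) { // the trigger's own timers that fall inside the window
            fired.add(next);
            next += interval;
        }
        boolean alreadyFired = !fired.isEmpty() && fired.get(fired.size() - 1) == maxTs;
        if (fireAtMaxTimestamp && !alreadyFired) { // patched check on the operator's cleanup timer
            fired.add(maxTs);
        }
        // Timers after maxTs produce nothing for this window: its state was cleared at maxTs.
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(firings(23_456L, 10_000L, 0L, 60_000L, false)); // [30000, 40000, 50000]
        System.out.println(firings(23_456L, 10_000L, 0L, 60_000L, true));  // [30000, 40000, 50000, 59999]
    }
}
```

Without the patch, anything that arrives between the :50 firing and the window end is never emitted for that window; the extra firing at 59 999 covers it.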