美文网首页玩转大数据
Flink 源码之Continuous Trigger

Flink 源码之Continuous Trigger

作者: AlienPaul | 来源:发表于2020-10-28 17:07 被阅读0次

Flink 系列博客

Flink QuickStart
Flink 双流操作
Flink on Yarn Kerberos的配置
Flink on Yarn部署和任务提交操作
Flink 配置Prometheus监控
Flink in docker 部署
Flink HA 部署
Flink 常见调优参数总结
Flink 源码之任务提交流程分析
Flink 源码之基本算子
Flink 源码之Trigger
Flink 源码之Evictor
Flink 源码之Window
Flink 源码之WindowOperator
Flink 源码之StreamGraph生成
Flink 源码之JobGraph生成
Flink 源码之两阶段提交
Flink 源码之分布式快照
Flink 源码之时间处理
Flink 源码之节点间通信
Flink 源码之Credit Based反压
Flink 源码之快照
Flink 源码之FlinkKafkaConsumer
Flink 源码之内存管理
Flink 源码之 1.11新特性Unaligned checkpoint
Flink 源码之TaskManager启动流程
Flink 源码之leader选举(Zookeeper方式)

什么是 Continuous Trigger

Continuous Trigger适用于时间跨度大的窗口计算场景。对于时间跨度大的窗口,看到计算结果必须等待很长的时间。比如一个长度为60分钟的窗口,我们需要等待60分钟才能看到计算结果。大家会想能否能实现这种效果:窗口内每隔10分钟计算一次中间结果输出给下游。这样我们能够清楚的看到窗口内元素的变化。Continuous Trigger正是为了满足这样的需求而诞生的。

Continuous Trigger根据Flink使用Event Time还是Processing Time分为如下两种:

  • ContinuousEventTimeTrigger
  • ContinuousProcessingTimeTrigger

分别适用于Event Time和Processing Time场景。

Event Time和Processing Time相关内容可参考:Flink 源码之时间处理

Trigger的相关介绍可参考:Flink 源码之Trigger

使用示例

我们举一个简单的例子:使用socket读入数据,依照processing time,使用长度为200s的窗口对元素进行聚合,求窗口内共有多少个元素。我们加入ContinuousProcessingTimeTrigger,每隔5s输出已经进入窗口的元素数量。

这个例子写成代码如下所示:

val env = StreamExecutionEnvironment.getExecutionEnvironment

val sourceStream = env.socketTextStream("192.168.100.128", 10000)

sourceStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(200)))
    .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)))
    .apply(new AllWindowFunction[String, Int, TimeWindow] {
      override def apply(window: TimeWindow, input: Iterable[String], out: Collector[Int]): Unit = {
        out.collect(input.size)
        println(s"window is: $window")
      }
    }).print()
env.execute()

创建Continuous Trigger只需传入一个参数:触发计算的间隔时间。

我们可以发现每隔5s都会打印出已经进入窗口的元素数量。窗口结束时间之前,所有的计算结果都是累计的,即Continuous Trigger触发计算的元素不会在计算之后清除。比如0-5s时候到来5个元素,5-10s时候又到来5个元素,那么上面的代码会输出:

5
window is: xxx
10
window is: xxx

这两次计算所在的window是同一个window。

如果时间已经超出了window的结束时间(这个例子中是200s之后),window中所有的元素会被清除,Continuous Trigger触发计算的元素不会再累计。

源代码分析

下面我们以ContinuousEventTimeTrigger为例分析下Continuous Trigger的具体实现。

成员变量

ContinuousEventTimeTrigger的成员变量和解释如下所示:

// 窗口内计算的触发时间间隔
private final long interval;

/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
// 状态变量描述,是一个ReducingState,用来储存下一次触发计算的时间
private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

ContinuousEventTimeTrigger 源码分析

一句话描述ContinuousEventTimeTrigger的原理:ContinuousEventTimeTrigger是有状态的,它保存了下一次需要触发计算的时间点。在元素到来的时候(onElement方法)更新这个状态变量,设置event time定时器,在触发event time的时候(onEventTime方法)判断是否需要触发计算。

下面我们逐个分析这些方法。

ContinuousEventTimeTriggeronElement方法:

@Override
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {

    if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
        // if the watermark is already past the window fire immediately
        // 如果window的截止时间比watermark还靠前,说明window的元素全部到齐,可以触发计算
        return TriggerResult.FIRE;
    } else {
        // 否则在window结束时间设置一个定时器
        // 窗口结束的时候必须触发计算
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }

    // 获取下次触发窗口计算的时间
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    // 如果保存的有时间
    if (fireTimestamp.get() == null) {
        // 获取小于当前timestamp最大的invertal整数倍
        long start = timestamp - (timestamp % interval);
        // 加上interval,作为下一次触发计算的时间
        // 即刚好比timestamp大的下一个interval整数倍时间作为触发计算的时间
        long nextFireTimestamp = start + interval;
        // 把这个时间注册到event time定时器
        ctx.registerEventTimeTimer(nextFireTimestamp);
        // 保存触发时间到状态变量
        fireTimestamp.add(nextFireTimestamp);
    }

    // 不触发计算
    return TriggerResult.CONTINUE;
}

onEventTime方法,当event time定时器时间到的时候触发(ctx.registerEventTimeTimer()):

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

    // 如果时间为window的结束时间,触发计算
    if (time == window.maxTimestamp()){
        return TriggerResult.FIRE;
    }

    // 获取状态变量
    ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

    // 获取触发时间
    Long fireTimestamp = fireTimestampState.get();

    // 如果触发事件等于当前时间
    if (fireTimestamp != null && fireTimestamp == time) {
        // 清除状态变量
        fireTimestampState.clear();
        // 设置下一个触发时间为interval之后
        fireTimestampState.add(time + interval);
        // 注册event time定时器
        ctx.registerEventTimeTimer(time + interval);
        // 触发计算
        return TriggerResult.FIRE;
    }
    // 否则不触发计算
    return TriggerResult.CONTINUE;
}

如果窗口的所有元素都计算完毕(当前时间是window的结束时间),WindowOperator会调用clearAllState方法,清除window内的元素(详细可参考:Flink 源码之WindowOperator)。该方法调用了TriggerContextclear方法。最终调用了trigger对象的clear方法。如下所示:

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    // 获取状态变量
    ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
    // 获取触发时间
    Long timestamp = fireTimestamp.get();
    // 如果保存的有触发时间
    if (timestamp != null) {
        // 删除触发事件定时器
        ctx.deleteEventTimeTimer(timestamp);
        // 清除该状态变量
        fireTimestamp.clear();
    }
}

ContinuousProcessingTimeTriggerContinuousEventTimeTrigger逻辑基本相同,只不过ContinuousProcessingTimeTrigger逻辑实现在了onProcessingTime中,不再赘述。

相关文章

网友评论

    本文标题:Flink 源码之Continuous Trigger

    本文链接:https://www.haomeiwen.com/subject/aplcvktx.html