本文档整理于Flink社区直播课
Flink窗口是实时处理非常重要的技术,广泛用于实时ETL、实时报表以及一些实时的监控。
学习路线

为什么要关心实现
- ReduceFunction为什么不用计算每个key的聚合值?
- 当key基数很大时,如何有效计算每个key窗口计算?
- 窗口计算的中间结果如何存储,何时被清理?
- 窗口计算如何容忍late data?
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
实时数仓典型架构

Window应用场景

Window抽象概念

- TimeStampAssigner: 当基于Event Time时间语义时,定义数据流的event time并定义Watermark的生成算法。
- KeySelector:keyBy(col...),以什么字段分组,选择根据什么维度来做窗口聚合。
- WindowAssigner: 窗口分配器,定义即将到来的元素应该分配给哪一个窗口
- state:窗口状态,增量聚合函数时状态为ValueState,全量聚合函数时状态为一个ListState
- Trigger 触发器,定义窗口何时出发计算,发出计算结果
- Evictor 移除器,在ProcessWindowFunction调用之后或之前引用。可以从窗口中删除已经收集的元素。由于需要遍历所有元素,只能在没有定义增量聚合函数时使用。
- ProcessWindowFunction 全量窗口函数,可以访问窗口的元数据如窗口的开始时间以及结束时间,访问当前处理时间和Watermark,表达能力比增量集合函数强,代价是大状态,可以结合增量聚合函数使用。
Window编程接口

Window Assigner

Window Trigger
什么是Trigger
Flink中Trigger用于定义何时对窗口进行计算并发出结果,它的触发条件可以是时间也可以是某些特定条件。对于时间窗口而言,默认Trigger是Watermark大于窗口结束时间时触发。
在Flink窗口机制中,还有一个窗格的概念,它将窗口划分成多个规则的部分,这些部分可看作子窗口,可简单理解为对窗口再次分片。窗口则定义为一组key相同(分区操作),并且位于同一个窗口中的元素。每个窗格都有一个Trigger对象。
先看看Trigger类中的几个重要函数:
// 每当有元素添加到窗口都会调用
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
// 在处理时间计时器触发时调用
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
// 在事件时间计时器触发时调用
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
// 该函数会在清除窗口是调用
public abstract void clear(W window, TriggerContext ctx) throws Exception;
TriggerResult
每次调用触发器都会生成一个TriggerResult,它用于决定窗口接下来的行为。
- CONTINUE: 不做任何处理
- FIRE:触发计算
- PURGE:完全清除窗口内容,并删除窗口自身及元数据。
- FIRE_AND_PURGE:先进行窗口计算(FIRE),随后删除所有状态及元数据(PURGE)
Flink中Trigger的几个实现类:
内置Trigger | 说明 |
---|---|
ProcessingTimeTrigger | 一次触发,machine time大于窗口结束时间时触发 |
EventTimeTrigger | 一次触发,watermark大于窗口结束时间时触发 |
ContinuousProcessingTimeTrigger | 多次触发,基于processing time的固定时间间隔 |
ContinuousEventTimeTrigger | 多次触发 ,基于event time的固定时间间隔 |
CountTrigger | 多次触发,基于element的固定条数 |
DeltaTrigger | 多次触发,当前element与上次触发trigger的element做delta计算,超过threshold(阈值)时触发 |
PurgingTrigger | trigger wrapper,当nested trigger时触发,额外会清理窗口当前的中间状态 |
-
DeltaTrigger :会计算一个Delta值,那么到底是谁跟谁计算一个Delta呢?当前的element与上次触发Trigger的element做一个Delta计算.如果超过了指定的阈值,那么就触发计算.
-
PurgingTrigger:实际上是一个wrapper,是对上一个Trigger的包装,它可以对嵌套的Trigger做什么事情呢?让嵌套的Trigger自己去触发,一旦它触发的时候,可以给一个额外的功能,额外会清除窗口当前的中间状态.
Trigger实例
-
图trigger-1表示event time的数据流,定义5分钟的滚动窗口,窗口函数会计算事件的累计次数,同时使用基于event time的固定事件间隔的触发器将结构(窗口状态)sink到外部系统。
trigger-1
-
前面2min的数据进入窗口后,积累次数(作为窗口状态),由于定义了间隔2min的窗口触发器,此时结果将sink到外部系统。
trigger-2
-
第三分钟数据到达时,窗口状态更新,但还没有达到下次窗口出发的时间。
trigger-3
-
第4分钟数据进入窗口,更新窗口状态,达到窗口触发条件(间隔2min),发出结果,并对外部系统中已经存在的结果值发出更新。
trigger-4
Question:如果Result只能Append,不支持Update(如Druid),该如何解决呢?
PurgingTrigger:对于嵌套在内的触发器触发计算时,同时清除窗口中的状态。

Window Evictor

注:关于Window Trigger与Evictor的具体实现与案例,总结起来比较多,我会放在Trigger、Evictor源码篇里面去讲解。
网友评论