ProcessFunction是一个低级的流处理操作,可以访问所有(非循环)流应用程序的基本构建块:
- events (流元素)
- state (容错,一致性,只在Keyed Stream)
- timers (事件时间和处理时间, 只在keyed stream)
ProcessFunction
可以看作是一个具有键控状态(keyed state)和计时器(timers)访问权的FlatMapFunction
。它通过调用输入流中接收的每个事件来处理事件。对于容错状态,ProcessFunction
允许访问Flink的键状态,可以通过RuntimeContext
访问,类似于其他有状态函数访问键状态的方式。
计时器(timers)允许应用程序对处理时间(processing time)和事件时间(event time)中的变更作出响应。processElement(…)
函数的每次调用都获得一个上下文(Context )对象,该对象可以访问元素的事件时间戳
和TimerService
。TimerService
可用于为将来的事件时间/处理时间注册回调。当到达计时器的特定时间时,将调用onTimer(…)
方法。在调用期间,所有状态的作用域再次限定为创建计时器的键,允许计时器操作键控状态。
注意:如果你想访问键控状态和定时器,你必须在一个键控流上应用ProcessFunction:
stream.keyBy(...).process(new MyProcessFunction())
要实现对两个输入的低级操作,应用程序可以使用CoProcessFunction(协处理器功能)。这个函数绑定了两个不同的输入,并获取对processElement1(…)
和processElement2(…)
的单独调用,以获取来自两个不同输入的记录。
实现低级连接通常遵循以下模式:
- 为一个(或两个)输入创建状态对象。
- 从输入接收元素时更新状态。
- 从其他输入接收元素后,探测状态并生成join的结果。
例如,将客户数据与金融交易join起来,同时保留客户数据的状态。如果在无序事件中,当客户数据流的水印超过交易时间时,可以使用计时器计算并发出交易的连接。
例子
下面的示例维护每个键的计数,并在没有更新该键的情况下,每分钟过去(在事件时间中)时发出键/计数对:
- 计数、键和最后修改时间戳存储在
ValueState
中,这是键的隐式作用域。 - 对于每个记录,
ProcessFunction
递增计数器并设置最后修改时间戳。 - 该函数还将回调时间安排在一分钟以后(在事件时间内)
- 在每次回调时,会检查回调的事件时间戳和存储计数的最后一次修改时间,如果它们匹配,则发出键/计数(在该分钟内没有更新)。
这个简单的示例可以用会话窗口实现。在这里使用ProcessFunction
来说明它提供的基本模式。
import org.apache.flink.api.common.state.ValueState
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.ProcessFunction.Context
import org.apache.flink.streaming.api.functions.ProcessFunction.OnTimerContext
import org.apache.flink.util.Collector
// the source data stream
val stream: DataStream[Tuple2[String, String]] = ...
// apply the process function onto a keyed stream
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
在Flink 1.4.0之前,当从处理时计时器调用ProcessFunction.onTimer()
方法时,它将当前处理时间设置为事件时间戳。这种行为非常微妙,用户可能不会注意到。它是有害的,因为处理时间戳是不确定的,并且与水印不一致。此外,用户实现的逻辑依赖于这个错误的时间戳,这很可能是无心的错误。所以我们决定修复它。当升级到1.4.0时,使用这个错误事件时间戳的Flink作业将失败,用户应该使其作业适应正确的逻辑。
KeyedProcessFunction
KeyedProcessFunction
作为ProcessFunction
的扩展,在onTimer(…)
方法中提供对定时器键的访问。
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): Unit = {
var key = ctx.getCurrentKey
// ...
}
定时器(Timers)
这两种类型的计时器:处理时间和事件时间都由TimerService
在内部维护并排队等待执行。
TimerService
根据每个键和时间戳删除重复计时器,每个键和时间戳最多有一个计时器。如果为相同的时间戳注册了多个计时器,则只调用onTimer()
方法一次。
注意:Flink同步调用onTimer()
和processElement()
方法。因此,用户不必担心状态的并发修改。
故障容错(Fault Tolerance)
计时器与应用程序的状态都具有容错和检查点功能。在故障恢复或从保存点启动应用程序时,计时器将被恢复。
注意:在恢复之前应该立即触发的检查点处理时间计时器。这可能发生在应用程序从故障中恢复或从保存点启动时。
注意:计时器是异步检查点,除了RocksDB后端/增量快照/基于堆的计时器的组合之外(将使用FLINK-10026解析)。注意,大量计时器会增加时间检查点,因为计时器是检查点状态的一部分。有关如何减少计时器数量的建议,请参阅“计时器合并(Timer Coalescing)”一节。
计时器合并(Timer Coalescing)
由于Flink对每个键和时间戳只维护一个计时器,可以通过降低计时器的精度来合并计时器,从而减少计时器的数量。
对于1秒的定时器精度(事件或处理时间),你可以把目标时间四舍五入到整秒。计时器的启动时间最多提前1秒,但不会晚于要求的毫秒精度。因此,每个键和秒最多有一个计时器。
val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
由于事件时间定时器只触发带有水印的情况,也可以使用当前的水印调度并结合这些定时器与下一个水印:
val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)
计时器也可以停止和删除如下:
停止处理时间(processing-time)计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
停止事件时间(event-time)计时器:
val timestampOfTimerToStop = ...
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
如果没有注册具有给定时间戳的计时器,则停止计时器无效。
网友评论