AssignerWithPeriodicWatermarks :
周期插入watermark 到数据中
适用于数据比较密集的情况下
AssignerWithPunctuatedWatermarks
每条数据经过后插入watermark 到数据中
适用于数据比较稀疏的情况下
Watermark的引入
watermark的引入很简单,对于乱序数据,最常见的引用方式如下:
dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
override def extractTimestamp(element: SensorReading): Long = {
element.timestamp * 1000
}
} )
Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 从调用时刻开始给env创建的每一个stream追加时间特性
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner有两种类型
AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks
以上两个接口都继承自 TimestampAssigner
Assigner with periodic watermarks
周期性的生成watermark:系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 每隔5秒产生一个watermark
env.getConfig.setAutoWatermarkInterval(5000)
产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
例子,自定义一个周期性的时间戳抽取:
class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 延时为1分钟
var maxTs: Long = Long.MinValue // 观察到的最大时间戳
override def getCurrentWatermark: Watermark = {
new Watermark(maxTs - bound)
}
override def extractTimestamp(r: SensorReading, previousTS: Long) = {
maxTs = maxTs.max(r.timestamp)
r.timestamp
}
}
一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。
val stream: DataStream[SensorReading] = ...
val withTimestampsAndWatermarks = stream
.assignAscendingTimestamps(e => e.timestamp)
>> result: E(1), W(1), E(2), W(2), ...
而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间,就可以使用如下代码:
val stream: DataStream[SensorReading] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
new SensorTimeAssigner
)
class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
// 抽取时间戳
override def extractTimestamp(r: SensorReading): Long = r.timestamp
}
>> relust: E(10), W(0), E(8), E(7), E(11), W(1), ...
Assigner with punctuated watermarks
间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1的传感器的数据流插入watermark:
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
val bound: Long = 60 * 1000
override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
if (r.id == "sensor_1") {
new Watermark(extractedTS - bound)
} else {
null
}
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
r.timestamp
}
}
网友评论