美文网首页
watermark 生成方式

watermark 生成方式

作者: 无来无去_A | 来源:发表于2020-08-13 11:18 被阅读0次

     AssignerWithPeriodicWatermarks :
    周期插入watermark 到数据中
    适用于数据比较密集的情况下

     AssignerWithPunctuatedWatermarks
    每条数据经过后插入watermark 到数据中
    适用于数据比较稀疏的情况下

    Watermark的引入
    watermark的引入很简单,对于乱序数据,最常见的引用方式如下:

    dataStream.assignTimestampsAndWatermarks(
     new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.milliseconds(1000)) {
      override def extractTimestamp(element: SensorReading): Long = {
        element.timestamp * 1000
      }
    } )
    

    Event Time的使用一定要指定数据源中的时间戳。否则程序无法知道事件的事件时间是什么(数据源里的数据没有时间戳的话,就只能使用Processing Time了)。
    我们看到上面的例子中创建了一个看起来有点复杂的类,这个类实现的其实就是分配时间戳的接口。Flink暴露了TimestampAssigner接口供我们实现,使我们可以自定义如何从事件数据中抽取时间戳。

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 从调用时刻开始给env创建的每一个stream追加时间特性
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    val readings: DataStream[SensorReading] = env
    .addSource(new SensorSource)
    .assignTimestampsAndWatermarks(new MyAssigner())
    

    MyAssigner有两种类型
    AssignerWithPeriodicWatermarks
    AssignerWithPunctuatedWatermarks

    以上两个接口都继承自 TimestampAssigner
    Assigner with periodic watermarks
    周期性的生成watermark:系统会周期性的将watermark插入到流中(水位线也是一种特殊的事件!)。默认周期是200毫秒。可以使用ExecutionConfig.setAutoWatermarkInterval()方法进行设置。

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    // 每隔5秒产生一个watermark
    env.getConfig.setAutoWatermarkInterval(5000)
    

    产生watermark的逻辑:每隔5秒钟,Flink会调用AssignerWithPeriodicWatermarks的getCurrentWatermark()方法。如果方法返回一个时间戳大于之前水位的时间戳,新的watermark会被插入到流中。这个检查保证了水位线是单调递增的。如果方法返回的时间戳小于等于之前水位的时间戳,则不会产生新的watermark。
    例子,自定义一个周期性的时间戳抽取:

    class PeriodicAssigner extends AssignerWithPeriodicWatermarks[SensorReading] {
    val bound: Long = 60 * 1000 // 延时为1分钟
    var maxTs: Long = Long.MinValue // 观察到的最大时间戳
    
    override def getCurrentWatermark: Watermark = {
    new Watermark(maxTs - bound)
    }
    
    override def extractTimestamp(r: SensorReading, previousTS: Long) = {
    maxTs = maxTs.max(r.timestamp)
    r.timestamp
    }
    }
    

    一种简单的特殊情况是,如果我们事先得知数据流的时间戳是单调递增的,也就是说没有乱序,那我们可以使用assignAscendingTimestamps,这个方法会直接使用数据的时间戳生成watermark。

    val stream: DataStream[SensorReading] = ...
    val withTimestampsAndWatermarks = stream
    .assignAscendingTimestamps(e => e.timestamp)
    
    >> result:  E(1), W(1), E(2), W(2), ...
    

    而对于乱序数据流,如果我们能大致估算出数据流中的事件的最大延迟时间,就可以使用如下代码:

    val stream: DataStream[SensorReading] = ...
    val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
    new SensorTimeAssigner
    )
    
    class SensorTimeAssigner extends BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(5)) {
    // 抽取时间戳
    override def extractTimestamp(r: SensorReading): Long = r.timestamp
    }
    
    >> relust:  E(10), W(0), E(8), E(7), E(11), W(1), ...
    

    Assigner with punctuated watermarks
    间断式地生成watermark。和周期性生成的方式不同,这种方式不是固定时间的,而是可以根据需要对每条数据进行筛选和处理。直接上代码来举个例子,我们只给sensor_1的传感器的数据流插入watermark:

    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[SensorReading] {
    val bound: Long = 60 * 1000
    
    override def checkAndGetNextWatermark(r: SensorReading, extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
    new Watermark(extractedTS - bound)
    } else {
    null
    }
    }
    override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    r.timestamp
    }
    }
    
    

    相关文章

      网友评论

          本文标题:watermark 生成方式

          本文链接:https://www.haomeiwen.com/subject/fedbdktx.html