美文网首页
spark(strcuted streaming)和flink的

spark(strcuted streaming)和flink的

作者: 王金松 | 来源:发表于2019-05-14 23:29 被阅读0次

    watermarker

    watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp。watermark的含义表示晚于这个时间的数据都已经处理完,如果还有的话直接抛弃掉

    作用

    watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
    我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
    但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

    structed streaming

    表示方法
    .withWatermark("timestamp", "10 minutes")
    
    图表解释
    watermark

    上图为trigger=5min,window=10min,watermark-threshold=10min的图表
    一个trigger的水位线等于上一个trigger中的最大eventtime-threshold
    如上图
    12:15-12:20这个trigger的水位线等于12:10-12:15这个trigger的最大eventTime(12:14-10) = 12:04
    12:20-12:25这个trigger的水位线等于12:15-12:20这个trigger的最大eventTime(12:21-10) = 12:11

    flink watermark

    表示方法
    • source之后直接加watermark
    val processedData = env.addSource(consumer).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks)
    
    • 做一定的预处理之后加watermark
     val processedData = env.addSource(consumer)
          .map(log => {
            preprocess(log)
          }).filter(_ != null).assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks)
    
    Watermakers的实现

    基于事件时间

    processedData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[FlowLog] {
          var currentMaxTimestamp = 0l
          final val maxOutOfOrderness = 30000l
          override def getCurrentWatermark: Watermark = {
            new Watermark(currentMaxTimestamp - maxOutOfOrderness)
          }
    
          override def extractTimestamp(element: FlowLog, previousElementTimestamp: Long): Long = {
            val timestamp = element.start_timestamp
            currentMaxTimestamp = math.max(timestamp, currentMaxTimestamp)
            timestamp
          }
        })
    

    基于系统时间

    processedData.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[FlowLog] {
          final val maxOutOfOrderness = 30000l
          override def getCurrentWatermark: Watermark = {
            new Watermark(System.currentTimeMillis() - maxOutOfOrderness)
          }
    
          override def extractTimestamp(element: FlowLog, previousElementTimestamp: Long): Long = {
            val timestamp = element.start_timestamp
            timestamp
          }
        })
    

    可以看到watermark=math.max(element.timestamp, currentMaxTimestamp) - maxOutOfOrderness

    • maxOutOfOrderness就是上面strcuted streaming中所说的threshold,所以从上面代码来看flink的watermark也是之前数据的最大事件时间-threshold
    参考

    https://www.jianshu.com/p/f90831c1e96d

    相关文章

      网友评论

          本文标题:spark(strcuted streaming)和flink的

          本文链接:https://www.haomeiwen.com/subject/flwyaqtx.html