美文网首页Flink
flink的window和watermark的简单说明

flink的window和watermark的简单说明

作者: 傻疯子 | 来源:发表于2021-12-08 21:34 被阅读0次

    window

    window窗口:通过将流人为的切割成一块一块进行统计

    常用类型有Tumbling Windows和Sliding Windows,还有Session Windows以及可以自定义窗口。

    Tumbling Windows是滚动窗口。根据时间或数量条件划分窗口大小,分成若干不重叠的块。
    Sliding Windows是滑动窗口。根据时间或数量条件划分窗口大小以及每次移动的时间或数量。

    windowAPI

    滚动窗口

    stream.flatMap(_.split(" "))
          .map((_,1))
          .keyBy(0)
          //窗口大小
          //滚动时间窗口
          .timeWindow(Time.seconds(10))
           //滚动计数窗口
           //.countWindow(5)
          .sum(1)
    

    滑动窗口

       stream.flatMap(_.split(" "))
            .map((_,1))
            .keyBy(_._1)
            //窗口大小和滑动间隔
            //滑动时间窗口
            .timeWindow(Time.seconds(10),Time.seconds(5))
            //滑动计数窗口
            //.countWindow(5,1)
            .sum(1)
    

    window是keyby后并行计算的窗口
    windowAll是不对数据进行分组

    // Keyed Window
    stream
    .keyBy(...) <- 按照一个Key进行分组
    .window(...) <- 将数据流中的元素分配到相应的窗口中
    [.trigger(...)] <- 指定触发器Trigger(可选)
    [.evictor(...)] <- 指定清除器Evictor(可选)
    .reduce/aggregate/process() <- 窗口处理函数Window Function
    
    // Non-Keyed Window
    stream
    .windowAll(...) <- 不分组,将数据流中的所有元素分配到相应的窗口中
    [.trigger(...)] <- 指定触发器Trigger(可选)
    [.evictor(...)] <- 指定清除器Evictor(可选)
    .reduce/aggregate/process() <- 窗口处理函数Window Function
    

    Trigger在获取之前就进行筛选
    Evictor在获取之后进行筛选

    window和windowAll中可以传以下继承WindowAssigner的自定义窗口
    TumblingEventTimeWindows
    TumblingProcessingTimeWindows
    SlidingEventTimeWindows
    SlidingProcessingTimeWindows
    EventTimeSessionWindows
    ProcessingTimeSessionWindows

    watermark

    watermark是给予窗口计算一个等待机制,因为数据有时候不是按顺序来,可能会迟到,所以希望能等到窗口内数据到齐后再进行计算
    数据有一个产生EventTime,结合watermark进行使用

    使用数据事件的时间

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    

    产生水位线

    env.getConfig.setAutoWatermarkInterval(200)
    
    //规定允许乱序时间和提取时间戳
    val waterMarkStream = tupStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))
          .withTimestampAssigner(new SerializableTimestampAssigner[Tuple2[String, Long]] {
      override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
               element._2
    }
          })
    

    然后再将waterMarkStream进行窗口函数的操作,此外可以使用sideOutputLateData保存乱序未按时到达的数据

    //保存丢弃的数据
    val Stream = waterMarkStream.keyBy(0)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
          .sideOutputLateData(outputTag)
          .(窗口处理函数)
    //获得被丢弃数据
    val sideOutput = resStream.getSideOutput(outputTag)
    

    此外还可以通过allowedLateness允许获取迟到的数据,当到达后重新计算

    val Stream = waterMarkStream.keyBy(0)
          .allowedLateness(Time.seconds(2))
          .sideOutputLateData(outputTag)
          .(窗口处理函数)
    

    另外需要注意的是如果是在并行执行的情况下,窗口需要等所有线程都触发watermark才会执行

    相关文章

      网友评论

        本文标题:flink的window和watermark的简单说明

        本文链接:https://www.haomeiwen.com/subject/hdntfrtx.html