美文网首页Flink深度解析
Flink生成Timestamps和Watermarks

Flink生成Timestamps和Watermarks

作者: 尼小摩 | 来源:发表于2019-02-02 14:31 被阅读158次

    本章节是关于在event time上执行的程序。有关event time, processing time, and ingestion time的更多介绍,请参阅事件时间(Event Time)

    为了与event time结合使用,流程序需要相应地设置一个时间特性。

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    

    分配时间戳(Assigning Timestamps)

    为了让event time工作,Flink需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这个通常是通过抽取或者访问事件中某些字段的时间戳来获取的。

    时间戳的分配伴随着水印的生成,告诉系统事件时间中的进度。

    这里有两种方式来分配时间戳和生成水印:
    1. 直接在数据流源中进行。
    2. 通过timestamp assignerwatermark generator生成:在Flink中,timestamp分配器也定义了用来发射的水印。

    注意:timestamp和watermark都是通过从1970年1月1日0时0分0秒到现在的毫秒数来指定的。

    带有Timestamp和Watermark的源函数(Source Function with Timestamps And Watermarks)

    数据流源可以直接为它们产生的数据元素分配timestamp,并且他们也能发送水印。这样做的话,就没必要再去定义timestamp分配器了,需要注意的是:如果一个timestamp分配器被使用的话,由源提供的任何timestampwatermark都会被重写。

    为了通过源直接为一个元素分配一个timestamp,源需要调用SourceContext中的collectWithTimestamp(...)方法。为了生成watermark,源需要调用emitWatermark(Watermark)方法。

    下面是一个简单的(无checkpoint)由源分配timestamp和产生watermark的例子:

    override def run(ctx: SourceContext[MyType]): Unit = {
        while (/* condition */) {
            val next: MyType = getNext()
            ctx.collectWithTimestamp(next, next.eventTimestamp)
    
            if (next.hasWatermarkTime) {
                ctx.emitWatermark(new Watermark(next.getWatermarkTime))
            }
        }
    }
    

    时间戳分配器/水印生成器(Timestamp Assigners / Watermark Generators)

    Timestamp分配器获取一个流并生成一个新的带有Timestamp元素和水印的流。如果原始流已经有时间戳和/或水印,则Timestamp分配程序将覆盖它们

    Timestamp分配器通常在数据源之后立即指定,但这并不是严格要求的。通常是在timestamp分配器之前先解析(MapFunction)和过滤(FilterFunction)。在任何情况下,都需要在事件时间上的第一个操作(例如第一个窗口操作)之前指定timestamp分配程序。有一个特殊情况,当使用Kafka作为流作业的数据源时,Flink允许在源内部指定timestamp分配器和watermark生成器。更多关于如何进行的信息请参考Kafka Connector的文档。

    接下来的部分展示了要创建自己的timestamp 抽取器和watermark发射器,程序员需要实现的主要接口。想要查看Flink预定义的抽取器,

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
    val stream: DataStream[MyEvent] = env.readFile(
             myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
             FilePathFilter.createDefaultFilter())
    
    val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
            .filter( _.severity == WARNING )
            .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
    
    withTimestampsAndWatermarks
            .keyBy( _.getGroup )
            .timeWindow(Time.seconds(10))
            .reduce( (a, b) => a.add(b) )
            .addSink(...)
    

    周期性水印(With Periodic Watermarks)

    AssignerWithPeriodicWatermarks分配时间戳并定期生成水印(这可能依赖于流元素,或者纯粹基于处理时间)。

    watermark生成的时间间隔(每n毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(…)定义的。每次调用分配器的getCurrentWatermark()方法时,如果返回的watermark非空且大于前一个watermark,则会发出新的watermark

    这里我们展示了两个使用周期性水印生成的时间戳分配器的简单示例。请注意,Flink附带了一个BoundedOutOfOrdernessTimestampExtractor,类似于下面所示的BoundedOutOfOrdernessGenerator,您可以在这里阅读相关内容。

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    
        val maxOutOfOrderness = 3500L // 3.5 seconds
    
        var currentMaxTimestamp: Long = _
    
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp
        }
    
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        }
    }
    
    /**
     * This generator generates watermarks that are lagging behind processing time by a fixed amount.
     * It assumes that elements arrive in Flink after a bounded delay.
     */
    class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    
        val maxTimeLag = 5000L // 5 seconds
    
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            element.getCreationTime
        }
    
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current time minus the maximum time lag
            new Watermark(System.currentTimeMillis() - maxTimeLag)
        }
    }
    

    带断点的水印(With Punctuated Watermarks)

    无论何时,当某一事件表明需要创建新的watermark时,使用AssignerWithPunctuatedWatermarks创建。这个类首先调用extractTimestamp(…)方法来为元素分配一个时间戳,然后立即调用该元素上的checkAndGetNextWatermark(…)方法。

    checkAndGetNextWatermark(…)方法传入在给extractTimestamp(…)方法中分配的timestamp,并可以决定是否要生成watermark。每当checkAndGetNextWatermark(…)方法返回一个非空watermark并且该watermark大于最新的前一个watermark时,就会发出新的watermark

    class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
    
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            element.getCreationTime
        }
    
        override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
            if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
        }
    }
    

    注意: 可以在每个事件上生成一个watermark。但是,由于每个watermark都会导致下游的一些计算,过多的watermark会降低性能。

    每个Kafka分区的Timestamp(TimeStamps per Kafka Partion)

    当使用Apache Kafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(递增timestamp或有界的无序)。然而,当使用来自Kafka的流时,多个分区通常是并行使用的,将事件与分区交叉,破坏了每个分区的数据模型(这是Kafka消费者客户端所固有的工作方式)

    在这种情况下,您可以使用Flink支持Kafka-partition-aware生成水印。该特性可以在Kafka消费者内部生成watermarks,每个分区的watermarks合并方式与流shuffles时合并watermarks的方式相同。

    例如,如果事件时间戳严格按照Kafka分区递增排列,那么使用升序时间戳水印生成器生成每个分区的水印将产生完美的整体水印。

    下图展示了如何使用每个kafka分区生成水印,以及在这种情况下水印如何通过流数据传播。

    val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
    kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
        def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
    })
    
    val stream: DataStream[MyType] = env.addSource(kafkaSource)
    

    相关文章

      网友评论

        本文标题:Flink生成Timestamps和Watermarks

        本文链接:https://www.haomeiwen.com/subject/nnjssqtx.html