美文网首页Flink
Flink-1.12(六)Flink时间语义及Watermark

Flink-1.12(六)Flink时间语义及Watermark

作者: _大叔_ | 来源:发表于2021-06-25 09:36 被阅读0次

    时间语义

    flink 有三种时间:EventTime,表示数据最初的触发时间;IngestionTime,数据进入Flink的时间,是DataSource拿到数据的时间;ProcessingTime,执行操作算子的本地系统时间,与机器相关。

    flink1.12 中默认时间语义是 EventTime,在实际处理数据,大多也都是以 EventTime 为主。因为数据可能受于网络的影响,或其他因素导致乱序数据的产生。

    Watermark

    那对于乱序数据我们就可以使用 Watermark,我们来举个例子说明 flink 中的 Watermark 是什么:比如定点9点上车,但是往往有人9.01才来,那 Watermark 的做法就是把自己的时间调慢,也就是8.59分,9.01的人来了 对于我来说还是9点。如果有人9.02来了,那其实可以配合 window 的延迟,我先输出数据开车,你来弯道超车,然后我重新计算输出新数据,但非有人,9.03来呢,那我不管你了,我开车上高速,你可以坐下一辆车(侧输出流)。

    Watermark 的时间不宜设置太大,因为拿到数据时间可能是准确的,但是拿到数据就会很慢。相当于9点发车,但我想等人到齐(数据),我设置到了 8.30,我需要在等 30分钟才能发车,此时我到站 比别人还晚30分钟。

    以下演示 Watermark 时间乱序处理,和配合window的函数处理

    Watermark 在分区中会计算最小事件时钟,保证下游数据收到每个事件时钟,并根据自己的时钟得出是否计算结果并关闭窗口。Watermark 是以广播的形式把事件时钟传递给每个下游。

    watermark 的特性主要有以下几点:

    • 数据流中的Watermark用于表示timestamp小于Watermark的数据都已经到达,因此Window的执行是由Watermark触发的
    • Watermark是一条特殊的数据记录
    • Watermark必须单调递增,以确保任务的事件时间时钟向前推进,不可逆
    代码演示
    public class EventData{
    
        private Integer id;
        private Long eventTime;
        private String data;
        private Integer num;
    
        public EventData(){
    
        };
    
        public Integer getNum() {
            return num;
        }
    
        public void setNum(Integer num) {
            this.num = num;
        }
    
        public Integer getId() {
            return id;
        }
    
        public void setId(Integer id) {
            this.id = id;
        }
    
        public Long getEventTime() {
            return eventTime;
        }
    
        public void setEventTime(Long eventTime) {
            this.eventTime = eventTime;
        }
    
        public String getData() {
            return data;
        }
    
        public void setData(String data) {
            this.data = data;
        }
    
        @Override
        public String toString() {
            return "EventData{" +
                    "id=" + id +
                    ", eventTime=" + eventTime +
                    ", data='" + data + '\'' +
                    ", num=" + num +
                    '}';
        }
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
            // 数据转换
            DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
                @Override
                public EventData map(String value) throws Exception {
                    String[] strs = value.split(",");
                    EventData eventData = new EventData();
                    eventData.setId(Integer.valueOf(strs[0]));
                    eventData.setEventTime(Long.valueOf(strs[1]));
                    eventData.setData(strs[2]);
                    eventData.setNum(Integer.valueOf(strs[3]));
                    return eventData;
    
                }
            }).assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<EventData>(Time.seconds(5)) {
                    //提取时间戳
                    @Override
                    public long extractTimestamp(EventData element) {
                        return element.getEventTime() * 1000L;
                    }
                }
            );
            stream.print("WM: ");
            // 基于事件时间的开窗聚合,统计15秒内数据的最小ID值
            stream.keyBy(new KeySelector<EventData, Object>() {
                        @Override
                        public Object getKey(EventData value) throws Exception {
                            return value.getData();
                        }
                    })
                    .timeWindow(Time.seconds(15))
    //                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                    .sum("num")
                    .print("result: ");
            env.execute("test");
        }
    

    setAutoWatermarkInterval 是设置 Watermark 的生成时间,默认是 0,也就是来一个数据我对这个数据生成一个 Watermark时钟,用于去比较窗口函数的时间来触发计算。可以手动设置,在庞大的数据量,每生成一个 Watermark 有些费性能。以下为隔100ms生成一次。

    env.getConfig().setAutoWatermarkInterval(100);
    

    测试数据如下:

    6,1623051400,test data,1
    6,1623051401,test data,1
    6,1623051402,test data,1
    6,1623051405,test data,3
    6,1623051406,test data,3
    6,1623051409,test data,3
    6,1623051410,test data,5
    

    你会发现当你的 timestamp 为 1623051410,会触发窗口计算,但输出的num=3,因为 Watermark 创建窗口是会自动创建,会根据你的第一个数据的 timestamp 以及 timeWindow 窗口的值计算窗口的 startTime,计算方式

    # startTime
    timestamp - (timestamp - offset + windowSize) % windowSize;
    # endTime
    startTime + windowSize
    

    其中默认情况 offset = 0,根据以上计算得出如下

    startTime     timestamp
    1623051390000 1623051400000
    1623051390000 1623051401000
    1623051390000 1623051402000
    1623051405000 1623051405000
    1623051405000 1623051406000
    1623051405000 1623051409000
    1623051405000 1623051410000
    

    会看到 400000(timestamp) 创建的 startTime 是 390000,405000(timestamp) 创建的 startTime 是 405000,他们被分为了两个窗口。而他们的 405000-390000=15000=15s,刚好就是我们的 windowSize,也就是说 405000 的触发点在 420000 ,但我们还给 Watermark 延迟了 5s,也就是说正确的关闭第二个窗口的 timestamp = 1623051425000,得出结论

    Watermark1=[390,405)  Watermark2=[405,420)
    

    以上结论并行度为1

    相关文章

      网友评论

        本文标题:Flink-1.12(六)Flink时间语义及Watermark

        本文链接:https://www.haomeiwen.com/subject/acmqeltx.html