初学Flink,对Watermarks的一些理解和感悟

作者: 9c0ddf06559c | 来源:发表于2018-09-29 19:42 被阅读14次

    官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/event_time.html
    翻译:https://www.jianshu.com/p/68ab40c7f347

    1. 几个重要的概念简述:

    • Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算
    • start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间
    • event-time: 事件发生时间,是事件发生所在设备的当地时间,比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间
    • Watermarks:可以把他理解为一个水位线,这个Watermarks在不断的变化,一旦Watermarks大于了某个window的end_time,就会触发此window的计算,Watermarks就是用来触发window计算的。

    2.如何使用Watermarks处理乱序的数据流

    什么是乱序呢?可以理解为数据到达的顺序和他的event-time排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等

    因为Watermarks是用来触发window窗口计算的,我们可以根据事件的event-time,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会。

    假如我们设置10s的时间窗口(window),那么0~10s,10~20s都是一个窗口,以0~10s为例,0位start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9(C),13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒

    当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
    当B到达的时候,Watermarks为max(12.8,5)-3.5=12.5-3.5 = 9 < 10,不会触发计算
    当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
    当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
    触发计算的时候,会将AC(因为他们都小于10)都计算进去

    通过上面这种方式,我们就将迟到的C计算进去了

    这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算的,加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。

    3.看一个代码的实际例子

    下面代码中的BoundedOutOfOrdernessGenerator就是一个典型的Watermarks实例

    package com.meituan.flink.demo;
    
    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.meituan.flink.common.conf.FlinkConf;
    import com.meituan.flink.common.kafka.MTKafkaConsumer08;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * Created by smile on 14/11/2017.
     * 统计每 10 秒内每种操作有多少个
     */
    public class EventTimeWindowCount {
        private static final Logger logger = LoggerFactory.getLogger(EventTimeWindowCount.class);
    
        public static void main(String[] args) throws Exception {
            // 获取作业名
            String jobName = FlinkConf.getJobName(args);
            // 获取执行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置使用 EventTime 作为时间戳(默认是 ProcessingTime)
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // 开启 Checkpoint(每 10 秒保存一次检查点,模式为 Exactly Once)
            env.enableCheckpointing(10000);
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
            // 设置从 Kafka 的 topic "log.orderlog" 中读取数据
            MYKafkaConsumer08 consumer = new MYKafkaConsumer08(jobName);
            DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema()));
            // 默认接上次开始消费,以下的写法(setStartFromLatest)可以从最新开始消费,相应的还有(setStartFromEarliest 从最旧开始消费)
            // DataStream<String> stream = env.addSource(consumer.getInstance("log.orderlog", new SimpleStringSchema()).setStartFromLatest());
    
            DataStream<String> orderAmount =
                    // 将读入的字符串转化为 OrderRecord 对象
                    stream.map(new ParseOrderRecord())
                            // 设置从 OrderRecord 对象中提取时间戳的方式,下文 BoundedOutOfOrdernessGenerator 类中具体实现该方法
                            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator())
                            // 用 OrderRecord 对象的 action 字段进行分流(相同 action 的进入相同流,不同 action 的进入不同流)
                            .keyBy("action")
                            // 触发 10s 的滚动窗口,即每十秒的数据进入同一个窗口
                            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                            // 将同一窗口的每个 OrderRecord 对象的 count 字段加起来(其余字段只保留第一个进入该窗口的,后进去的丢弃)
                            .sum("count")
                            // 将结果从 OrderRecord 对象转换为 String,每十万条输出一条
                            .flatMap(new ParseResult());
                            // 如果想每条都输出来,那就输得慢一点,每 10 秒输出一条数据(请将上一行的 flatMap 换成下一行的 map)
                            // .map(new ParseResultSleep());
    
            // 输出结果(然后就可以去 Task Manage 的 Stdout 里面看)
            // 小数据量测试的时候可以这么写,正式上线的时候不要这么写!数据量大建议还是写到 Kafka Topic 或者其他的下游里面去
            orderAmount.print();
            env.execute(jobName);
        }
    
        public static class ParseOrderRecord implements MapFunction<String, OrderRecord> {
    
            @Override
            public OrderRecord map(String s) throws Exception {
                JSONObject jsonObject = JSON.parseObject(s);
                long id = jsonObject.getLong("id");
                int dealId = jsonObject.getInteger("dealid");
                String action = jsonObject.getString("_mt_action");
                double amount = jsonObject.getDouble("amount");
                String timestampString = jsonObject.getString("_mt_datetime");
                // 将字符串格式的时间戳解析为 long 类型,单位毫秒
                SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                Date timestampDate = simpleDateFormat.parse(timestampString);
                long timestamp = timestampDate.getTime();
    
                return new OrderRecord(id, dealId, action, amount, timestamp);
            }
        }
    
        public static class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<OrderRecord> {
            private final long maxOutOfOrderness = 3500; // 3.5 seconds
    
            private long currentMaxTimestamp;
    
            @Override
            public long extractTimestamp(OrderRecord record, long previousElementTimestamp) {
                // 将数据中的时间戳字段(long 类型,精确到毫秒)赋给 timestamp 变量,此处是 OrderRecord 的 timestamp 字段
                long timestamp = record.timestamp;
                currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                return timestamp;
            }
    
            @Override
            public Watermark getCurrentWatermark() {
                // return the watermark as current highest timestamp minus the out-of-orderness bound
                return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
            }
        }
    
        public static class ParseResult implements FlatMapFunction<OrderRecord, String> {
            private static long msgCount = 0;
    
            @Override
            public void flatMap(OrderRecord record, Collector<String> out) throws Exception {
                // 每十万条输出一条,防止输出太多在 Task Manage 的 Stdout 里面刷新不出来
                if (msgCount == 0) {
                    out.collect("Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count);
                    msgCount = 0;
                }
                msgCount++;
                msgCount %= 100000;
            }
        }
    
        public static class ParseResultSleep implements MapFunction<OrderRecord, String> {
    
            @Override
            public String map(OrderRecord record) throws Exception {
                // 每 10 秒输出一条数据,防止输出太多在 Task Manage 的 Stdout 里面刷新不出来
                // 正式上线的时候不要这么写!
                Thread.sleep(10000);
                return "Start from: " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(record.timestamp) + " action: " + record.action + " count = " + record.count;
            }
        }
    
        public static class OrderRecord {
    
            public long id;
            public int dealId;
            public String action;
            public double amount;
            public long timestamp;
            public long count;
    
            public OrderRecord() {
            }
    
            public OrderRecord(long id, int dealId, String action, double amount, long timestamp) {
                this.id = id;
                this.dealId = dealId;
                this.action = action;
                this.amount = amount;
                this.timestamp = timestamp;
                this.count = 1;
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:初学Flink,对Watermarks的一些理解和感悟

        本文链接:https://www.haomeiwen.com/subject/uomboftx.html