Flink: Handling Out-of-Order Event Streams with Watermarks

Author: 邵红晓 | Published: 2019-10-12 14:05
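    • A watermark puts an upper bound on how long Flink waits for events that arrive out of order: every event whose timestamp is less than or equal to the watermark is considered to have arrived. Event-time processing in Flink relies on special timestamped elements called watermarks, which are inserted into the stream by the data source or by a watermark generator. A watermark with timestamp t can be read as an assertion that all events with timestamps less than or equal to t have (with some reasonable probability) already arrived.
    • Late data can be routed to a side output.
    • Prerequisite: timestamps must be extracted and watermarks generated.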
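
To make the first bullet concrete, here is a minimal sketch of the arithmetic behind a bounded-out-of-orderness watermark: the watermark is the largest event timestamp seen so far minus the tolerated out-of-orderness. It deliberately does not use Flink; the class `BoundedWatermarkSketch`, its methods, and the sample timestamps are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, Flink-free model of a bounded-out-of-orderness watermark. */
final class BoundedWatermarkSketch {
    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp = Long.MIN_VALUE;

    BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    /** Records one event timestamp and returns the watermark after seeing it. */
    long onEvent(long eventTimestampMs) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestampMs);
        return currentWatermark();
    }

    /** Watermark = max timestamp seen so far - allowed out-of-orderness. */
    long currentWatermark() {
        if (currentMaxTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // no event seen yet, so nothing can be asserted
        }
        return currentMaxTimestamp - maxOutOfOrdernessMs;
    }

    /** Feeds the timestamps in arrival order and collects the watermark after each one. */
    static List<Long> watermarksFor(long maxOutOfOrdernessMs, long... timestampsMs) {
        BoundedWatermarkSketch sketch = new BoundedWatermarkSketch(maxOutOfOrdernessMs);
        List<Long> watermarks = new ArrayList<>();
        for (long ts : timestampsMs) {
            watermarks.add(sketch.onEvent(ts));
        }
        return watermarks;
    }

    public static void main(String[] args) {
        // Events arrive out of order: 15 s, 12 s, 25 s, 18 s; tolerance is 10 s.
        System.out.println(watermarksFor(10_000L, 15_000L, 12_000L, 25_000L, 18_000L));
        // prints [5000, 5000, 15000, 15000]
    }
}
```

Note that the watermark never moves backwards: the out-of-order events at 12 s and 18 s leave it unchanged, because only the maximum timestamp drives it.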

    Complete code

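    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import javax.annotation.Nullable;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    // The original post shows only the body of main(); the class name here is illustrative.
    // Written against the Flink 1.x API of the time; AssignerWithPeriodicWatermarks and
    // keyBy(int) are deprecated in later releases.
    public class EventTimeWatermarkJob {

        public static void main(String[] args) throws Exception {
            // Port of the socket source
            int port = 9900;
            // Obtain the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Use event time; the default is processing time
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

            // Set parallelism to 1; in local execution the default is the number of CPU cores
            env.setParallelism(1);

            // Connect to the socket and read newline-delimited input
            DataStream<String> text = env.socketTextStream("****", port, "\n");

            // Parse each input line of the form "key,timestampInMillis"
            DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
                }
            });

            // Extract timestamps and generate watermarks
            DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {

                private long currentMaxTimestamp = 0L;
                private final long maxOutOfOrderness = 10000L; // maximum tolerated out-of-orderness: 10 s

                private final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

                /**
                 * Defines how the watermark is generated.
                 * Called periodically at the auto-watermark interval (200 ms once event time is
                 * enabled via setStreamTimeCharacteristic; adjustable with
                 * ExecutionConfig#setAutoWatermarkInterval).
                 */
                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }

                // Defines how the timestamp is extracted from an element
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    long timestamp = element.f1;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    long watermark = getCurrentWatermark().getTimestamp();
                    System.out.println("key:" + element.f0
                            + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1)
                            + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
                            + "],watermark:[" + watermark + "|" + sdf.format(watermark) + "]");
                    return timestamp;
                }
            });

            // Tag for late records that would otherwise be dropped
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
            // Note: getSideOutput is defined on SingleOutputStreamOperator, a subclass of DataStream,
            // so the variable must not be declared with the parent type DataStream.
            SingleOutputStreamOperator<String> window = waterMarkStream.keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(3))) // assign windows by event time; same effect as timeWindow(Time.seconds(3))
                    .allowedLateness(Time.seconds(2)) // accept data that is up to 2 seconds late
                    .sideOutputLateData(outputTag)
                    .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                        /**
                         * Sorts the timestamps inside the window so the output is ordered, then emits
                         * one line: key, element count, earliest and latest event time, window start, window end.
                         */
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                            String key = tuple.toString();
                            List<Long> timestamps = new ArrayList<>();
                            for (Tuple2<String, Long> element : input) {
                                timestamps.add(element.f1);
                            }
                            Collections.sort(timestamps);
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                            String result = key + "," + timestamps.size() + "," + sdf.format(timestamps.get(0)) + "," + sdf.format(timestamps.get(timestamps.size() - 1))
                                    + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                            out.collect(result);
                        }
                    });
            // Print the late data to the console for now; in practice it can be written to other storage
            DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
            // sideOutput.addSink()  // a sink can be attached here
            sideOutput.writeAsText("D:\\Users\\xdata\\flink-learn\\data\\sideOutPut");
            sideOutput.print();
            // For testing, printing the window results to the console is enough
            window.print();

            // Note: Flink programs are lazily executed, so nothing above runs until execute() is called
            env.execute("eventtime-watermark");
        }
    }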
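
The interaction between the 3-second tumbling window, the 2-second allowed lateness, and the side output in the code above can be sketched with plain arithmetic. The model below is a simplification under stated assumptions: it does not use Flink, it assumes non-negative millisecond timestamps and a window offset of 0, and it takes the operator's current watermark as a given input. The class `LatenessSketch`, its enum, and its method names are hypothetical.

```java
/** Hypothetical, Flink-free model of how a tumbling event-time window treats an element. */
final class LatenessSketch {
    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, SIDE_OUTPUT }

    /** Start of the tumbling window containing the timestamp (offset 0, timestamp >= 0). */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /**
     * Classifies an element given the watermark at the moment it reaches the window operator.
     * The window covers [start, start + size), so its last valid timestamp is start + size - 1.
     */
    static Outcome classify(long timestampMs, long watermarkMs, long windowSizeMs, long allowedLatenessMs) {
        long maxTimestamp = windowStart(timestampMs, windowSizeMs) + windowSizeMs - 1;
        if (maxTimestamp + allowedLatenessMs <= watermarkMs) {
            return Outcome.SIDE_OUTPUT;       // window state already cleaned up
        }
        if (maxTimestamp <= watermarkMs) {
            return Outcome.LATE_BUT_ACCEPTED; // window already fired once; it fires again
        }
        return Outcome.ON_TIME;               // window has not fired yet
    }

    public static void main(String[] args) {
        long size = 3_000L;
        long lateness = 2_000L;
        long eventTs = 10_000L; // belongs to window [9000, 12000)

        System.out.println(windowStart(eventTs, size));                 // prints 9000
        System.out.println(classify(eventTs, 11_000L, size, lateness)); // prints ON_TIME
        System.out.println(classify(eventTs, 12_500L, size, lateness)); // prints LATE_BUT_ACCEPTED
        System.out.println(classify(eventTs, 13_999L, size, lateness)); // prints SIDE_OUTPUT
    }
}
```

Combined with the 10-second out-of-orderness bound in the job, this means an element for window [9000, 12000) reaches the side output only once some event with a timestamp of at least 23,999 ms has been seen and the resulting watermark has been emitted.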
    
