美文网首页
Flink-水印-主流与侧流输出

Flink-水印-主流与侧流输出

作者: 卡门001 | 来源:发表于2021-12-17 11:21 被阅读0次

    实际工作中,数据到来不会总是有序的,所以window需要结合水印来使用,以满足实际场景。但是既便用了水印,也有可能存在漏网之鱼,这时就要用到侧流来将漏网数据收住,提高数据精准度。

    运行方法:

    1、将下述代码贴到工程里,注意需要依赖flink的相关包
    2、调试步骤1:linux 打开 nc工具,启动控制台,准备数据输入
    3、调试步骤2:启动工程,观察侧流与主流的数据

    例子数据

            /**
             * 1000,a,1
             * 2000,a,1
             * 4998,a,1
             * 4999,a,1
             * 6999,a,1
             * 12000,a,1
             * 当到了12000时,执行的时,第二个窗口内的数据
             * 第1个窗口:[0000,5000)
             * 第2个窗口:[5000,10000)
             * 注:flink窗口是左闭右开的
             */
    

    代码如下

    package com.flink.watermarks;
    
    
    import cn.hutool.core.date.format.FastDateFormat;
    import lombok.val;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    public class EventTimeWMApp {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
            test01(env);
            env.execute("WindowApp");
        }
    
        /**
         * EventTime结合WM使用
         * 输入数据格式:时间字段、单词、次数
         * @param env
         */
        public static void test01(StreamExecutionEnvironment env){
            //用于接收延迟到来的数据(窗口已结束,还有相应的时间段的数据进来)
            OutputTag<Tuple2<String,Integer>> outputTag = new OutputTag<Tuple2<String,Integer>>("late-data"){};
    
    
            DataStreamSource<String> source = env.socketTextStream("localhost",9527);
    
            //开始建数据源时,直接建watermarker更好一些,而不是在过程中建。
            SingleOutputStreamOperator<String> lines = source.assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                        //乱序的watermarker处理器
                        @Override
                        public long extractTimestamp(String element) {
                            return Long.parseLong(element.split(",")[0]);//获取数据中的第一列,被当成触发5秒一个窗口的时间序列(对比值)
                        }
                    }
            );
    
            SingleOutputStreamOperator<Tuple2<String,Integer>>  mapStream = lines.map(new MapFunction<String, Tuple2<String,Integer>>(){
                @Override
                public Tuple2<String, Integer> map(String value) throws Exception {
                    String[] splits = value.split(",");
                    try{
                        return Tuple2.of(splits[1],Integer.parseInt(splits[2].trim()));
                    }catch (Exception e){
                        e.printStackTrace();
                        return new Tuple2("null",Integer.MIN_VALUE);
                    }
                }
            });
    
            /**
             * 1000,a,1
             * 2000,a,1
             * 4998,a,1
             * 4999,a,1
             * 6999,a,1
             * 12000,a,1
             * 当到了12000时,执行的时,第二个窗口内的数据
             * 第1个窗口:[0000,5000)
             * 第2个窗口:[5000,10000)
             * 注:flink窗口是左闭右开的
             */
            SingleOutputStreamOperator window =  mapStream.keyBy(x -> x.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                //sideOutputLateData(OutputTag<T> outputTag) ,用于解决窗口结束,还有数据进来的情况
                    .sideOutputLateData(outputTag)//接收延迟数据
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) throws Exception {
                        System.out.println(" ---reduce invoked ---" + v1.f0 + "[(" + v1.f1 + "," + v2.f1 + ")]===>" + (v1.f1 + v2.f1));
                        return Tuple2.of(v1.f0, v1.f1 + v2.f1);
                    }//以增量的方式聚合
                }, new ProcessWindowFunction<Tuple2<String, Integer>, Object, String, TimeWindow>() {
                    FastDateFormat format = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss");
                    //双窗口的方式
                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                        for (Tuple2<String, Integer> element : elements) {
                            out.collect("[" + format.format(context.window().getStart()) + "]===> " + format.format(context.window().getEnd()) + "]," + element.f0 + "===>" + element.f1);
                            //});
                        } //窗口的开始时间
                    }
                });
                //.sum(1)
                //主流数据
                window.print();
    
                //侧流数据
                DataStream<Tuple2<String,Integer>> sideOutput = window.getSideOutput(outputTag); //得到延迟数据
                sideOutput.print();
    
        }
    
    
    }
    
    

    相关文章

      网友评论

          本文标题:Flink-水印-主流与侧流输出

          本文链接:https://www.haomeiwen.com/subject/pmcifrtx.html