
2022-05-28-Flink-47(六)

Author: 冰菓_ | Published 2022-05-28 23:46

    1. Process Functions

    A test case

    The first watermark is Long.MIN_VALUE (-9223372036854775808).
    After that, the watermark always lags the latest event timestamp by 1 ms.
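    That 1 ms lag comes from how the bounded-out-of-orderness generator works; paraphrasing BoundedOutOfOrdernessWatermarks.onPeriodicEmit from the Flink source:

    // the periodic watermark trails the largest timestamp seen so far by
    // (outOfOrdernessMillis + 1), so even with Duration.ZERO it is always
    // 1 ms behind the latest event time
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }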

    
    public class processDemo1 {
        public static void main(String[] args) throws Exception {
    
    
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
            //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            // watermark generation interval; the default is 200 ms
            env.getConfig().setAutoWatermarkInterval(200L);
            env.setParallelism(2);
    
    
            // randomly generated event data
            DataStreamSource<Event> addSource = env.addSource(new Source());
            SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
            outputStreamOperator.process(new ProcessFunction<Event, String>() {
                @Override
                public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                    long watermark = ctx.timerService().currentWatermark();
                    long processingTime = ctx.timerService().currentProcessingTime();
                    Long timestamp = ctx.timestamp();
                    int thisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                    System.out.println(processingTime);
                    System.out.println("--------");
                    System.out.println(timestamp);
    
                    out.collect(value.user + "  " + value.timestamp + "   " + watermark + "  " + thisSubtask);
    
    
                }
    
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                }
    
                @Override
                public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                    super.onTimer(timestamp, ctx, out);
                }
            }).print();
    
            env.execute();
    
    
    
    
        }
    }
    
    A simple timer application: processing time
    
    public class processProcessingTime {
    
        public static void main(String[] args) throws Exception{
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // watermark generation interval; the default is 200 ms
            env.getConfig().setAutoWatermarkInterval(200L);
            env.setParallelism(2);
    
            DataStreamSource<Event> addSource = env.addSource(new Source());
    
    
        //timer firing vs. processing time
            addSource.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
                @Override
                public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                 // get the current processing time
                long processingTime = ctx.timerService().currentProcessingTime();
                out.collect(ctx.getCurrentKey() + " processing time " + new Timestamp(processingTime));
                // register a processing-time timer 10 s from now
                ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
                }
    
                @Override
                public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                       out.collect(ctx.getCurrentKey() + " timer fired at " + new Timestamp(timestamp));
                }
            }).print();
    
            env.execute();
        }
    }
    
    
    A simple timer application: event time
    
    public class processEventTime {
    
    
        public static void main(String[] args) throws Exception{
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // watermark generation interval; the default is 200 ms
            env.getConfig().setAutoWatermarkInterval(200L);
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
    
    
            operator.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
                @Override
                public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                    Long timestamp = ctx.timestamp();
                    System.out.println(new Timestamp(value.timestamp));
                    out.collect(ctx.getCurrentKey() + "时间戳" + new Timestamp(timestamp) + "水位线" + ctx.timerService().currentWatermark());
                //register an event-time timer 10 s after this record's timestamp
                    ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
                }
    
                @Override
                public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                    out.collect(ctx.getCurrentKey() + "定时器触发事件" + new Timestamp(timestamp) + "水位线" + ctx.timerService().currentWatermark());
                }
            }).print();
    
            env.execute();
        }
    }
    
    2022-05-23 23:54:50.305
    大时间戳2022-05-23 23:54:50.305水位线-9223372036854775808
    2022-05-23 23:54:51.309
    大时间戳2022-05-23 23:54:51.309水位线1653321290304
    2022-05-23 23:54:52.309
    中时间戳2022-05-23 23:54:52.309水位线1653321291308
    2022-05-23 23:54:53.309
    中时间戳2022-05-23 23:54:53.309水位线1653321292308
    2022-05-23 23:54:54.31
    小时间戳2022-05-23 23:54:54.31水位线1653321293308
    2022-05-23 23:54:55.31
    中时间戳2022-05-23 23:54:55.31水位线1653321294309
    2022-05-23 23:54:56.31
    中时间戳2022-05-23 23:54:56.31水位线1653321295309
    2022-05-23 23:54:57.311
    小时间戳2022-05-23 23:54:57.311水位线1653321296309
    2022-05-23 23:54:58.311
    小时间戳2022-05-23 23:54:58.311水位线1653321297310
    2022-05-23 23:54:59.311
    中时间戳2022-05-23 23:54:59.311水位线1653321298310
    2022-05-23 23:55:00.312
    中时间戳2022-05-23 23:55:00.305水位线1653321299310
    大定时器触发事件2022-05-23 23:55:00.305水位线1653321300311
    

    An event-time timer fires only once the watermark reaches its timestamp (timer time <= current watermark).

    ProcessWindowFunction
    Implementing Top-N with windowAll
    
    public class processTop_N {
    
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // watermark generation interval; the default is 200 ms
            env.getConfig().setAutoWatermarkInterval(200L);
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
           operator.map(data -> data.url)
               //sliding window; note these are processing-time windows, so the event-time watermarks assigned above are not actually used here
               .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
                   .aggregate(new AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>>() {
                       @Override
                       public HashMap<String, Long> createAccumulator() {
                           return new HashMap<String, Long>();
                       }
    
                       @Override
                       public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
                           //for each record: if the url is already in the map, increment its count; otherwise initialize it to 1
                           if (accumulator.containsKey(value)) {
                               accumulator.put(value, accumulator.get(value) + 1);
                           } else {
                               accumulator.put(value, 1L);
                           }
                           return accumulator;
                       }
    
                       @Override
                       public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
                           ArrayList<Tuple2<String, Long>> list = new ArrayList<>();
                           for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                               list.add(Tuple2.of(entry.getKey(), entry.getValue()));
                           }
                           list.sort(new Comparator<Tuple2<String, Long>>() {
                               @Override
                               public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                   return Long.compare(o2.f1, o1.f1); // descending; subtracting longs and casting to int can overflow
                               }
                           });
                           return list;
                       }
    
                       @Override
                       public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
                           return null; // merge() is only called for session windows; unused here
                       }
                   }, new ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>() {
                       @Override
                       public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
                           long start = context.window().getStart();
                           long end = context.window().getEnd();
                           StringBuilder builder = new StringBuilder();
                           ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
                           for (int i = 0; i < Math.min(2, list.size()); i++) { // top 2; guard against windows with fewer than 2 urls
                                  builder.append("N").append(i + 1).append(" url ").append(list.get(i).f0).append("  visits ").append(list.get(i).f1).append("\n");
                           }
                           out.collect("window start " + new Timestamp(start) + "  window end " + new Timestamp(end) + "\n" + builder);
                       }
                   }).print();
    
                 env.execute();
    
        }
    }
    

    Be clear about when each overridden method runs: the AggregateFunction's add() executes once per incoming record, while getResult() and the ProcessAllWindowFunction's process() execute once per window firing.

    2. Multi-Stream Transformations

    Side outputs

    Flink cannot infer the generic type of an OutputTag, so it has to be created as an anonymous inner class — note the trailing {} in the code below — to preserve the type information.
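    A minimal illustration of the workaround (the tag name "side" is arbitrary):

    // the trailing {} creates an anonymous subclass, which preserves the generic
    // type at runtime; without it the Tuple2<String, String> parameter is erased
    // and Flink cannot determine the OutputTag's TypeInformation
    OutputTag<Tuple2<String, String>> tag = new OutputTag<Tuple2<String, String>>("side") {};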

    
    public class outTagDemo {
    
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
            OutputTag<Tuple2<String, String>> tag_小明 = new OutputTag<>("1"){};
            OutputTag<Tuple2<String, String>> tag_小红 = new OutputTag<>("2"){};
            DataStreamSource<Event> source = env.addSource(new Source());
            SingleOutputStreamOperator<Event> operator = source.process(new ProcessFunction<Event, Event>() {
                @Override
                public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                             if (value.user.equals("小明")){
                                   ctx.output(tag_小明,Tuple2.of(value.user,value.url));
                             }
                             else if(value.user.equals("小红")){
                                 ctx.output(tag_小红,Tuple2.of(value.user,value.url));
    
                    }
                             else {
                                   out.collect(value);
                             }
                }
            });
    
            operator.getSideOutput(tag_小明).print();
            operator.getSideOutput(tag_小红).print();
            operator.print();
    
    
            env.execute();
        }
    }
    
    
    Union
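    No example was given for union, so here is a minimal sketch, reusing the Source from the examples above. union can only merge streams of the same type, and the merged stream's watermark is the minimum of the inputs' watermarks:

    DataStream<Event> stream1 = env.addSource(new Source());
    DataStream<Event> stream2 = env.addSource(new Source());
    // every element of both inputs flows into the result, types unchanged
    stream1.union(stream2).print();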
    Connect
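    connect, by contrast, accepts two streams of different types; the result is a ConnectedStreams processed by a single function with one method per input. A minimal sketch (the control stream and its values are made up for illustration):

    DataStream<Event> events = env.addSource(new Source());
    DataStream<String> controls = env.fromElements("start", "stop");
    events.connect(controls)
          .map(new CoMapFunction<Event, String, String>() {
              @Override
              public String map1(Event value) {  // invoked for elements of the first stream
                  return "event: " + value.user;
              }

              @Override
              public String map2(String value) { // invoked for elements of the second stream
                  return "control: " + value;
              }
          }).print();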
    Window join of two streams
    
    public class windowJoinDemo1 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                    Tuple2.of("1", 1000L),
                    Tuple2.of("2", 1001L),
                    Tuple2.of("1", 2000L),
                    Tuple2.of("2", 2001L),
                    Tuple2.of("1", 5001L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
            SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                    Tuple2.of("1", 4000L),
                    Tuple2.of("2", 4001L),
                    Tuple2.of("1", 5000L),
                    Tuple2.of("2", 6001L),
                    Tuple2.of("1", 7001L),
                    Tuple2.of("1", 4001L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
    
            source1.join(source2)
                    .where(data -> data.f0)
                    .equalTo(data -> data.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                        @Override
                        public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                            return first + "  --->  " + second;
                        }
                    }).print();
    
    
            env.execute();
        }
    
    }
    
    Interval join of two streams
    
    public class windowJoinDemo2 {
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                    Tuple2.of("1", 1000L),
                    Tuple2.of("2", 1001L),
                    Tuple2.of("1", 2000L),
                    Tuple2.of("2", 2001L),
                    Tuple2.of("1", 5001L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
            SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                    Tuple2.of("1", 4000L),
                    Tuple2.of("2", 4001L),
                    Tuple2.of("1", 5000L),
                    Tuple2.of("2", 6001L),
                    Tuple2.of("1", 7001L),
                    Tuple2.of("1", 4001L),
                    Tuple2.of("1", 8001L),
                    Tuple2.of("1", 9001L)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
    
            source2.keyBy(data -> data.f0)
                   .intervalJoin(source1.keyBy(data -> data.f0))
                   .between(Time.seconds(-2),Time.seconds(2))
                   .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                               @Override
                               public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                                   out.collect(left + "-->" + right + "  " + new Timestamp(ctx.getTimestamp()) + "");
                               }
                           }).print();
    
    
    
    
            env.execute();
        }
    }
    
    
    CoGroup of two streams
            source1.coGroup(source2)
                    .where(data -> data.f0)
                    .equalTo(data -> data.f0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                    .apply(new RichCoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                        @Override
                        public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
                                out.collect(first + "-->" + second );
                        }
                    }).print();
    
    [(1,1000), (1,2000)]-->[(1,4000), (1,4001)]
    [(2,1001), (2,2001)]-->[(2,4001)]
    [(1,5001)]-->[(1,5000), (1,7001)]
    []-->[(2,6001)]
    

    3. State Programming

    State in Flink
    1. Access scope of state
    2. Fault tolerance
    3. Horizontal scalability of distributed applications

    For background on tasks in Flink, see: flink taskmanager&slots&并行度&任务链&task分配详解 (a detailed write-up on TaskManagers, slots, parallelism, operator chains, and task assignment)

    Keyed state
    ValueState: reporting PV once every 10 s
    
    public class stateDemo1 {
    
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
            SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
    
    
            operator.keyBy(data -> data.user)
                  .process(new KeyedProcessFunction<String, Event, String>() {
                      //two pieces of state: 1. the running count  2. the pending timer's timestamp
                      ValueState<Long> valueState ;
                      ValueState<Long> timeState;
                      @Override
                      public void open(Configuration parameters) throws Exception {
                          valueState  = getRuntimeContext().getState(new ValueStateDescriptor<Long>("valueState", Long.class));
                          timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeState", Long.class));
                      }
    
                      @Override
                      public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                          //increment the running count for this key
                          valueState.update(valueState.value() == null ? 1 : valueState.value() + 1);
                          //register a timer only if none is pending for this key: the goal is one output every 10 s
                          if (timeState.value() == null){
                                 ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                                 timeState.update(ctx.timestamp() + 10 * 1000L);

                          }
                            out.collect(ctx.getCurrentKey() + " PV " + valueState.value() + " watermark " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp()));
    
                      }
    
                      @Override
                      public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                              out.collect("onTimer" + ctx.getCurrentKey() + "PV  " + valueState.value() + "水位线 " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp()));
                               //清空定时器
                               timeState.clear();
                               // valueState.clear();  这里清除PV的化就相当于滚动窗口了
                               // 直到现在下一个key的定时器的产生,依赖于下一个key数据的到达,这就有点类似于会话窗口了,不符合题意,所以需要我们手动注册一下定时器
                               ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
                               timeState.update(timestamp + 10 * 1000L);
                      }
                  })
                  .print();
    
    
            env.execute();
        }
    }
    
    

    One thing I never fully worked out here is the relationship between event time and the watermark... Print the event timestamps and rerun with Duration.ofSeconds(2) as the out-of-orderness bound to experiment. (Part of the answer: the watermark observed inside processElement always trails the current record's timestamp, because watermarks are generated periodically — every 200 ms by default — from the records seen earlier, minus the bound plus 1 ms.)

    A ListState example
    
    public class stateDemo2 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
    
    
            SingleOutputStreamOperator<Tuple2<String, Long>> operator1 = env.fromElements(
                    Tuple2.of("a", 100L),
                    Tuple2.of("a", 100L),
                    Tuple2.of("b", 200L)
    
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
    
    
            SingleOutputStreamOperator<Tuple2<String, Long>> operator2 = env.fromElements(
                    Tuple2.of("a", 1000L),
                    Tuple2.of("b", 2000L),
                    Tuple2.of("c", 2000L)
    
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                    return element.f1;
                }
            }));
    
    
            operator1.keyBy(data -> data.f0)
                    .connect(operator2.keyBy(data -> data.f0))
                    .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                        ListState<Tuple2<String, Long>> left;
                        ListState<Tuple2<String, Long>> right;
    
                        @Override
                        public void open(Configuration parameters) throws Exception {
                            left = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("left", Types.TUPLE(Types.STRING, Types.LONG)));
                            right = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("right", Types.TUPLE(Types.STRING, Types.LONG)));
    
                        }
    
                        @Override
                        public void processElement1(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
    
    
                            // match this left-stream record against everything buffered from the right stream
                            for (Tuple2<String, Long> rightRecord : right.get()) {
                                out.collect(value + " " + rightRecord);
                                System.out.println("----");

                            }
                            System.out.println("11111");
                            left.add(Tuple2.of(value.f0, value.f1));
    
                        }
    
                        @Override
                        public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                            // match this right-stream record against everything buffered from the left stream
                            for (Tuple2<String, Long> leftRecord : left.get()) {
                                out.collect(value + " " + leftRecord);
                            }
                            System.out.println("22222");
                            right.add(Tuple2.of(value.f0, value.f1));
                        }
                    }).print();
    
    
            env.execute();
        }
    }
    
    

    I tested this, and why the execution result comes out like this is quite puzzling... (one factor: the two fromElements sources run as separate source tasks, so the interleaving of the two streams at the CoProcessFunction is not deterministic)

    11111
    (a,1000) (a,100)
    22222
    (a,100) (a,1000)
    ----
    11111
    22222
    (b,200) (b,2000)
    ----
    11111
    22222
    

    Worth reading: Flink进阶(三):双流connect的用法 (on using connect with two streams)

                    Tuple2.of("a", 1000L),
                    Tuple2.of("a", 3000L),
                    Tuple2.of("b", 2000L),
                    Tuple2.of("c", 2000L)
    I changed operator2's input to the elements above and retested; the result is indeed wrong: the 3000L record matched the 100L records a second time, because nothing is ever cleared from the list state....
    
    Implementing a window by hand with MapState
    
    public class stateDemo3 {
    
        public static void main(String[] args) throws Exception {
            Configuration conf= new Configuration();
            conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        //set a custom port for the local web UI
            conf.setInteger(RestOptions.PORT, 8050);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
            env.setParallelism(1);
            SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp;
                }
            }));
            operator.print();
    
        //count PV per url
            operator.keyBy(data -> data.url)
                            .process(new keyProcess(10 * 1000L))
                            .print();
    
            env.execute();
        }
    }
    
    class  keyProcess extends KeyedProcessFunction<String,Event,String>{
           private  Long  tumbleTime;
           MapState<Long, Long> mapState;
    
    
        public keyProcess(Long tumbleTime) {
            this.tumbleTime = tumbleTime;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Long.class, Long.class));
        }
    
        @Override
        public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
            long startTime = value.timestamp / tumbleTime * tumbleTime;
            long endTime  = startTime + tumbleTime ;
            if (mapState.contains(startTime)){
                   mapState.put(startTime,mapState.get(startTime) + 1);
            }
            else {
                  mapState.put(startTime,1L);
            }
        //will registering the same timer again create duplicates? No: Flink keeps at most
        //one timer per key and timestamp, so re-registering the same timestamp is a no-op
        ctx.timerService().registerEventTimeTimer(endTime - 1L);
    
        }
        
        @Override
        public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
            long endTime = timestamp + 1L;
            long startTime = endTime - tumbleTime;
            out.collect("窗口" + new Timestamp(startTime) + "--" + new Timestamp(endTime) + " " + ctx.getCurrentKey() + "  PV:   " + mapState.get(startTime));
            mapState.remove(startTime);
         }
    }
    
    

    Why the +1 / -1? Windows are left-closed and right-open, [startTime, endTime), so a window should fire once every record with timestamp < endTime has been seen. Registering the timer at endTime - 1 achieves exactly that, since an event-time timer fires when the watermark reaches its timestamp; onTimer then adds the 1 back to recover the window end.

    onTimer seems to emit several lines at once — but we already keyBy'd, so isn't there one onTimer per key? Yes: timers are scoped per key and timestamp, and each key fires its own onTimer call; when the watermark jumps past a window end, every key with a pending timer fires back-to-back, which is why the output arrives in bursts. The cost is modest, since timers are kept in an internal priority queue.

    State TTL (time-to-live)
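    No example is given for TTL, so here is a minimal configuration sketch (the one-hour lifetime and descriptor name are illustrative; this Time is org.apache.flink.api.common.time.Time, not the window Time used earlier):

    // state entries expire one hour after they were last written
    StateTtlConfig ttlConfig = StateTtlConfig
            .newBuilder(Time.hours(1))
            .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)             // reset the clock on create and write
            .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never return an expired value
            .build();

    ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("valueState", Long.class);
    descriptor.enableTimeToLive(ttlConfig);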
    Operator state

    Operator state also comes in several structural flavors, mainly three: ListState, UnionListState, and BroadcastState; a sketch of the list-state flavor follows.
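    A sketch of the standard list-state pattern — a buffering sink that implements CheckpointedFunction (reusing the Event type from the examples above; names are illustrative):

    public class BufferingSink implements SinkFunction<Event>, CheckpointedFunction {

        private final List<Event> buffer = new ArrayList<>();
        private transient ListState<Event> checkpointedState;

        @Override
        public void invoke(Event value, Context context) {
            buffer.add(value); // buffered in memory between checkpoints
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // copy the current buffer into the operator list state
            checkpointedState.clear();
            for (Event e : buffer) {
                checkpointedState.add(e);
            }
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            checkpointedState = context.getOperatorStateStore()
                    .getListState(new ListStateDescriptor<>("buffered-events", Event.class));
            // on restore, getListState redistributes the entries round-robin across
            // subtasks; getUnionListState would instead hand every subtask the full list
            if (context.isRestored()) {
                for (Event e : checkpointedState.get()) {
                    buffer.add(e);
                }
            }
        }
    }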

    Broadcast state
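    A minimal sketch, assuming a DataStream<String> ruleStream carrying configuration updates next to the Event stream from earlier:

    // the descriptor identifies the broadcast state on both sides
    MapStateDescriptor<String, String> ruleDescriptor =
            new MapStateDescriptor<>("rules", String.class, String.class);
    BroadcastStream<String> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);

    eventStream.connect(ruleBroadcast)
            .process(new BroadcastProcessFunction<Event, String, String>() {
                @Override
                public void processElement(Event value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    // the data side gets read-only access to the broadcast state
                    String rule = ctx.getBroadcastState(ruleDescriptor).get("active");
                    out.collect(value.user + " under rule " + rule);
                }

                @Override
                public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                    // only the broadcast side may update it, keeping all subtasks identical
                    ctx.getBroadcastState(ruleDescriptor).put("active", rule);
                }
            }).print();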
    State persistence

    Checkpoints
    State backends
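    A minimal configuration sketch for these two topics (the interval and path are illustrative):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // snapshot all state every 60 s with exactly-once guarantees
    env.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
    // the state backend decides where working state and its snapshots live
    env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));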

    4. Fault Tolerance

    Checkpoints (checkpoint)
    1. Saving checkpoints
    2. Restoring state from a checkpoint
    3. The checkpointing algorithm
    4. Checkpoint configuration
    5. Savepoints (savepoint)
    State consistency

    What is state consistency?

    1. For the stream processor itself, state consistency simply means the computed results are guaranteed to be correct
    2. No record should be lost, and none should be computed twice
    3. After recovering from a failure and recomputing, the results should still be completely correct

    Levels of state consistency

    1. At-most-once: when a task fails, the simplest approach is to do nothing — neither restore the lost state nor replay the lost data. at-most-once semantics means each event is processed at most once
    2. At-least-once: in most real scenarios we do not want to lose events. at-least-once means every event is processed, though some may be processed multiple times
    3. Exactly-once: the strictest guarantee and the hardest to achieve. It means not only that no event is lost, but also that for each record the internal state is updated exactly once (logically)
    End-to-end exactly-once

    Idempotent writes (idempotent writes)

    An idempotent operation is one that can be executed many times but changes the result only once; executing it again after that has no further effect. A small illustration follows.
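    A plain-Java illustration (the store and key are hypothetical):

    // an upsert keyed on a deterministic id is idempotent: replaying the
    // same record just overwrites the entry with the same content
    Map<String, Long> store = new HashMap<>();
    store.put("user_1_pv", 42L); // first write
    store.put("user_1_pv", 42L); // replayed write: same end state

    // an append-style write is NOT idempotent: a replay duplicates the record
    List<Long> log = new ArrayList<>();
    log.add(42L);
    log.add(42L);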



    Consider this scenario: the stream being consumed is
    10,20,30,40,50
    A checkpoint was taken at record 10, and the job failed while consuming record 30, so it restores from the latest checkpoint. The downstream therefore observes 10,20,30,10,20,30,...: the data is eventually consistent, but the intermediate values the user sees are wrong... which is why we need transactional writes

    Transactional writes (transactional writes)

    1. Transaction (transaction)
      A tightly coupled series of operations in an application; all of them must complete successfully, otherwise every change made within the transaction is undone
      Atomicity: the operations in a transaction either all succeed or none take effect
    2. Idea
      Tie each transaction to a checkpoint, and only write the corresponding results to the sink system once that checkpoint has actually completed
    3. Implementations
      Write-ahead log
      Two-phase commit

    Write-ahead log (write-ahead-log)
    Buffer the result data as state first, then write it to the sink system in one batch when notified that the checkpoint has completed
    Simple and easy to implement: since the data is cached in the state backend beforehand, this works with any sink system
    In most scenarios it provides only at-least-once semantics

    Two-phase commit (two-phase-commit)
    See also: 第九章:一致性与共识 (Chapter 9: Consistency and Consensus)
    For every checkpoint, the sink task starts a transaction and adds all subsequently received data to it
    The data is written to the sink system but not committed — at this point it is only pre-committed (can it be queried here? it must not yet be consumable by downstream readers)
    Only when notified that the checkpoint has completed does the sink formally commit the transaction, making the writes actually take effect
    This approach truly achieves exactly-once, and it requires an external sink system that supports transactions
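    Flink ships this pattern as TwoPhaseCommitSinkFunction, the base class FlinkKafkaProducer builds on. A skeleton sketch — MyTransaction and its write/flush/publish/discard methods are hypothetical placeholders for whatever handle the external system provides:

    public class TransactionalSink extends TwoPhaseCommitSinkFunction<String, MyTransaction, Void> {

        public TransactionalSink(TypeSerializer<MyTransaction> txnSerializer,
                                 TypeSerializer<Void> contextSerializer) {
            super(txnSerializer, contextSerializer);
        }

        @Override
        protected MyTransaction beginTransaction() {
            return new MyTransaction(); // one transaction per checkpoint interval
        }

        @Override
        protected void invoke(MyTransaction txn, String value, Context context) {
            txn.write(value); // records go into the open transaction, not the sink itself
        }

        @Override
        protected void preCommit(MyTransaction txn) {
            txn.flush(); // phase 1, on checkpoint: flush and stop writing to this transaction
        }

        @Override
        protected void commit(MyTransaction txn) {
            txn.publish(); // phase 2, after the checkpoint completed: make the data visible
        }

        @Override
        protected void abort(MyTransaction txn) {
            txn.discard(); // failure path: roll the transaction back
        }
    }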

    Exactly-once guarantees when connecting Flink and Kafka

    public class KafkaToKafka {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        //enable checkpointing every 30 s in exactly-once mode
            env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
            env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));
    
        //Kafka consumer properties
        Properties properties = new Properties(); //Kafka broker addresses
        properties.setProperty("bootstrap.servers", "linux03:9092,linux04:9092,linux05:9092");
        //offset reset strategy: start from the earliest offset when none is committed, otherwise resume from the committed one
        properties.setProperty("auto.offset.reset", "earliest");
        //consumer group id
        properties.setProperty("group.id", "g1");
        //only relevant when checkpointing is disabled: whether the consumer auto-commits offsets periodically (off here)
        properties.setProperty("enable.auto.commit", "false");
        //create the FlinkKafkaConsumer, then add it with addSource below
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka2021", //topic to read from
                new SimpleStringSchema(), //deserialization schema for the records
                properties //Kafka consumer properties
        );
        //on checkpoints, do not write offsets back to Kafka's internal offsets topic
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
    
            DataStreamSource<String> lines = env.addSource(kafkaConsumer);
    
            SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));
    
    
        //topic to write to
        String topic = "out2021";
        //producer properties: the transaction timeout must not exceed the broker's transaction.max.timeout.ms (15 min by default)
        properties.setProperty("transaction.timeout.ms",1000 * 60 * 5 + "");
    
        //create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic, //target topic
                new KafkaStringSerializationSchema(topic), //serialization schema for writing to Kafka
                properties, //Kafka producer properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE //EXACTLY_ONCE semantics: transactional writes tied to checkpoints
        );
    
            filtered.addSink(kafkaProducer);
    
            env.execute();
    
        }
    }
    
    /**
     * A custom Kafka serialization schema for String records
     */
    public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    
        private String topic;
        private String charset;
        //the constructors take the target topic and the charset, defaulting to UTF-8
        public KafkaStringSerializationSchema(String topic) {
            this(topic, "UTF-8");
        }
        public KafkaStringSerializationSchema(String topic, String charset) {
            this.topic = topic;
            this.charset = charset;
        }
        //called once per record to serialize it
        @Override
        public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
            //encode the string using the configured charset
            byte[] bytes = element.getBytes(Charset.forName(charset));
            //wrap the bytes in a ProducerRecord for the target topic
            return new ProducerRecord<>(topic, bytes);
        }
    }
    
