2022-05-28-Flink-47(六)

Author: 冰菓_ | Published 2022-05-28 23:46

1. Process Functions

Test case

The first watermark printed is -9223372036854775808, i.e. Long.MIN_VALUE: no watermark has been emitted yet when the first element is processed.
After that, the watermark always trails the largest event timestamp seen so far by 1 ms (a sketch of why follows).
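
Why: below is a paraphrase of what Flink's BoundedOutOfOrdernessWatermarks does (a sketch from my reading of the source, not the verbatim implementation; field names are mine):

public class BoundedOutOfOrdernessSketch {

    private final long outOfOrdernessMillis;
    private long maxTimestamp;

    public BoundedOutOfOrdernessSketch(long outOfOrdernessMillis) {
        this.outOfOrdernessMillis = outOfOrdernessMillis;
        // start so low that the very first emitted watermark is exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    // called for every element
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // called every autoWatermarkInterval (200 ms by default)
    public long onPeriodicEmit() {
        // with Duration.ZERO this is maxTimestamp - 1: the watermark always lags by 1 ms
        return maxTimestamp - outOfOrdernessMillis - 1;
    }
}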


public class processDemo1 {
    public static void main(String[] args) throws Exception {


        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);


        // randomly generated event data
        DataStreamSource<Event> addSource = env.addSource(new Source());
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

        outputStreamOperator.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                long watermark = ctx.timerService().currentWatermark();
                long processingTime = ctx.timerService().currentProcessingTime();
                Long timestamp = ctx.timestamp();
                int thisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(processingTime);
                System.out.println("--------");
                System.out.println(timestamp);

                out.collect(value.user + "  " + value.timestamp + "   " + watermark + "  " + thisSubtask);


            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
            }
        }).print();

        env.execute();




    }
}
A simple timer application: processing-time timers

public class processProcessingTime {

    public static void main(String[] args) throws Exception{
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);

        DataStreamSource<Event> addSource = env.addSource(new Source());


        // timer firing vs. the element's processing time
        addSource.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                // get the current processing time
                long processingTime = ctx.timerService().currentProcessingTime();
                out.collect(ctx.getCurrentKey() + "事件处理时间为" + new Timestamp(processingTime));
                // register a timer 10 seconds from now
                ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                       out.collect(ctx.getCurrentKey() + "定时器触发事件" + new Timestamp(timestamp));
            }
        }).print();

        env.execute();
    }
}

A simple timer application: event-time timers

public class processEventTime {


    public static void main(String[] args) throws Exception{
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));



        operator.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                Long timestamp = ctx.timestamp();
                System.out.println(new Timestamp(value.timestamp));
                out.collect(ctx.getCurrentKey() + "时间戳" + new Timestamp(timestamp) + "水位线" + ctx.timerService().currentWatermark());
                // register an event-time timer 10 s after this element's timestamp
                ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + "定时器触发事件" + new Timestamp(timestamp) + "水位线" + ctx.timerService().currentWatermark());
            }
        }).print();

        env.execute();
    }
}
Sample output:

2022-05-23 23:54:50.305
大时间戳2022-05-23 23:54:50.305水位线-9223372036854775808
2022-05-23 23:54:51.309
大时间戳2022-05-23 23:54:51.309水位线1653321290304
2022-05-23 23:54:52.309
中时间戳2022-05-23 23:54:52.309水位线1653321291308
2022-05-23 23:54:53.309
中时间戳2022-05-23 23:54:53.309水位线1653321292308
2022-05-23 23:54:54.31
小时间戳2022-05-23 23:54:54.31水位线1653321293308
2022-05-23 23:54:55.31
中时间戳2022-05-23 23:54:55.31水位线1653321294309
2022-05-23 23:54:56.31
中时间戳2022-05-23 23:54:56.31水位线1653321295309
2022-05-23 23:54:57.311
小时间戳2022-05-23 23:54:57.311水位线1653321296309
2022-05-23 23:54:58.311
小时间戳2022-05-23 23:54:58.311水位线1653321297310
2022-05-23 23:54:59.311
中时间戳2022-05-23 23:54:59.311水位线1653321298310
2022-05-23 23:55:00.312
中时间戳2022-05-23 23:55:00.305水位线1653321299310
大定时器触发事件2022-05-23 23:55:00.305水位线1653321300311

An event-time timer fires only once its timestamp is less than or equal to the current watermark (a toy simulation of the rule follows).
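
A toy simulation of that rule (mine, not Flink internals; Flink keeps per-key timers in a similar priority-queue fashion):

import java.util.PriorityQueue;

public class TimerRuleSketch {
    public static void main(String[] args) {
        PriorityQueue<Long> timers = new PriorityQueue<>();
        timers.add(10_000L);
        timers.add(25_000L);

        long[] watermarks = {4_999L, 9_999L, 10_000L, 30_000L};
        for (long w : watermarks) {
            // fire every timer whose timestamp <= the new watermark, in timestamp order
            while (!timers.isEmpty() && timers.peek() <= w) {
                System.out.println("watermark " + w + " fires timer " + timers.poll());
            }
        }
        // prints: watermark 10000 fires timer 10000
        //         watermark 30000 fires timer 25000
    }
}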

ProcessWindowFunction
Implementing Top-N with windowAll

public class processTop_N {


    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));

       operator.map(data -> data.url)
               // sliding processing-time window: 10 s size, 5 s slide
               .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
               .aggregate(new AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>>() {
                   @Override
                   public HashMap<String, Long> createAccumulator() {
                       return new HashMap<String, Long>();
                   }

                   @Override
                   public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
                       // for each element, bump its count in the map, initializing absent keys to 1
                       if (accumulator.containsKey(value)) {
                           accumulator.put(value, accumulator.get(value) + 1);
                       } else {
                           accumulator.put(value, 1L);
                       }
                       return accumulator;
                   }

                   @Override
                   public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
                       ArrayList<Tuple2<String, Long>> list = new ArrayList<>();
                       for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                           list.add(Tuple2.of(entry.getKey(), entry.getValue()));
                       }
                       list.sort(new Comparator<Tuple2<String, Long>>() {
                           @Override
                           public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                               // descending by count; Long.compare avoids the int-cast overflow of o2.f1 - o1.f1
                               return Long.compare(o2.f1, o1.f1);
                           }
                       });
                       return list;
                   }

                   @Override
                   public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
                       // merge is only invoked for session windows, which are not used here
                       return null;
                   }
               }, new ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>() {
                   @Override
                   public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
                       long start = context.window().getStart();
                       long end = context.window().getEnd();
                       StringBuilder builder = new StringBuilder();
                       ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
                       // guard against windows that saw fewer than 2 distinct urls
                       for (int i = 0; i < Math.min(2, list.size()); i++) {
                              builder.append("N").append(i + 1).append("事件类型").append(list.get(i).f0).append("  访问次数 ").append(list.get(i).f1).append("\n");
                       }
                       out.collect("窗口开始时间" + new Timestamp(start) + "  窗口结束时间" + new Timestamp(end) + "\n" + builder) ;
                   }
               }).print();

             env.execute();

    }
}

Be clear about when each overridden method runs: add() executes once per incoming element, while getResult() and the ProcessAllWindowFunction's process() execute once per window firing.

2. Multi-Stream Transformations

Side outputs

Flink cannot infer the element type carried by an OutputTag, so the tag must be created as an anonymous subclass, as sketched below.
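
A minimal sketch of the pitfall (class and tag names are mine):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.OutputTag;

public class OutputTagSketch {

    // Generic types are erased at runtime, so a plain `new OutputTag<>("1")` cannot carry
    // the Tuple2 element type and Flink rejects it. The trailing {} declares an anonymous
    // subclass, which pins the type parameter in its superclass signature:
    static final OutputTag<Tuple2<String, String>> TAG =
            new OutputTag<Tuple2<String, String>>("1") {};

    public static void main(String[] args) {
        System.out.println(TAG.getTypeInfo()); // Java Tuple2<String, String>
    }
}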


public class outTagDemo {


    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        // spell out the type argument: diamond syntax with an anonymous class needs Java 9+
        OutputTag<Tuple2<String, String>> tag_小明 = new OutputTag<Tuple2<String, String>>("1"){};
        OutputTag<Tuple2<String, String>> tag_小红 = new OutputTag<Tuple2<String, String>>("2"){};
        DataStreamSource<Event> source = env.addSource(new Source());
        SingleOutputStreamOperator<Event> operator = source.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("小明")) {
                    ctx.output(tag_小明, Tuple2.of(value.user, value.url));
                } else if (value.user.equals("小红")) {
                    ctx.output(tag_小红, Tuple2.of(value.user, value.url));
                } else {
                    out.collect(value);
                }
            }
        });

        operator.getSideOutput(tag_小明).print();
        operator.getSideOutput(tag_小红).print();
        operator.print();


        env.execute();
    }
}

Union (union)
Connect (connect)

union merges any number of streams of the same element type into one stream; connect pairs exactly two streams whose element types may differ and processes each side with its own callback. A sketch follows.
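
A minimal, self-contained comparison (stream contents are mine):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class UnionConnectSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> s1 = env.fromElements("a", "b");
        DataStream<String> s2 = env.fromElements("c");
        DataStream<Long> s3 = env.fromElements(1L, 2L);

        // union: any number of streams, all of the SAME element type
        s1.union(s2).print("union");

        // connect: exactly two streams, element types may DIFFER; one callback per side
        s1.connect(s3).map(new CoMapFunction<String, Long, String>() {
            @Override
            public String map1(String value) { return "left:" + value; }

            @Override
            public String map2(Long value) { return "right:" + value; }
        }).print("connect");

        env.execute();
    }
}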
Two-stream join: window join

public class windowJoinDemo1 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        source1.join(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                        return first + "  --->  " + second;
                    }
                }).print();


        env.execute();
    }

}
Two-stream join: interval join

For each element e1 of the first keyed stream, the interval join matches every same-keyed element e2 of the second stream whose timestamp satisfies e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound, here a window of [-2 s, +2 s] around e1 (in the code below, source2 is the "first" stream).

public class windowJoinDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L),
                Tuple2.of("1", 8001L),
                Tuple2.of("1", 9001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));

        source2.keyBy(data -> data.f0)
               .intervalJoin(source1.keyBy(data -> data.f0))
               .between(Time.seconds(-2),Time.seconds(2))
               .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                           @Override
                           public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                               out.collect(left + "-->" + right + "  " + new Timestamp(ctx.getTimestamp()) + "");
                           }
                       }).print();




        env.execute();
    }
}

Two-stream join: co-group

Unlike join, the coGroup function is called once per key per window and receives both sides' full buffers, even when one side is empty, so it can also express outer joins; note the []-->[(2,6001)] line in the output below.

        source1.coGroup(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new RichCoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
                            out.collect(first + "-->" + second );
                    }
                }).print();
Output:

[(1,1000), (1,2000)]-->[(1,4000), (1,4001)]
[(2,1001), (2,2001)]-->[(2,4001)]
[(1,5001)]-->[(1,5000), (1,7001)]
[]-->[(2,6001)]

3. State Programming

State in Flink
  1. Access scope of state
  2. Fault tolerance
  3. Horizontal scalability of distributed applications

Background reading on tasks in Flink: 《flink taskmanager&slots&并行度&任务链&task分配详解》

Keyed (per-key partitioned) state
ValueState: emit the PV count once every 10 seconds

public class stateDemo1 {


    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));


        operator.keyBy(data -> data.user)
              .process(new KeyedProcessFunction<String, Event, String>() {
                  // two pieces of state: 1. the running count  2. the pending timer's timestamp
                  ValueState<Long> valueState ;
                  ValueState<Long> timeState;
                  @Override
                  public void open(Configuration parameters) throws Exception {
                      valueState  = getRuntimeContext().getState(new ValueStateDescriptor<Long>("valueState", Long.class));
                      timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeState", Long.class));
                  }

                  @Override
                  public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                      // update the count for this key
                      valueState.update(valueState.value() == null ? 1 : valueState.value() + 1);
                      // register a timer only if none is pending for this key: we want one output per 10 s
                      if (timeState.value() == null){
                             ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                             timeState.update(ctx.timestamp() + 10 * 1000L);

                      }
                        out.collect(ctx.getCurrentKey() + "PV  " + valueState.value() + "水位线 " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp())) ;

                  }

                  @Override
                  public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                          out.collect("onTimer" + ctx.getCurrentKey() + "PV  " + valueState.value() + "水位线 " + new Timestamp(ctx.timerService().currentWatermark()) + "  " + new Timestamp(ctx.timestamp()));
                           // clear the timer bookkeeping
                           timeState.clear();
                           // valueState.clear();  clearing the PV here would turn this into a tumbling-window count
                           // without re-registering, the next timer for this key would only be created when its next
                           // element arrives, which behaves more like a session window; so register the next timer manually
                           ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
                           timeState.update(timestamp + 10 * 1000L);
                  }
              })
              .print();


        env.execute();
    }
}

One thing I never fully sorted out here: the interplay of event time and the watermark... Print the event timestamps and rerun the test with Duration.ofSeconds(2); a sketch of that change follows.
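
The suggested experiment, as a drop-in replacement for the source line of stateDemo1 (assuming the same clickSource/Event classes): with a 2-second bound the watermark becomes maxTimestamp - 2000 - 1, so every printed watermark should trail the largest event timestamp seen so far by 2001 ms, and the event-time timers fire about 2 s later than before.

SingleOutputStreamOperator<Event> operator = env
        .addSource(new clickSource())
        .assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element.timestamp));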

ListState example

public class stateDemo2 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);


        SingleOutputStreamOperator<Tuple2<String, Long>> operator1 = env.fromElements(
                Tuple2.of("a", 100L),
                Tuple2.of("a", 100L),
                Tuple2.of("b", 200L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));


        SingleOutputStreamOperator<Tuple2<String, Long>> operator2 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 2000L),
                Tuple2.of("c", 2000L)

        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
            @Override
            public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                return element.f1;
            }
        }));


        operator1.keyBy(data -> data.f0)
                .connect(operator2.keyBy(data -> data.f0))
                .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    ListState<Tuple2<String, Long>> left;
                    ListState<Tuple2<String, Long>> right;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        left = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("left", Types.TUPLE(Types.STRING, Types.LONG)));
                        right = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("right", Types.TUPLE(Types.STRING, Types.LONG)));

                    }

                    @Override
                    public void processElement1(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        // match against everything buffered so far from the other (right) stream;
                        // the loop variable is renamed: the original shadowed the `left` state field
                        for (Tuple2<String, Long> r : right.get()) {
                            out.collect(value + " " + r);
                            System.out.println("----");
                        }
                        System.out.println("11111");
                        // then buffer this element for future matches
                        left.add(Tuple2.of(value.f0, value.f1));
                    }

                    @Override
                    public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        // match against everything buffered so far from the other (left) stream
                        for (Tuple2<String, Long> l : left.get()) {
                            out.collect(value + " " + l);
                        }
                        System.out.println("22222");
                        right.add(Tuple2.of(value.f0, value.f1));
                    }
                }).print();


        env.execute();
    }
}

Running it, the output below looks odd at first. My reading: the two fromElements sources run as separate tasks and are consumed in a nondeterministic interleaving, and within each callback the matched pairs are collected before the "----"/"11111"/"22222" markers are printed.

11111
(a,1000) (a,100)
22222
(a,100) (a,1000)
----
11111
22222
(b,200) (b,2000)
----
11111
22222

Worth studying: Flink Advanced (3): how to use two-stream connect (《Flink进阶(三):双流connect的用法》)

                Tuple2.of("a", 1000L),
                Tuple2.of("a", 3000L),
                Tuple2.of("b", 2000L),
                Tuple2.of("c", 2000L)
改了一下,测试案例,确实不太对,3000L的匹配了两次100L,没有进行清除....
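
If the semantics you want are "each buffered element is matched at most once", the buffer can be consumed as it is matched. A sketch of a modified processElement2 (ListState has no single-element remove, so the whole matched buffer is cleared here; processElement1 would be changed symmetrically):

                    @Override
                    public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        for (Tuple2<String, Long> l : left.get()) {
                            out.collect(value + " " + l);
                        }
                        // consume the buffer so a later 3000L cannot re-match the same 100L;
                        // note this also drops legitimate future matches, so the right choice
                        // depends on the join semantics you actually want
                        left.clear();
                        right.add(Tuple2.of(value.f0, value.f1));
                    }
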
Implementing a tumbling window by hand with MapState

public class stateDemo3 {

    public static void main(String[] args) throws Exception {
        Configuration conf= new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
            @Override
            public long extractTimestamp(Event element, long recordTimestamp) {
                return element.timestamp;
            }
        }));
        operator.print();

        // count PV per url
        operator.keyBy(data -> data.url)
                        .process(new keyProcess(10 * 1000L))
                        .print();

        env.execute();
    }
}

class  keyProcess extends KeyedProcessFunction<String,Event,String>{
       private  Long  tumbleTime;
       MapState<Long, Long> mapState;


    public keyProcess(Long tumbleTime) {
        this.tumbleTime = tumbleTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Long.class, Long.class));
    }

    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
        // align this element's timestamp down to its tumbling-window start
        long startTime = value.timestamp / tumbleTime * tumbleTime;
        long endTime = startTime + tumbleTime;
        if (mapState.contains(startTime)){
               mapState.put(startTime,mapState.get(startTime) + 1);
        }
        else {
              mapState.put(startTime,1L);
        }
        // will registering the same timer again create a duplicate? No: timers are
        // deduplicated per key and timestamp, so re-registering endTime - 1 is a no-op
        ctx.timerService().registerEventTimeTimer(endTime - 1L);

    }
    
    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        long endTime = timestamp + 1L;
        long startTime = endTime - tumbleTime;
        out.collect("窗口" + new Timestamp(startTime) + "--" + new Timestamp(endTime) + " " + ctx.getCurrentKey() + "  PV:   " + mapState.get(startTime));
        mapState.remove(startTime);
     }
}

Why the +1/-1? A window [startTime, endTime) owns timestamps up to endTime - 1, so the timer is registered at endTime - 1, the window's maximum timestamp; it fires exactly when the watermark shows that no element belonging to the window can still arrive. onTimer then adds the 1 back to recover endTime. Example: with tumbleTime = 10000 and an element at 23000, startTime = 20000, endTime = 30000, and the timer fires once the watermark reaches 29999.

onTimer looks like it emits several lines at once, but the stream is keyed: every key whose timer was registered at the same timestamp fires its own onTimer call, one per key. Is that right? And with one timer per key per window, is the overhead a concern?

State time-to-live (TTL)
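
A sketch (mine, not from the original) of enabling TTL on the valueState from stateDemo1, placed in open(); note that Time here is org.apache.flink.api.common.time.Time, and that TTL expiry is based on processing time:

StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.seconds(10))                                   // expire entries 10 s after...
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)      // ...creation or last write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
        .build();
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("valueState", Long.class);
descriptor.enableTimeToLive(ttlConfig);
valueState = getRuntimeContext().getState(descriptor);
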
Operator state

Operator state also comes in several structural types, mainly three: ListState, UnionListState, and BroadcastState; a minimal ListState example is sketched below.
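
A minimal operator-state sketch along the lines of the buffering sink from the Flink documentation (the threshold and the printing stand-in are mine): the unflushed buffer is snapshotted as operator ListState via CheckpointedFunction, so it survives failures.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import java.util.ArrayList;
import java.util.List;

public class BufferingSink implements SinkFunction<String>, CheckpointedFunction {

    private final int threshold;
    private transient ListState<String> checkpointedState;
    private final List<String> buffer = new ArrayList<>();

    public BufferingSink(int threshold) {
        this.threshold = threshold;
    }

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
        if (buffer.size() >= threshold) {
            for (String e : buffer) {
                System.out.println("flushed: " + e); // stand-in for a real external write
            }
            buffer.clear();
        }
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        checkpointedState.addAll(buffer); // persist whatever has not been flushed yet
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // getListState = even-split redistribution on rescale;
        // getUnionListState would hand every subtask the full list on restore
        checkpointedState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("buffered-elements", String.class));
        if (context.isRestored()) {
            for (String e : checkpointedState.get()) {
                buffer.add(e);
            }
        }
    }
}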

Broadcast state
State persistence

Checkpoints
State backends

4. Fault Tolerance

Checkpoints (checkpoint)
  1. Saving a checkpoint
  2. Restoring state from a checkpoint
  3. The checkpointing algorithm
  4. Checkpoint configuration (a sketch follows this list)
  5. Savepoints (savepoint)
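
A sketch of common checkpoint and state-backend settings (the values and the path are illustrative; FsStateBackend matches what the Kafka example later in this post uses):

import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // take an exactly-once checkpoint every 10 s
        env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);
        CheckpointConfig cc = env.getCheckpointConfig();
        cc.setMinPauseBetweenCheckpoints(500L); // breathing room between consecutive checkpoints
        cc.setCheckpointTimeout(60_000L);       // abort a checkpoint that takes longer than 1 min
        cc.setMaxConcurrentCheckpoints(1);
        // keep completed checkpoints when the job is cancelled, so it can be restarted from them
        cc.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // working state on the heap, snapshots on the filesystem
        env.setStateBackend(new FsStateBackend("file:///tmp/flink-ckpts"));
    }
}
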
State consistency

What state consistency means

  1. For the stream processor itself, state consistency simply means the computed results are guaranteed to be correct
  2. No record should be lost, and none should be counted twice
  3. After a failure the state can be restored, and recomputation after recovery must also produce fully correct results

Levels of state consistency

  1. At-most-once: when a task fails, the simplest reaction is to do nothing: neither restore the lost state nor replay the lost data. at-most-once semantics means each event is processed at most once
  2. At-least-once: in most real scenarios we do not want to lose events; at-least-once means every event gets processed, though some may be processed more than once
  3. Exactly-once: processing everything exactly once is the strictest guarantee and the hardest to achieve; it means not only that no event is lost, but also that for every record the internal state is updated only once (logically)
End-to-end exactly-once

Idempotent writes (idempotent writes)

An idempotent operation is one that can be executed many times but changes the result only once; executing it again afterwards has no further effect.



One scenario: suppose the consumed data is
10, 20, 30, 40, 50
and a checkpoint is taken after consuming 10, but the job dies while consuming 30. The job restores from the last checkpoint, so the externally visible sequence becomes 10, 20, 30, 10, 20, 30... Even though the data ends up consistent, what a client sees in between is wrong, which is why we need transactional writes.
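
A minimal sketch of the idempotency distinction itself (a HashMap standing in for a key-value sink; the names are mine): replaying a put with the same key and value leaves the final state unchanged, while replaying an increment double-counts.

import java.util.HashMap;
import java.util.Map;

public class IdempotencySketch {
    public static void main(String[] args) {
        Map<String, Long> sink = new HashMap<>();

        // idempotent: replaying "set pv = 30" any number of times still ends at 30
        sink.put("pv", 10L);
        sink.put("pv", 20L);
        sink.put("pv", 30L);
        sink.put("pv", 30L); // replay after recovery: no change
        System.out.println(sink.get("pv")); // 30

        // not idempotent: replaying an increment double-counts
        sink.merge("clicks", 1L, Long::sum);
        sink.merge("clicks", 1L, Long::sum); // replay: clicks becomes 2, not 1
        System.out.println(sink.get("clicks")); // 2
    }
}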

Transactional writes (transactional writes)

  1. Transaction (transaction)
    A tightly coupled series of operations in an application: all of them must complete successfully, otherwise every change made by any of them is undone
    Atomicity: the operations inside a transaction either all succeed or none take effect
  2. Core idea
    Tie a transaction to each checkpoint, and only write the corresponding results to the sink system once that checkpoint has actually completed
  3. Implementations
    Write-ahead log
    Two-phase commit

Write-ahead log (write-ahead-log)
Store the result data as state first, then write it to the sink system in one batch upon notification that the checkpoint has completed
Simple and easy to implement; because the data is buffered in the state backend beforehand, this works with any sink system
In most scenarios it only gives at-least-once semantics (a sketch of why follows)
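
A WAL-style sink sketch (my own illustration of the idea, not Flink's GenericWriteAheadSink): stage results in operator state and only write them out once the checkpoint containing them is confirmed. If the job crashes between the confirmation and the external write completing, the batch is written again after recovery, which is exactly why this is at-least-once.

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class WalSinkSketch extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private transient ListState<String> logState;
    private final List<String> pending = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        pending.add(value); // stage only; nothing is externally visible yet
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        logState.clear();
        logState.addAll(pending); // the "log": staged results travel with the checkpoint
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        logState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("wal", String.class));
        if (context.isRestored()) {
            for (String e : logState.get()) {
                pending.add(e);
            }
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // flush on confirmation; a crash right after this loop but before the external
        // system acknowledges re-writes the same batch on recovery: at-least-once
        for (String e : pending) {
            System.out.println("write: " + e); // stand-in for the real external write
        }
        pending.clear();
    }
}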

Two-phase commit (two-phase-commit)
See also: Chapter 9: Consistency and Consensus
For every checkpoint, the sink task starts a transaction and adds everything it receives from then on to that transaction
It writes this data to the sink system but does not commit it yet; this is only the pre-commit (can the data be read at this point? No, pre-committed data must not be consumable)
Only on notification that the checkpoint has completed does it actually commit the transaction, making the results truly visible
This genuinely achieves exactly-once, and it requires an external sink system that supports transactions. In Flink the pattern is packaged as TwoPhaseCommitSinkFunction, whose subclasses implement beginTransaction / preCommit / commit / abort; the FlinkKafkaProducer below is such a subclass

Exactly-once guarantees when connecting Flink and Kafka

public class KafkaToKafka {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpointing: exactly-once mode, every 30 s
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));

        // Kafka consumer properties
        Properties properties = new Properties();
        // Kafka broker addresses and ports
        properties.setProperty("bootstrap.servers", "linux03:9092,linux04:9092,linux05:9092");
        // offset reset strategy: start from the earliest offset when none is recorded, otherwise resume from the recorded one
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group id
        properties.setProperty("group.id", "g1");
        // only relevant when checkpointing is off: whether the consumer auto-commits offsets periodically; disabled here
        properties.setProperty("enable.auto.commit", "false");
        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka2021", // topic to read
                new SimpleStringSchema(), // deserialization schema
                properties // Kafka properties
        );
        // on checkpoints, do not write offsets back to Kafka's special offsets topic;
        // recovery relies on the offsets stored in Flink's own checkpoints
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);

        DataStreamSource<String> lines = env.addSource(kafkaConsumer);

        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));


        // target Kafka topic
        String topic = "out2021";
        // producer transaction timeout: 5 minutes; it must not exceed the broker's
        // transaction.max.timeout.ms (15 minutes by default)
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");

        // create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic, // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties, // Kafka properties
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE // write to Kafka with EXACTLY_ONCE semantics
        );

        filtered.addSink(kafkaProducer);

        env.execute();

    }
}
/**
 * Custom serialization schema for writing String records to Kafka
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {

    private String topic;
    private String charset;
    // constructors take the target topic and the charset, defaulting to UTF-8
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    // called once per record to serialize it
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // convert the String to bytes using the configured charset
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // wrap it in a ProducerRecord for the target topic
        return new ProducerRecord<>(topic, bytes);
    }
}
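
Two operational notes (mine, not from the original post): the producer's transaction.timeout.ms above must stay at or below the broker's transaction.max.timeout.ms (15 minutes by default), and consumers of out2021 only see exactly-once results if they refuse to read uncommitted data:

// downstream consumers must read committed data only, otherwise they will observe
// pre-committed records before the Flink checkpoint (and hence the transaction) completes
Properties consumerProps = new Properties();
consumerProps.setProperty("isolation.level", "read_committed");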
