Flink 使用之数据分流

作者: AlienPaul | 来源:发表于2022-09-29 16:18 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    背景

    数据分流在Flink中叫做旁路输出(side output)。Flink的工作流程可以理解为是一条流水线。我们编写的程序是流水线上的各个工序。然而,在一些场景中,我们将一部分满足特定条件的数据从主线分出去,形成旁路输出,好比是流水线中发现不合格的产品必须要剔除一样。本篇为大家带来Flink使用旁路输出的方法。

    OutputTag

    Flink可以支持1个或多个side output。不同的side output使用OutputTag区分。OutputTag是side output数据流的标记,和数据流严格对应。

    OutputTag使用如下方式创建:

    public static final OutputTag<X> someTag = new OutputTag<X>("tag-name"){};
    

    其中泛型X为旁路输出流的数据类型,tag-name为tag的名称。

    需要注意的是我们必须使用匿名内部类的方式创建OutputTag。下面这种写法是错误的:

    public static final OutputTag<X> someTag = new OutputTag<X>("tag-name");
    

    这是因为Flink内部使用clazz.getGenericSuperclass()方式获取父类的Type,返回的类型是ParameterizedType,从中可以获取到泛型类型(代码位于TypeExtractor::getParameterType)。如果不使用匿名内部类则无法获取到泛型类型,会引发错误。

    迟到数据分流

    下面这个例子是将迟到的数据单独从旁路输出。代码和分析如下所示:

    public class OutOfOrderDemo {
        // 创建tag
        public static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
            // 示例数据,其中D乱序,I来迟(H到来的时候认为15000ms之前的数据已经到齐)
            SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(
                    new Tuple2<>("A", 0),
                    new Tuple2<>("B", 1000),
                    new Tuple2<>("C", 2000),
                    new Tuple2<>("D", 7000),
                    new Tuple2<>("E", 3000),
                    new Tuple2<>("F", 4000),
                    new Tuple2<>("G", 5000),
                    new Tuple2<>("H", 20000),
                    new Tuple2<>("I", 8000)
            ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {
                // 这里自定义WatermarkGenerator的原因是Flink按照运行时间周期发送watermark,但我们的例子是单次执行的,可以认为数据是一瞬间到来
                // 因此我们改写为每到来一条数据发送一次watermark,watermark的时间戳为数据的事件事件减去5000毫秒,意思是最多容忍数据来迟5000毫秒
                @Override
                public WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {
                    return new WatermarkGenerator<Tuple2<String, Integer>>() {
                        @Override
                        public void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                            long watermark = eventTimestamp - 5000L < 0 ? 0L : eventTimestamp - 5000L;
                            output.emitWatermark(new Watermark(watermark));
                        }
    
                        @Override
                        public void onPeriodicEmit(WatermarkOutput output) {
    
                        }
                    };
                }
            // 取第二个字段为watermark
            }).withTimestampAssigner((element, timestamp) -> element.f1));
    
            // 窗口大小5秒,允许延迟5秒
            // watermark和allowedLateness的区别是,watermark决定了什么时候窗口数据触发计算,allowedLateness决定什么数据被认为是lateElement,从而发送到sideOutput
            // 设置side output tag
            source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
                @Override
                public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                    Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                    System.out.println("--------------------");
                    while(iterator.hasNext()) {
                        System.out.println(iterator.next());
                    }
                }
            // 打印sideoutput流内容
            }).getSideOutput(lateTag).process(new ProcessFunction<Tuple2<String, Integer>, Object>() {
                @Override
                public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
                    System.out.println("Late element: " + value);
                }
            });
    
            executionEnvironment.execute();
        }
    }
    

    从执行结果可以发现打印出了Late element: (I,8000)。表明I为迟到的元素,它被发往旁路输出。

    条件分流

    除了迟到数据输出之外,用户还可以根据自定义条件将数据分流。下面的例子根据一个整数数据流元素的奇偶性,分发到不同的side output中。代码如下:

    public class SplitDemo {
        public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
        public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
    
            DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 2, 3, 4, 5);
    
            SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
                @Override
                public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
                    if (value % 2 == 0) {
                        // 这里不使用out.collect,而是使用ctx.output
                        // 这个方法多了一个参数,可以指定output tag,从而实现数据分流
                        ctx.output(evenTag, value);
                    } else {
                        ctx.output(oddTag, value);
                    }
                }
            });
    
            // 依赖OutputTag获取对应的旁路输出
            DataStream<Integer> evenStream = process.getSideOutput(evenTag);
            DataStream<Integer> oddStream = process.getSideOutput(oddTag);
    
            // 分别打印两个旁路输出流中的数据
            evenStream.process(new ProcessFunction<Integer, String>() {
                @Override
                public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                    out.collect("Even: " + value);
                }
            }).print();
    
            oddStream.process(new ProcessFunction<Integer, String>() {
                @Override
                public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                    out.collect("Odd: " + value);
                }
            }).print();
    
            executionEnvironment.execute();
        }
    }
    

    条件分流在实际开发中用途非常广泛。比如将数据分类发往不同的下游sink。也可以将不符合规范无法处理的异常数据通过旁路输出收集起来,便于问题的收集和定位。

    本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

    相关文章

      网友评论

        本文标题:Flink 使用之数据分流

        本文链接:https://www.haomeiwen.com/subject/yjjkartx.html