美文网首页
Apache Flink——侧输出流(side output)

Apache Flink——侧输出流(side output)

作者: 小波同学 | 来源:发表于2022-07-19 00:44 被阅读0次

    前言

    flink处理数据流时,经常会遇到这样的情况:处理一个数据源时,往往需要将该源中的不同类型的数据做分割(分流)处理,假如使用 filter算子对数据源进行筛选分割的话,势必会造成数据流的多次复制,造成不必要的性能浪费;

    flink中的侧输出,就是将数据流进行分割,而不对流进行复制的一种分流机制。flink的侧输出的另一个作用就是对延时迟到的数据进行处理,这样就可以不必丢弃迟到的数据;

    简单理解就是,根据业务上的一定规则,将一个源中的数据拆分成不同的流,即主流和侧输出流。

    侧输出流(side output)

    大部分的DataStream API的算子的输出是单一输出,也就是某种数据类型的流。除了split算子,可以将一条流分成多条流,这些流的数据类型也都相同。process function的side outputs功能可以产生多条流,并且这些流的数据类型可以不一样。一个side output可以定义为OutputTag[X]对象,X是输出流的数据类型。process function可以通过Context对象发射一个事件到一个或者多个side outputs。

    除了从DataStream操作输出主结果流外,也可以生成任一数量的额外的侧输出流。结果流可以和主输出流的类型可以不匹配,并且侧输出流可以有不同类型。侧输出流的操作当你分流时非常有用,之前你需要先复制一个流再过滤出来,有了侧输出流,就不需要这样操作。

    具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文
    的.output()方法就可以了。

    DataStream<Integer> stream = env.addSource(...);
    
    SingleOutputStreamOperator<Long> longStream stream.process(new ProcessFunction<Integer, Long>() {
    
        @Override
        public void processElement( Integer value, Context ctx, Collector<Integer> out) throws Exception {
            // 转换成 Long,输出到主流中
            out.collect(Long.valueOf(value));
            // 转换成 String,输出到侧输出流中
            ctx.output(outputTag, "side-output: " + String.valueOf(value));
        }
    
    });
    

    当使用侧输出流时,首先需要定义一个OutputTag,它将要被用来确定一个侧输出流。

    OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
    

    注意:侧输出流的类型是根据侧输出流包括元素的类型来确定。

    如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput() 方法,传入对应的OutputTag,这个方式与窗口API 中获取侧输出流是完全一样的。

    DataStream<String> stringStream = longStream.getSideOutput(outputTag);
    

    可以从以下方法中来把数据输出到侧输出流

    在以上的函数中可以用参数Context来暴露给用户发送数据到侧输出流。下面例子是用ProcessFunction来发送数据到侧输出流。

    import com.yibo.flink.datastream.Event;
    import com.yibo.flink.sourcecustom.ClickSource;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    import scala.Tuple3;
    
    import java.time.Duration;
    
    /**
     * @Author: huangyibo
     * @Date: 2022/7/19 0:34
     * @Description:
     */
    
    public class SideOutputStreamTest {
    
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //设置生成水位线的时间间隔
            env.getConfig().setAutoWatermarkInterval(100);
    
            //乱序流的Watermark生成
            SingleOutputStreamOperator<Event> stream = env.addSource(new ClickSource())
                    // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                    .assignTimestampsAndWatermarks(
                            // 针对乱序流插入水位线,延迟时间设置为 2s
                            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                    // 抽取时间戳的逻辑
                                    .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                    );
    
            OutputTag<Tuple3<String, String, Long>> maryTag = new OutputTag<Tuple3<String, String, Long>>("Mary"){};
            OutputTag<Tuple3<String, String, Long>> boboTag = new OutputTag<Tuple3<String, String, Long>>("Bobo"){};
    
            SingleOutputStreamOperator<Event> processStream = stream.process(new ProcessFunction<Event, Event>() {
                @Override
                public void processElement(Event event, Context context, Collector<Event> out) throws Exception {
                    if ("Mary".equals(event.getUser())) {
                        context.output(maryTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                    } else if ("Bobo".equals(event.getUser())) {
                        context.output(boboTag, Tuple3.apply(event.getUser(), event.getUrl(), event.getTimestamp()));
                    } else {
                        out.collect(event);
                    }
                }
            });
    
            processStream.print("else");
            processStream.getSideOutput(maryTag).print("Mary");
            processStream.getSideOutput(boboTag).print("Bobo");
    
            env.execute();
        }
    }
    
    import com.yibo.flink.datastream.Event;
    import com.yibo.flink.sourcecustom.ClickSource;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import java.time.Duration;
    
    /**
     * @Author: huangyibo
     * @Date: 2022/7/7 0:00
     * @Description: 测试迟到数据
     */
    
    public class LateDataTest {
    
        public static void main(String[] args) throws Exception {
            //创建执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //设置生成水位线的时间间隔
            env.getConfig().setAutoWatermarkInterval(100);
    
            //乱序流的Watermark生成
            SingleOutputStreamOperator<Event> streamOperator = env.addSource(new ClickSource())
                    // 插入水位线的逻辑 设置 watermark 延迟时间,2 秒
                    .assignTimestampsAndWatermarks(
                            // 针对乱序流插入水位线,延迟时间设置为 2s
                            WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                                    // 抽取时间戳的逻辑
                                    .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                    );
    
            //定义一个输出标签
            OutputTag<Event> lateTag = new OutputTag<Event>("late"){};
    
            //统计每个url的访问量
            SingleOutputStreamOperator<UrlViewCount> result = streamOperator.keyBy(Event::getUrl)
                    //滚动事件时间窗口
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    //允许窗口处理迟到数据 允许1分钟的延迟
                    .allowedLateness(Time.minutes(1))
                    //将最后的迟到数据输出到侧输出流
                    .sideOutputLateData(lateTag)
                    .aggregate(new UrlViewCountAgg(), new UrlViewCountResult());
    
            result.print("result");
            result.getSideOutput(lateTag).print("late");
    
            env.execute();
        }
    
    
        /**
         * 自定义实现AggregateFunction, 增量计算url页面的访问量,来一条数据就 +1
         */
        public static class UrlViewCountAgg implements AggregateFunction<Event, Long, Long> {
    
            @Override
            public Long createAccumulator() {
                return 0L;
            }
    
            @Override
            public Long add(Event value, Long accumulator) {
                return accumulator + 1;
            }
    
            @Override
            public Long getResult(Long accumulator) {
                return accumulator;
            }
    
            @Override
            public Long merge(Long a, Long b) {
                return null;
            }
        }
    
    
        /**
         * 自定义实现ProcessWindowFunction, 包装窗口信息输出
         */
        public static class UrlViewCountResult extends ProcessWindowFunction<Long, UrlViewCount, String, TimeWindow> {
    
            @Override
            public void process(String url, Context context, Iterable<Long> iterable, Collector<UrlViewCount> out) throws Exception {
                Long urlCount = iterable.iterator().next();
                //集合窗口信息输出
                long start = context.window().getStart();
                long end = context.window().getEnd();
                UrlViewCount urlCountView = new UrlViewCount();
                urlCountView.setUrl(url);
                urlCountView.setCount(urlCount);
                urlCountView.setWindowStart(start);
                urlCountView.setWindowEnd(end);
                out.collect(urlCountView);
            }
        }
    }
    

    Flink侧输出流有两个作用

    • 1、分隔过滤:充当filter算子功能,将源中的不同类型的数据做分割处理。因为使用filter 算子对数据源进行筛选分割的话,会造成数据流的多次复制,导致不必要的性能浪费,过滤后不需要的数据可以重新写入Pulsar或Kafka的topic中供其他应用消费处理。

    • 2、延时数据处理:在做对延时迟窗口计算时,对延时迟到的数据进行处理,即时数据迟到也不会造成丢失。

    参考:
    https://blog.csdn.net/rustwei/article/details/121102439

    相关文章

      网友评论

          本文标题:Apache Flink——侧输出流(side output)

          本文链接:https://www.haomeiwen.com/subject/pxljirtx.html