美文网首页flinkFlink实践
Flink中Window/WaterMark/SideOutpu

Flink中Window/WaterMark/SideOutpu

作者: 神奇的考拉 | 来源:发表于2019-04-02 17:26 被阅读0次

    一.分类

    TunbingWindow:滚动窗口

    1.前后两个计算不存在重叠

    SlidingWindow:滑动窗口

    1.元素会在多个窗口中存在,存在重叠

    二.方式

    基于Time方式

    • EventTime:
    1. 每个独立event在其生产设备上产生的时间;
      2.event记录的时间戳在进入flink时已经存在;
      在使用的时候需要提供时间戳提取方法
      (实现AssignTimeStampAndWaterMark接口)
      3.使用eventtime时,数据决定了数据进度时间,并不受系统的物理时钟影响;
      4.基于EventTime实现的程序需要指定如何生成TimeStamp和WaterMark这样能够显示event处理进度;
    • IngestionTime:
      1.该time记录的是event进入flink的时间;一般是在source操作时每个event记录都能得到当前source的时间,而后续的基于time的操作使用的时间戳即为该时间戳;
      2.IngestTime处于EventTime和ProcessTime间;对比ProcessTime提供稳定的timestamp,成本却有点高;同时在进行每个Window操作时对应的timestamp却是相同的,不同于ProcessTime进行每个Window操作使用的不同时间戳;
      对比EventTime来说面对out-of-order或late data记录时却无能为力.除此之外两者是类似的,由于IngestTime对应的timestamp是自动生成的,则watermark不需要指定;
    • ProcessTime:
      1.event在flink中被执行的时间,是基于当前执行机器的物理时钟(会导致不同的机器上ProcessTime存在差异)
      2.执行Window的操作是基于机器物理时钟周期内达到的所有记录的操作;
      (比如当应用09:15开始,对应的窗口大小1h,则第一个window[9:15, 10:00],第二个window[10:00,11:00]等等)
      3.ProcessTime相对来说是一个比较简单,同时也不需要streams和machine间的协调的Window时间机制,并能保证最好的吞吐性能又保障了低延迟.
      4.在分布式和异构的环境下,ProcessTime会受event到达系统的影响其确定性会出现不确定性;

    基于Count方式

    三.应用

    类结构

    • TimeCharacteristic

      • 目前只提供:ProcessingTime/IngestionTime/EventTime三类时间类型
    • Window:
      1.窗口Window主要用来将不同event分组到不同的buckets中;
      2.maxTimestamp()用来标记在某一时刻,<=maxTimestamp的记录均会到达对应的Window;
      3.任何实现Window抽象类的子类,需要实现equals()和hashCode()方法来保证逻辑相同的Window得到同样的处理;
      4.每种Window都需要提供的Serialzer实现用于Window类型的序列化

      • TimeWindow:
        1.时间类型窗口:具有一个从[start,end)间隔的窗口;
        2.在使用过程中能够产生多个Window
        • maxTimestamp=end-1;
          例如当前创建时间10:05,对应的窗口间隔=5min,则窗口的有效间隔[10:05, 10:10);结束点 ≈ 10:09 59:999
        • 实现equals:针对相同TimeWindow比较其窗口start和end
        • 实现hashCode: 基于start + end将long转为int
        • intersects:判断指定Window是否包含在当前窗口内
        • cover:根据指定Window和当前窗口生成新的包含两个窗口的新Window
    • GlobalWindow:
      1.默认使用的Window,将所有数据放置到一个窗口;对应窗口时间戳不超过Long.MAX_VALUE即可;
      2.在使用过程中只会存在一个GlobalWindow;
      * maxTimestamp=Long.MAX_VALUE
      * 实现equals:只要属于相同类型即可
      * 实现hashCode: return 0;

    • Serializer:
      1.主要用于完成对Window序列化
      2.通过继承抽象类TpyeSerializerSingleton<? extends Window>

    • 接口: TypeSerializer<T>
      1.描述Flink运行时处理数据类型所需的序列化和复制方法。在该接口中的方法被假定为无状态的,因此它实际上是线程安全的。
      (有状态的这些方法的实现可能会导致不可预测的副作用,并且会损害程序的稳定性和正确性)
      2.duplicate()
      创建一个serializer的deep copy:
      a.若serializer是无状态的 则直接返回this
      b.若是serializer是有状态的,则需要创建该serializer的deep-copy
      由于serializer可能会在多个thread中被使用,对应无状态的serializer是线程安全的,而有状态的则是存在非线程安全的风险;
      3.snapshotConfiguration()
      创建serializer当前配置snapshot及其关联的managed state一起存储;
      配置snapshot需要包括serializer的parameter设置以及序列化格式等信息;
      当一个新的serializer注册用来序列化相同的Managed State,配置snapshot需要确保新的Serializer的兼容性,也会存在状态迁移的需要;
      4.ensureCompatibility()
      用于完成不同的Serializer间的兼容性:
      a.snapshot配置类型属于ParameterlessTypeSerializerConfig同时当前Serializer标识相同则进行兼容处理
      b.当不满足a情况 则需要进行状态迁移

    • 关于TimeWindow的mergeWindows:
      针对TimeWindow定义的窗口集将重叠/交叉部分进行合并,减少Window的数量;
      首先会将所有的Window基于其start字段进行排序,便于Window合并.
      a.当前记录的Window包含迭代的Window,则会以当前Window作为key,并将迭代Window放置到Set中
      b.当前记录的Window并不包含迭代的Window,重新添加一条新的记录<candidate,Set<TimeWindow>>
      以下是使用伪码

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     // 指定使用eventtime
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    
    DataStream<UserDefinedEvent> stream = env.addSource(new FlinkKafkaConsumer09<UserDefinedEvent>(topic, schema, props));
    
    stream
        .keyBy( (event) -> event.getUser() )
        .timeWindow(Time.hours(1)) // 指定窗口:大小=1h,以自然小时为周期
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
    

    四. Watermark

    在Flink中提供了使用Eventtime来衡量event被处理额机制: Watermark.会作为DataStream的一部分进行传输并携带timestamp,比如Watermark(t)声明了达到Window数据的结束时间,换句话说也是没有DataStream中的element对应的timestamp t' <= t; watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp,例如1472693399700(2016-09-01 09:29:59.700),而这条数据的watermark时间则可能是:

    watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)
    
    watermark

    在实际中流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。在然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但也并不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
    但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。
    生成watermark的方式主要有2大类:

    (1):With Periodic Watermarks  # 定义一个最大允许乱序的时间
    (2):With Punctuated Watermarks
    

    接下来会结合代码展示 Window + Watermark + SideOutput基于EventTime来解决乱序/延迟等情况的解决方案(仅限于演示)

    五.实例

    以下实例通过模拟现实环境的数据产生,同时通过ScheduledExecutorService来默认延迟event超出有效时间Window范围,完成对乱序数据的排序,通过也对超过有效时间Window的event单独处理使用SideOutput.

    import org.apache.commons.collections.IteratorUtils;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    import java.util.*;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import java.util.concurrent.*;
    import static java.util.concurrent.TimeUnit.SECONDS;
    
    /**
     * @Auther: dalan
     * @Date: 19-4-2 11:35
     * @Description:
     */
    public class SimpleSideOutput {
        /** logger */
        private static final Logger LOGGER = LoggerFactory.getLogger(SimpleSideOutput.class);
        public static void main(String[] args) throws Exception {
            final OutputTag<SimpleWaterMark.Event> REJECTEDWORDSTAG = new OutputTag<SimpleWaterMark.Event>("rejected_words_tag"){};
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            String[] datas = {"hello","world","good","yes","ok","here"};
            String[] ops = {"-","+"};
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(4);
    
            DataStream<SimpleWaterMark.Event> strs = env.addSource(new SourceFunction<SimpleWaterMark.Event>() {
                private Random rand = new Random(9527);
                private volatile boolean isRunning = true;
                private volatile Long nums =0L;
    
    
                @Override public void run(SourceContext<SimpleWaterMark.Event> out) throws Exception {
                    final long cts = System.currentTimeMillis();
    
                    // 模拟延迟数据
                    final ScheduledExecutorService exec = new ScheduledThreadPoolExecutor(1);
                    exec.scheduleAtFixedRate(new Runnable() {
                        @Override public void run() {
                          SimpleWaterMark.Event e = new SimpleWaterMark.Event(datas[rand.nextInt(datas.length)], ops[rand.nextInt(2)].equals("+")? (cts + rand.nextInt(100)) : (cts - rand.nextInt(100)) );
                            System.out.println(
                                "======single thread event=====" + e + " current_thread_id " + Thread.currentThread().getId());
                            out.collect(e);
                        }}, 3, 4, TimeUnit.SECONDS);
    
                    // 模拟正常数据
                    while (isRunning && nums < 500){
                        long ts = System.currentTimeMillis();
                        SimpleWaterMark.Event e = new SimpleWaterMark.Event(datas[rand.nextInt(datas.length)], ops[rand.nextInt(2)].equals("+")? (ts + rand.nextInt(100)) : (ts - rand.nextInt(100)) );
                        System.out.println("======event=====" + e + " current_thread_id " + Thread.currentThread().getId());
                        out.collect(e);
    
                        nums++;
                        Thread.sleep(rand.nextInt(50)+10);
                    }
                    exec.shutdown();
                }
    
                @Override public void cancel() {
                    isRunning = false;
                }
            });
    
            SingleOutputStreamOperator<SimpleWaterMark.Event> sides = strs
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SimpleWaterMark.Event>(Time.of(2L, SECONDS)) {
                    private volatile Long currentTimestamp = 0L;
                    @Override public long extractTimestamp(SimpleWaterMark.Event event) {
                        long ts = event.ts;
                        currentTimestamp = ts > currentTimestamp ? ts : currentTimestamp;
                        return ts;
                    }
                })
                .keyBy("name")
    //            .process(new KeyedProcessFunction<String, SimpleWaterMark.Event, SimpleWaterMark.Event>() {
    //                @Override
    //                public void processElement(SimpleWaterMark.Event event, Context ctx, Collector<SimpleWaterMark.Event> out)
    //                    throws Exception {
    //                    String key = event.name;
    //                    if(key.length() >= 5){
    //                        ctx.output(REJECTEDWORDSTAG, event);
    //                    }else if (key.length() > 0){
    //                        out.collect(event);
    //                    }
    //                }
    //            })
                //.timeWindow(Time.of(2, SECONDS))
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .sideOutputLateData(REJECTEDWORDSTAG)
                .apply(new WindowFunction<SimpleWaterMark.Event, SimpleWaterMark.Event, Tuple, TimeWindow>() {
                    @Override
                    public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<SimpleWaterMark.Event> iterable,
                        Collector<SimpleWaterMark.Event> out) throws Exception {
                        Iterator<SimpleWaterMark.Event> iter = iterable.iterator();
                        List<SimpleWaterMark.Event> events = IteratorUtils.toList(iter);
                        Collections.sort(events);
                        for (SimpleWaterMark.Event e: events) {
                            out.collect(e);
                        }
    
                        System.out.println("the time window " +
                            "\tstart " + timeWindow.getStart()+
                            "\tend " + timeWindow.getEnd() +
                            "\tkey " + tuple.toString() +
                            "\telement_size " + events.size());
    
                    }
                });
    
            // 记录延迟数据可单独做处理
            DataStream<String> events =
                sides.getSideOutput(REJECTEDWORDSTAG)
                     .map(new MapFunction<SimpleWaterMark.Event, String>() {
                         @Override public String map(SimpleWaterMark.Event event) throws Exception {
                             return "rejected_"+event;
                         }
                     });
            events.print();
    
            env.execute("a simple sideoutput demo");
        }
    }
    

    以下实例通过socket来模拟现实情况,这种方式由用户自己来提供模拟数据;效果同上面实例相似.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;
    
    import javax.annotation.Nullable;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 测试: nc -ln 9000
     *      0001,1538359882000
     *      0002,1538359886000
     *      0003,1538359892000
     *      0004,1538359893000
     *      0005,1538359894000
     *      0006,1538359896000
     *      0007,1538359897000
     *      0008,1538359897000
     *      0009,1538359872000 此条信息比较触发sideoutput的存储 已超出Window的有效时间
     * @Auther: dalan
     * @Date: 19-4-2 15:36
     * @Description:
     */
    public class SocketSideOutput {
        public static void main(String[] args) throws Exception {
            //定义socket的端口号
            int port = 9000;
            //获取运行环境
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //设置使用eventtime,默认是使用processtime
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //设置并行度为1,默认并行度是当前机器的cpu数量
            env.setParallelism(4);
            //连接socket获取输入的数据
            DataStream<String> text = env.socketTextStream("localhost", port, "\n");
            //解析输入的数据
            DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
                }
            });
    
            //抽取timestamp和生成watermark
            // 设定水印current_watermark = max(event.timestamp) 同时设置最大可忍受延迟时间=1s;
            // 通过使用current_watermark - 最大可忍受event延迟时间,将对应的watermark代表的窗口结束时间前移来接受延迟的event
            DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
                Long currentMaxTimestamp = 0L;
                final Long maxOutOfOrderness = 1000L;  // 最大可忍受延迟时间1s
                // 最大允许的乱序时间是10s
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
                /**
                 * 定义生成watermark的逻辑 * 默认100ms被调用一次
                 */
                @Nullable @Override public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }
    
                //定义如何提取timestamp
                @Override public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    long timestamp = element.f1;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
    //                System.out.println("key:" + element.f0 + ",eventtime:[" + element.f1 + "|" + sdf.format(element.f1) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp)
    //                    + "],watermark:[" + getCurrentWatermark().getTimestamp() + "|" + sdf.format(getCurrentWatermark().getTimestamp())
    //                    + "]");
                    return timestamp;
                }
            });
    
            //保存被丢弃的数据
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
            //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
            SingleOutputStreamOperator<String> window =
                waterMarkStream.keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.SECONDS)))
                    //按照消息的EventTime分配窗口,和调用TimeWindow效果一样
                    // .allowedLateness(Time.seconds(2)) //允许数据迟到2秒
                    .sideOutputLateData(outputTag)
                    .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                    /**
                     * 对window内的数据进行排序,保证数据的顺序
                     *
                     * @param tuple
                     * @param window
                     * @param input
                     * @param out
                     * @throws Exception
                     */
                    @Override public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                        String key = tuple.toString();
                        List<Long> arrarList = new ArrayList<Long>();
                        Iterator<Tuple2<String, Long>> it = input.iterator();
                        while (it.hasNext()) {
                            Tuple2<String, Long> next = it.next();
                            arrarList.add(next.f1);
                        }
                        Collections.sort(arrarList);
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                        String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf
                            .format(arrarList.get(arrarList.size() - 1)) + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                        out.collect(result);
    
                        //System.out.println(result);
                    }
                });
    
            //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
            // 本处延迟的event已经超过指定Window的[start,end)有效范围,并且在已忍受可延迟最大周期的基础上出现延迟的信息
            DataStream<Tuple2<String, Long>> sideOutput = window.getSideOutput(outputTag);
            sideOutput.print();
            //测试-把结果打印到控制台即可 window.print();
            // 注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
            env.execute("eventtime-watermark");
        }
    }
    
    详细见github源码

    相关文章

      网友评论

        本文标题:Flink中Window/WaterMark/SideOutpu

        本文链接:https://www.haomeiwen.com/subject/qbdtbqtx.html