美文网首页Flink
Flink-sql 基于事件时间的窗口

Flink-sql 基于事件时间的窗口

作者: wudl | 来源:发表于2021-08-11 11:17 被阅读0次

    1. 基于事件时间的窗口有三种

    1.1 基于事件时间的滚动窗口

    package com.wudl.flink.sql;
    
    import com.wudl.flink.bean.WaterSensor;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.time.Duration;
    
    import static org.apache.flink.api.common.eventtime.WatermarkStrategy.forBoundedOutOfOrderness;
    import static org.apache.flink.table.api.Expressions.*;
    
    /**
     * @ClassName : Flink_Group_Window  --  基于事件 的处理滚动窗口
     * @Description : Flink sql 窗口
     * @Author :wudl
     * @Date: 2021-08-04 23:13
     */
    
    public class Flink_Group_ShiJianWindow {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
            // 读取数据流中的数据并且提取时间搓生成waterMark
            WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
    
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            return element.getTs() * 1000L;
                        }
                    });
            DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
            SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String s) throws Exception {
                    String[] split = s.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                }
            }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
    
            // 将流转化为表
            Table table = tableEnvironment.fromDataStream(waterDS,
                    $("id"),
                    $("ts"),
                    $("vc"),
                    //  事件的处理时间
                    $("rt").rowtime());
    
            // 开窗滚动窗口计算wordCound
            Table result = table.window(Tumble.over(lit(5).seconds()).on($("rt")).as("tw"))
                    .groupBy($("id"), $("tw"))
                    .select($("id"), $("id").count());
    
            // 将结果表转化为流进行输出
    
            tableEnvironment.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }
    
    

    1.2 基于事件时间的滑动窗口

    package com.wudl.flink.sql;
    
    import com.wudl.flink.bean.WaterSensor;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.time.Duration;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    
    /**
     * @ClassName : Flink_Group_Window  --  基于事件时间的滑动窗口
     * @Description : Flink sql 窗口
     * @Author :wudl
     * @Date: 2021-08-04 23:13
     */
    
    public class Flink_Group_ShiJian_HuaDongWindow {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
            // 读取数据流中的数据并且提取时间搓生成waterMark
            WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
    
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            return element.getTs() * 1000L;
                        }
                    });
            DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
            SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String s) throws Exception {
                    String[] split = s.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                }
            }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
    
            // 将流转化为表
            Table table = tableEnvironment.fromDataStream(waterDS,
                    $("id"),
                    $("ts"),
                    $("vc"),
                    //  事件的处理时间
                    $("rt").rowtime());
    
            // 开窗滚动窗口计算wordCound
            Table result = table.window(Slide.over(lit(6).seconds()).every(lit(2).seconds()).on($("rt")).as($("sw")))
                    .groupBy($("id"), $("sw"))
                    .select($("id"), $("id").count());
    
            // 将结果表转化为流进行输出
    
            tableEnvironment.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }
    
    

    1.3 基于事件时间的会话窗口

    package com.wudl.flink.sql;
    
    import com.wudl.flink.bean.WaterSensor;
    import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Session;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import java.time.Duration;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    
    /**
     * @ClassName : Flink_Group_Window  --  基于事件时间的滑动窗口
     * @Description : Flink sql 窗口
     * @Author :wudl
     * @Date: 2021-08-04 23:13
     */
    
    public class Flink_Group_ShiJian_huihuaWindow {
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
            // 读取数据流中的数据并且提取时间搓生成waterMark
            WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy =   WatermarkStrategy.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                    .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
    
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            return element.getTs() * 1000L;
                        }
                    });
            DataStreamSource<String> streamSource = env.socketTextStream("192.168.1.180", 9999);
            SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String s) throws Exception {
                    String[] split = s.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                }
            }).assignTimestampsAndWatermarks(waterSensorWatermarkStrategy);
    
            // 将流转化为表
            Table table = tableEnvironment.fromDataStream(waterDS,
                    $("id"),
                    $("ts"),
                    $("vc"),
                    //  事件的处理时间
                    $("rt").rowtime());
    
            // 开窗滚动窗口计算wordCound
            Table result = table.window(Session.withGap(lit(5).seconds()).on($("rt")).as("sw"))
                    .groupBy($("id"), $("sw"))
                    .select($("id"), $("id").count());
    
            // 将结果表转化为流进行输出
    
            tableEnvironment.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Flink-sql 基于事件时间的窗口

        本文链接:https://www.haomeiwen.com/subject/jekqbltx.html