美文网首页
(2)FlinkSQL滚动窗口demo演示

(2)FlinkSQL滚动窗口demo演示

作者: NBI大数据可视化分析 | 来源:发表于2022-08-08 09:44 被阅读0次

    滚动窗口(Tumbling Windows):滚动窗口有固定的大小,是一种对数据进行均匀切片的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。


    1.png

    demo演示:
    场景:接收通过socket发送过来的数据,每30秒触发一次窗口计算逻辑
    (1)准备一个实体类,作为消息对象

    package com.pojo;
    
    import java.io.Serializable;
    
    /**
     * Created by lj on 2022-07-05.
     *
     * Serializable bean carrying one message of the demo stream. It exposes a public
     * no-argument constructor plus a getter/setter pair for each of its three fields
     * ({@code id}, {@code ts}, {@code vc}).
     */
    public class WaterSensor implements Serializable {

        /** Identifier of the message source. */
        private String id;

        /** Long-valued field; presumably a timestamp -- confirm against the producer. */
        private long ts;

        /** Integer-valued measurement field. */
        private int vc;

        /** Creates an empty instance; fields keep their Java defaults (null / 0 / 0). */
        public WaterSensor() {
        }

        /**
         * Creates a fully populated instance.
         *
         * @param id identifier of the message source
         * @param ts value stored in {@code ts}
         * @param vc value stored in {@code vc}
         */
        public WaterSensor(String id, long ts, int vc) {
            this.id = id;
            this.ts = ts;
            this.vc = vc;
        }

        // ---- id ----

        /** @return the current {@code id} */
        public String getId() {
            return this.id;
        }

        /** @param id new value for {@code id} */
        public void setId(String id) {
            this.id = id;
        }

        // ---- ts ----

        /** @return the current {@code ts} */
        public long getTs() {
            return this.ts;
        }

        /** @param ts new value for {@code ts} */
        public void setTs(long ts) {
            this.ts = ts;
        }

        // ---- vc ----

        /** @return the current {@code vc} */
        public int getVc() {
            return this.vc;
        }

        /** @param vc new value for {@code vc} */
        public void setVc(int vc) {
            this.vc = vc;
        }
    }
    

    (2)编写socket代码,模拟数据发送

    package com.producers;
    
    import java.io.BufferedWriter;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.nio.charset.StandardCharsets;
    import java.util.Random;
    
    /**
     * Created by lj on 2022-07-05.
     *
     * Demo data source: listens on a TCP port, accepts a single client and then
     * sends it one line of the form {@code <name>,<counter>,<counter>} every two seconds,
     * where {@code <name>} is picked at random from a fixed list.
     */
    public class Socket_Producer {

        /** TCP port the demo server listens on. */
        private static final int PORT = 9999;

        /** Pause between two consecutive messages, in milliseconds. */
        private static final long SEND_INTERVAL_MS = 2000L;

        /** Candidate values for the first field of every message. */
        private static final String[] LANG =
                {"flink", "spark", "hadoop", "hive", "hbase", "impala", "presto", "superset", "nbi"};

        /**
         * Starts the server, waits for one client and streams messages to it until
         * the connection breaks or the thread is interrupted.
         *
         * @param args unused
         * @throws IOException kept for signature compatibility; I/O failures are reported
         *                     on stderr rather than propagated
         */
        public static void main(String[] args) throws IOException {
            Random r = new Random();

            // try-with-resources guarantees that the listening socket, the client
            // socket and the writer are released on every exit path.
            try (ServerSocket ss = new ServerSocket(PORT)) {
                System.out.println("启动 server ....");

                try (Socket s = ss.accept();
                     BufferedWriter bw = new BufferedWriter(
                             new OutputStreamWriter(s.getOutputStream(), StandardCharsets.UTF_8))) {

                    // Counter used for both numeric fields; advanced only after a successful send.
                    int i = 0;

                    while (true) {
                        // Send one message every 2 s.
                        Thread.sleep(SEND_INTERVAL_MS);

                        String response = LANG[r.nextInt(LANG.length)] + "," + i + "," + i + "\n";
                        System.out.println(response);
                        try {
                            bw.write(response);
                            bw.flush();
                            i++;
                        } catch (IOException ex) {
                            // Only one client is ever accepted, so a failed write means the
                            // connection is gone for good: stop instead of failing forever.
                            System.out.println(ex.getMessage());
                            break;
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still observe the interruption.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            }
        }
    }
    

    (3)从socket端接收数据,并设置30秒触发执行一次窗口运算

    package com.examples;
    
    import com.pojo.WaterSensor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.lit;
    
    /**
     * Created by lj on 2022-07-06.
     *
     * Tumbling-window demo. A tumbling window has a fixed size and slices the data evenly:
     * windows neither overlap nor leave gaps, each one starts where the previous one ends.
     * Tumbling windows can be defined over time or over a number of records; the only
     * parameter needed is the window size.
     *
     * This job reads {@code id,ts,vc} lines from a socket, attaches a processing-time
     * attribute and, per {@code id} and 30-second window, emits {@code COUNT(ts)} and
     * {@code SUM(ts)}.
     */
    public class Flink_Group_Window_Tumble {

        /**
         * Builds and runs the streaming job.
         *
         * @param args unused
         * @throws Exception if the job cannot be built or fails while running
         */
        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // One record per "\n"-terminated line received from the local socket.
            DataStreamSource<String> streamSource = env.socketTextStream("127.0.0.1", 9999, "\n");
            SingleOutputStreamOperator<WaterSensor> waterDS = streamSource.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String s) throws Exception {
                    return parseSensor(s);
                }
            });

            // Convert the stream into a table; "pt" is an additional processing-time attribute.
            Table table = tableEnv.fromDataStream(waterDS,
                    $("id"),
                    $("ts"),
                    $("vc"),
                    $("pt").proctime());

            tableEnv.createTemporaryView("EventTable", table);

            // Every fragment ends with a separator so that neighbouring tokens can never fuse
            // (the select list previously ran straight into FROM).
            // window_start / window_end may be added to the select list to show the window bounds.
            Table result = tableEnv.sqlQuery(
                    "SELECT " +
                            "id, " +
                            "COUNT(ts), SUM(ts) " +
                            "FROM TABLE( " +
                            "TUMBLE( TABLE EventTable , " +
                            "DESCRIPTOR(pt), " +
                            "INTERVAL '30' SECOND)) " +
                            "GROUP BY id , window_start, window_end"
            );

            // Retract mode.
            // NOTE(review): newer Flink releases appear to deprecate toRetractStream in favour of
            // toChangelogStream -- confirm against the Flink version in use before switching.
            tableEnv.toRetractStream(result, Row.class).print("toRetractStream");
            env.execute();
        }

        /**
         * Parses one {@code id,ts,vc} line into a {@link WaterSensor}.
         *
         * Fields beyond the third are ignored.
         *
         * @param line raw text line received from the socket
         * @return the parsed sensor message
         * @throws IllegalArgumentException if the line has fewer than three comma-separated
         *                                  fields or {@code ts}/{@code vc} are not numeric
         */
        private static WaterSensor parseSensor(String line) {
            String[] fields = line.split(",");
            if (fields.length < 3) {
                throw new IllegalArgumentException("Expected 'id,ts,vc' but got: '" + line + "'");
            }
            try {
                return new WaterSensor(fields[0], Long.parseLong(fields[1]), Integer.parseInt(fields[2]));
            } catch (NumberFormatException e) {
                // Keep the cause and add the offending line so the job failure is diagnosable.
                throw new IllegalArgumentException("Non-numeric ts/vc in line: '" + line + "'", e);
            }
        }
    }
    

    (4)效果演示


    2.png 3.png

    相关文章

      网友评论

          本文标题:(2)FlinkSQL滚动窗口demo演示

          本文链接:https://www.haomeiwen.com/subject/opkvwrtx.html