美文网首页
Stream SQL Join【流转批】

Stream SQL Join【流转批】

作者: bigdata张凯翔 | 来源:发表于2021-03-31 06:43 被阅读0次

    场景说明

    假定某个Flink业务1每秒就会收到1条消息记录,消息记录某个用户的基本信息,包括名字、性别、年龄。另有一个Flink业务2会不定时收到1条消息记录,消息记录该用户的名字、职业信息。

    基于某些业务要求,开发的Flink应用程序实现功能:实时的以根据业务2中消息记录的用户名字作为关键字,对两个业务数据进行联合查询。

    数据规划

    • 业务1的数据存储在Kafka组件中。向Kafka组件发送数据(需要有Kafka权限用户),并从Kafka组件接收数据。Kafka配置参见样例数据规划章节。
    • 业务2的数据通过socket接收消息记录,可使用netcat命令用户输入模拟数据源。
      • 使用Linux命令netcat -l -p <port>,启动一个简易的文本服务器。
      • 启动应用程序连接netcat监听的port成功后,向netcat终端输入数据信息。

    开发思路

    1.启动Flink Kafka Producer应用向Kafka发送数据。
    2.启动Flink Kafka Consumer应用从Kafka接收数据,构造Table1,保证topic与producer一致。
    3.从soket中读取数据,构造Table2。
    4.使用Flink SQL对Table1和Table2进行联合查询,并进行打印。

    功能介绍

    在Flink应用中,调用flink-connector-kafka模块的接口,生产并消费数据。
    用户在开发前需要kafka-client-1.1.0.jar,该jar包可在maven reposity目录下获取。

    下面列出producer和consumer,以及Flink Stream SQL Join使用主要逻辑代码作为演示:
    1.每秒钟往Kafka中生产一条用户信息,用户信息有姓名、年龄、性别组成。

    //producer代码
    public class WriteIntoKafka {
          public static void main(String[] args) throws Exception { 
            // 构造执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并发度
            env.setParallelism(1);
            // 解析运行参数
            ParameterTool paraTool = ParameterTool.fromArgs(args);
            // 构造流图,将自定义Source生成的数据写入Kafka
            DataStream<String> messageStream = env.addSource(new SimpleStringGenerator());
    
            FlinkKafkaProducer010 producer = new FlinkKafkaProducer010<>(new FlinkKafkaProducer010<>(paraTool.get("topic"),
    
               new SimpleStringSchema(),
    
               paraTool.getProperties()));
    
            messageStream.addSink(producer);
    
            // 调用execute触发执行
            env.execute();
         }
    
    // 自定义Source,每隔1s持续产生消息
    public static class SimpleStringGenerator implements SourceFunction<String> {
            static final String[] NAME = {"Carry", "Alen", "Mike", "Ian", "John", "Kobe", "James"};
    
            static final String[] SEX = {"MALE", "FEMALE"};
    
            static final int COUNT = NAME.length;   
    
            boolean running = true;
    
            Random rand = new Random(47);
    
           @Override
            //rand随机产生名字,性别,年龄的组合信息
             public void run(SourceContext<String> ctx) throws Exception {
    
                while (running) {
    
                    int i = rand.nextInt(COUNT);
    
                    int age = rand.nextInt(70);
    
                    String sexy = SEX[rand.nextInt(2)];
    
                    ctx.collect(NAME[i] + "," + age + "," + sexy);
    
                    thread.sleep(1000);
    
                }
    
        }
    
           @Override
    
           public void cancel() {
    
             running = false;
    
           }
    
         }
    
       }
    

    2.生成Table1和Table2,并使用Join对Table1和Table2进行联合查询,打印输出结果。

    public class SqlJoinWithSocket {
        public static void main(String[] args) throws Exception{
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    
            //基于EventTime进行处理
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.setParallelism(1);
    
            ParameterTool paraTool = ParameterTool.fromArgs(args);
    
            //Stream1,从Kafka中读取数据
            DataStream<Tuple3<String, String, String>> kafkaStream = env.addSource(new FlinkKafkaConsumer010<>(paraTool.get("topic"),
                    new SimpleStringSchema(),
                    paraTool.getProperties())).map(new MapFunction<String, Tuple3<String, String, String>>() {
                @Override
                public Tuple3<String, String, String> map(String s) throws Exception {
                    String[] word = s.split(",");
    
                    return new Tuple3<>(word[0], word[1], word[2]);
                }
            });
    
            //将Stream1注册为Table1
            tableEnv.registerDataStream("Table1", kafkaStream, "name, age, sexy, proctime.proctime");
    
            //Stream2,从Socket中读取数据
            DataStream<Tuple2<String, String>> socketStream = env.socketTextStream(hostname, port, "\n").
                    map(new MapFunction<String, Tuple2<String, String>>() {
                        @Override
                        public Tuple2<String, String> map(String s) throws Exception {
                            String[] words = s.split("\\s");
                            if (words.length < 2) {
                                return new Tuple2<>();
                            }
    
                            return new Tuple2<>(words[0], words[1]);
                        }
                    });
    
            //将Stream2注册为Table2
            tableEnv.registerDataStream("Table2", socketStream, "name, job, proctime.proctime");
    
            //执行SQL Join进行联合查询
            Table result = tableEnv.sqlQuery("SELECT t1.name, t1.age, t1.sexy, t2.job, t2.proctime as shiptime\n" +
                    "FROM Table1 AS t1\n" +
                    "JOIN Table2 AS t2\n" +
                    "ON t1.name = t2.name\n" +
                    "AND t1.proctime BETWEEN t2.proctime - INTERVAL '1' SECOND AND t2.proctime + INTERVAL '1' SECOND");
    
            //将查询结果转换为Stream,并打印输出
            tableEnv.toAppendStream(result, Row.class).print();
    
            env.execute();
        }
    }
    

    相关文章

      网友评论

          本文标题:Stream SQL Join【流转批】

          本文链接:https://www.haomeiwen.com/subject/dcpnhltx.html