Flink in Practice: Flink SQL + Kafka in Streaming

Author: 北邮郭大宝 | Published 2020-04-12 09:42

    Continuing the Flink in Practice series, this time the implementation is Flink + Kafka, an application in the streaming scenario. For the full code, see GitHub.

    The Flink version is 1.9.1, the Kafka version is 2.1.0, and the code is written in Java 8.

    This is a demo of Flink SQL in a streaming scenario. The goal is to read JSON strings containing the fields id, site, and proctime from Kafka, and compute the page views (pv) per site over 5-second windows.

    1. Data Preparation

    The JSON structure is simple: just the three fields id, site, and proctime. You could write a script that keeps writing records into the Kafka topic (a sketch of such a script follows the sample data below); here I simply paste data in with kafka-console-producer.sh.

    {"id": 1, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:01"}
    {"id": 2, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:02"}
    {"id": 3, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:03"}
    {"id": 4, "site": "www.baidu.com/", "proctime": "2020-04-11 00:00:05"}
    {"id": 5, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:06"}
    {"id": 6, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:07"}
    {"id": 7, "site": "https://github.com/tygxy", "proctime": "2020-04-11 00:00:08"}
    {"id": 8, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:09"}
    {"id": 9, "site": "www.baidu.com", "proctime": "2020-04-11 00:00:11"}
    {"id": 10, "site": "www.bilibili.com/", "proctime": "2020-04-11 00:00:18"}
    

    2. Creating the Project

    This reuses the project created in the previous post on Flink SQL in batch; for details, see Flink实战—Flink SQL在Batch场景的Demo.

    The only thing to note is that a dependency for handling JSON was added to pom.xml:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
    </dependency>
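
    One assumption worth stating: the Kafka table descriptor used in the code below also needs the matching Kafka connector on the classpath. If the project from the batch post does not already include it, a dependency along these lines should work (the artifact shown is the 0.10 connector for Flink 1.9 built against Scala 2.11):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
      <version>${flink.version}</version>
    </dependency>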
    

    3. Implementation

    Create a Java class called SQLStreaming.

    package com.cmbc.flink;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Json;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;

    import java.sql.Timestamp;


    public class SQLStreaming {
        public static void main(String[] args) throws Exception {

            // set up the streaming execution environment and its table environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // kafka source descriptor; connector version "0.10" still works against
            // a 2.x broker, though Flink 1.9 also offers "universal" for newer brokers,
            // and zookeeper.connect is only required by the legacy 0.8 consumer
            Kafka kafka = new Kafka()
                    .version("0.10")
                    .topic("flink-streaming")
                    .property("bootstrap.servers", "localhost:9092")
                    .property("zookeeper.connect", "localhost:2181");
            tableEnv.connect(kafka)
                    .withFormat(
                            new Json().failOnMissingField(true).deriveSchema()
                    )
                    .withSchema(
                            new Schema()
                                    .field("id", Types.INT)
                                    .field("site", Types.STRING)
                                    // proctime() marks this column as a processing-time
                                    // attribute appended by Flink: windows are based on
                                    // arrival time, not the timestamp string in the JSON
                                    .field("proctime", Types.SQL_TIMESTAMP).proctime()
                    )
                    .inAppendMode()
                    .registerTableSource("Data");

            // 5-second tumbling window per site, counting rows as pv
            String sql = "SELECT TUMBLE_END(proctime, INTERVAL '5' SECOND) as processtime, " +
                    "count(1) as pv, site " +
                    "FROM Data " +
                    "GROUP BY TUMBLE(proctime, INTERVAL '5' SECOND), site";
            Table table = tableEnv.sqlQuery(sql);

            // sink: map result columns to the Info POJO by field name, then print
            tableEnv.toAppendStream(table, Info.class).print();
            tableEnv.execute("Flink SQL in Streaming");
        }

        // result POJO: public fields whose names match the query's column names
        public static class Info {
            public Timestamp processtime;
            public String site;
            public Long pv;

            public Info() {
            }

            public Info(Timestamp processtime, String site, Long pv) {
                this.processtime = processtime;
                this.pv = pv;
                this.site = site;
            }

            @Override
            public String toString() {
                return "processtime=" + processtime +
                        ", site=" + site +
                        ", pv=" + pv;
            }
        }
    }
    

    The logic is fairly straightforward; briefly:

    • Initialize the Flink environment
    • Read from Kafka: configure the connection, map the schema, and register the source as a table
    • Consume the data and run the SQL
    • Save or emit the results (see the note after this list on append vs. retract output)
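
    A note on that last step: the tumbling-window aggregation above emits each window's result exactly once, so the table can be converted with toAppendStream. A query that continuously updates its result, such as a plain GROUP BY without a window, is not append-only and would fail in append mode; for that case Flink provides toRetractStream. A minimal sketch (the non-windowed query here is hypothetical, for illustration):

    // hypothetical continuously-updating aggregation over the same "Data" table
    Table totals = tableEnv.sqlQuery("SELECT site, count(1) as pv FROM Data GROUP BY site");
    // each element is a Tuple2<Boolean, Row>: true marks an insert/update, false a retraction
    tableEnv.toRetractStream(totals, org.apache.flink.types.Row.class).print();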

    4. Running It and the Results

    • Start Flink in local mode: find the start-cluster.sh script under the Flink installation directory and run it
    • Start ZooKeeper (the script names below are from my setup; depending on the installation they may carry a .sh suffix):
    sh zkServer start
    
    • Start Kafka:
    sh kafka-server-start ../config/server.properties
    
    • Start kafka-console-producer.sh and begin feeding in data, e.g. by pasting the JSON lines from section 1:
    sh kafka-console-producer --broker-list localhost:9092 --topic flink-streaming
    
    • Launch the Flink program and check the results


      [Screenshot: console output of the job]
