Flink in Practice: A Flink SQL Batch Demo


Author: 北邮郭大宝 | Published 2020-04-11 19:14

    I have been using Flink SQL at work recently, so I spent the weekend studying it and wrote this demo as a record. The full code is available on GitHub.

    The demo is based on Flink 1.9.1 and developed with Java 8.

    This example applies Flink SQL in a batch scenario: read student information from the students and scores tables, then compute the average scores per class.

    1. Preparing the Data

    students.txt stores student information, one record per line with space-separated fields: id name classname

    1 张三 1班
    2 李四 1班
    3 王五 2班
    4 赵六 2班
    5 郭大宝 2班
    

    scores.txt stores scores, with space-separated fields: id chinese math english

    1 100 90 80
    2 97 87 74
    3 70 50 43
    4 100 99 99
    5 80 81 82
    

    2. Creating the Project

    Following the official documentation, generate a Flink project with the Maven archetype:

       $ mvn archetype:generate                               \
          -DarchetypeGroupId=org.apache.flink              \
          -DarchetypeArtifactId=flink-quickstart-java      \
          -DarchetypeVersion=1.9.1
    

    Open the generated project in IDEA. The project structure is shown below; save the two data files prepared above under resources.

    (screenshot: project structure, 1586602165374.jpg)

    Edit pom.xml to add the Flink dependencies (the Kafka connector below is not actually used by this batch demo):

    <dependencies>
    <!--flink core-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <!--flink-table-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-common</artifactId>
          <version>${flink.version}</version>
        </dependency>
        <!--kafka-->
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka-0.10_${scala.binary.version}</artifactId>
          <version>${flink.version}</version>
        </dependency>
      </dependencies>
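
    The dependency block above references the `${flink.version}` and `${scala.binary.version}` Maven properties. They are assumed to be defined in the pom's `<properties>` section, along the lines of the following sketch (the Scala binary version 2.11 is the quickstart default for this Flink release, an assumption rather than something stated in the article):

    ```xml
    <properties>
      <!-- Flink release used throughout this article -->
      <flink.version>1.9.1</flink.version>
      <!-- Scala binary version of the Flink artifacts (assumed: 2.11) -->
      <scala.binary.version>2.11</scala.binary.version>
    </properties>
    ```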
    

    3. Implementing the Logic

    Create a Java class, SQLBatch, that implements the logic:

    package com.cmbc.flink;

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.*;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;

    import static org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE;


    public class SQLBatch {
        public static void main(String[] args) throws Exception {
            // set up the batch execution environment and its table environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

            // read the input files
            DataSet<String> s_students = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/students.txt");
            DataSet<String> s_score = env.readTextFile("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/scores.txt");

            // parse each student line "id name classname" into a typed tuple
            DataSet<Tuple3<Integer, String, String>> students = s_students.map(new MapFunction<String, Tuple3<Integer, String, String>>() {
                @Override
                public Tuple3<Integer, String, String> map(String s) throws Exception {
                    String[] line = s.split(" ");
                    return new Tuple3<Integer, String, String>(Integer.valueOf(line[0]), line[1], line[2]);
                }
            });

            // parse each score line "id chinese math english" into a typed tuple
            DataSet<Tuple4<Integer, Integer, Integer, Integer>> score = s_score.map(new MapFunction<String, Tuple4<Integer, Integer, Integer, Integer>>() {
                @Override
                public Tuple4<Integer, Integer, Integer, Integer> map(String s) throws Exception {
                    String[] line = s.split(" ");
                    return new Tuple4<Integer, Integer, Integer, Integer>(Integer.valueOf(line[0]), Integer.valueOf(line[1]),
                            Integer.valueOf(line[2]), Integer.valueOf(line[3]));
                }
            });

            // join the two datasets on the id field
            DataSet<Tuple6<Integer, String, String, Integer, Integer, Integer>> data = students.join(score)
                    .where(0)
                    .equalTo(0)
                    .projectFirst(0, 1, 2)
                    .projectSecond(1, 2, 3);

            // register the joined dataset as a table
            tEnv.registerDataSet("Data", data, "id, name, classname, chinese, math, english");

            // run the SQL query
            Table sqlQuery = tEnv.sqlQuery("SELECT classname, AVG(chinese) as avg_chinese, AVG(math) as avg_math, AVG(english) as avg_english, " +
                    "AVG(chinese + math + english) as avg_total " +
                    "FROM Data " +
                    "GROUP BY classname " +
                    "ORDER BY avg_total"
            );

            // convert the result table back to a DataSet and write it to a text file sink
            DataSet<Info> result = tEnv.toDataSet(sqlQuery, Info.class);
            result.writeAsText("/Users/guoxingyu/Documents/work/java/flink/flinksql/src/main/resources/info.txt", OVERWRITE);
            tEnv.execute("do flink sql demo in batch");

        }

        // POJO holding one result row; needs a public no-arg constructor and public fields
        public static class Info {
            public String classname;
            public Integer avg_chinese;
            public Integer avg_math;
            public Integer avg_english;
            public Integer avg_total;

            public Info() {
            }

            public Info(String classname, Integer avg_chinese, Integer avg_math, Integer avg_english, Integer avg_total) {
                this.classname = classname;
                this.avg_chinese = avg_chinese;
                this.avg_math = avg_math;
                this.avg_english = avg_english;
                this.avg_total = avg_total;
            }

            @Override
            public String toString() {
                return
                        "classname=" + classname +
                        ", avg_chinese=" + avg_chinese +
                        ", avg_math=" + avg_math +
                        ", avg_english=" + avg_english +
                        ", avg_total=" + avg_total +
                        "";
            }
        }
    }
    

    The logic is straightforward:

    • Initialize the Flink execution environment
    • Read the input files, the students.txt and scores.txt tables
    • Preprocess the data: join the two datasets on the id field into a single DataSet
    • Register the DataSet as a table and run the SQL query
    • Write the result to a sink
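
    As a sanity check on the expected output, the per-class averages can be reproduced with plain Java collections. This sketch assumes that SQL AVG over INT columns truncates to an integer (integer division); the class name `AvgCheck` is made up for illustration, and the rows mirror students.txt joined with scores.txt:

    ```java
    import java.util.*;

    public class AvgCheck {
        public static void main(String[] args) {
            // per-class score rows {chinese, math, english}, mirroring the joined demo data
            Map<String, List<int[]>> byClass = new LinkedHashMap<>();
            byClass.put("1班", Arrays.asList(new int[]{100, 90, 80}, new int[]{97, 87, 74}));
            byClass.put("2班", Arrays.asList(new int[]{70, 50, 43}, new int[]{100, 99, 99}, new int[]{80, 81, 82}));

            byClass.forEach((cls, rows) -> {
                int n = rows.size(), c = 0, m = 0, e = 0;
                for (int[] r : rows) { c += r[0]; m += r[1]; e += r[2]; }
                // integer division truncates, matching AVG over INT columns (assumption)
                System.out.printf("classname=%s, avg_chinese=%d, avg_math=%d, avg_english=%d, avg_total=%d%n",
                        cls, c / n, m / n, e / n, (c + m + e) / n);
            });
        }
    }
    ```

    Under this arithmetic, 1班 averages to 98/88/77 with avg_total 264, and 2班 to 83/76/74 with avg_total 234, so ORDER BY avg_total would list 2班 first.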

    4. Running the Job and Results

    • Start Flink in local mode: run the start-cluster.sh script found in the Flink installation directory
    • Build the jar with mvn clean package (or do this from IDEA); the jar is produced under the project's target directory
    • Submit the job:
    flink run -c com.cmbc.flink.SQLBatch flinksql-1.0-SNAPSHOT.jar
    
    • Result

    (screenshot: job output, 1586602913833.jpg)

    Original post: https://www.haomeiwen.com/subject/nyoumhtx.html