美文网首页
Flink官方入门项目简介

Flink官方入门项目简介

作者: 叩丁狼教育 | 来源:发表于2019-02-15 11:51 被阅读66次

    本文作者:林伟兵,叩丁狼高级讲师。原创文章,转载请注明出处。

    1. Flink简介

    Flink 提供了三个核心的用户API:

    1. Batch
    2. Streaming
    3. Talbe & SQL

    ​ 本文不介绍Flink是什么,Flink的核心组件和特性,本文从用户的角度解读Batch和Streaming代码的实现方式,本文使用Flink1.6.1版本作为讲解。首先需要下载官方的案例,地址是:https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/java_api_quickstart.html.

    2. 官方案例搭建

    1. 通过maven工具下载源码:

      $ mvn archetype:generate                               \
           -DarchetypeGroupId=org.apache.flink              \
           -DarchetypeArtifactId=flink-quickstart-java      \
           -DarchetypeVersion=1.6.1
      
    2. 也可以使用官方提供的地址下载,但是不管如何 都必须保证有maven环境:

      $ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.6.1
      
    3. 下载后的项目架构是这样的:

      $ tree quickstart/
      quickstart/
      ├── pom.xml
      └── src
          └── main
              ├── java
              │   └── org
              │       └── myorg
              │           └── quickstart
              │               ├── BatchJob.java
              │               └── StreamingJob.java
              └── resources
                  └── log4j.properties
      
    4. 将源码导入IDE中。打开IDE,点击“open”,将项目导进去,此时需要加载对应的maven依赖。等待项目构建完毕后,删除BatchJob.java和StreamingJob.java代码,因为这个两个代码只是提供了一个模板,并没有实质性的代码。

    5. 打开github网站,找到flink项目的example代码,将batch和streaming代码粘贴到项目中。

    01.png
    02.png
    03.png

    粘贴后的代码如下,运行程序后打印出单词统计的结果:

    04.png

    3. Batch代码解读:

    如下贴出源代码,重要的代码部分参考注释:

    public class WordCount {
    
        public static void main(String[] args) throws Exception {
    
            final ParameterTool params = ParameterTool.fromArgs(args);
    
            //创建执行环境,这里的环境可以是本地环境,也可以是集群环境.
            // 用户可以调用createLocalEnvironment()运行本地环境 也可以调用createRemoteEnvironment()运行集群环境
            //getExecutionEnvironment()可以动态获取环境
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    
            // ParameterTool就是对参数的一个封装类,这里将传递进来的参数作为全局参数
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataSet<String> text;
            if (params.has("input")) {
                //如果运行程序携带input参数,则从input参数后面获取文件地址,从而读取文件的内容
                text = env.readTextFile(params.get("input"));
            } else {
                //如果不携带参数,则从WordCountData类读取每一行数据并进行分词统计
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                text = WordCountData.getDefaultTextLineDataSet(env);
            }
    
            DataSet<Tuple2<String, Integer>> counts =
                    // split up the lines in pairs (2-tuples) containing: (word,1)
                    // 这里将一行数据转换成 (letter,1) (lette,1)(letter,1) (...)
                    text.flatMap(new Tokenizer())
                    // 将tuple(letter,1) 的第0个元素 进行groupby,再对相同的letter的count进行累加。
                    .groupBy(0)
                    .sum(1);
            //上面程序得到的结果是一个队列,该队列由一个个的Tuple2<String, Integer>组成。
    
            // emit result
            if (params.has("output")) {
                //如果有output参数,则将output提供的路径作为文件夹,将结果写入到该文件夹下
                counts.writeAsCsv(params.get("output"), "\n", " ");
                //开始执行程序
                env.execute("WordCount Example");
            } else {
                //执行程序,将结果打印出来
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
        }
    
        public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
                // Tuple2 就是将2个元素作为一个元组, 如果将3个元素作为一个元祖,则需要使用Tuple3
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
    }
    

    4. Streaming代码解读:

    Streaming代码与batch代码并没有多大的不同。不同点参考注释:

    public class WordCount {
    
        public static void main(String[] args) throws Exception {
    
            // Checking input parameters
            final ParameterTool params = ParameterTool.fromArgs(args);
    
            // 不同于批量处理,实时流处理的环境代码前面添加了Stream标识。
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataStream<String> text;
            if (params.has("input")) {
                // read the text file from given input path
                text = env.readTextFile(params.get("input"));
            } else {
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                // get default test text data
                text = env.fromElements(WordCountData.WORDS);
            }
    
            DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // 这里的统计采用的是keyBy而非groupBy,因为实时流数据正常的逻辑是实时产生,它是无边数据
                .keyBy(0).sum(1);
    
            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
            //开始执行程序
            env.execute("Streaming WordCount");
        }
    
        public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                // normalize and split the line
                String[] tokens = value.toLowerCase().split("\\W+");
                // emit the pairs
                for (String token : tokens) {
                    if (token.length() > 0) {
                        out.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }
    }
    

    总结:

    1. 不管是离线处理还是实时处理,都离不开三板斧,1-获取当前运行环境,2-通过算子进行数据分析,3-将结果输出到某一端。
    2. 算子包括了map() , flatMap(), filter(), keyBy( ), sum( ) ...
    3. 当前案例的批量处理的结果是将数据一次性输出,实时流数据是对每一行数据进行逐一的分析和输出。
    4. 除了Batch / Streaming 代码,官方还提供了 SQL 代码的案例。有兴趣的同学可以下载来试试。

    想获取更多技术干货,请前往叩丁狼官网:http://www.wolfcode.cn/all_article.html

    相关文章

      网友评论

          本文标题:Flink官方入门项目简介

          本文链接:https://www.haomeiwen.com/subject/zfepeqtx.html