Flink Basics Series 8 - Flink on yarn: running wordco

Author: 只是甲 | Published: 2021-10-15 10:27

    Environment

    The test server runs CDH 6.3.1 with Flink 1.9 installed.

    Contents of hello.txt:

    hello word
    hello hdfs
    hello mapreduce
    hello yarn
    hello hive
    hello spark
    hello flink
    

    1. Maven configuration

    Flink dependencies:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>
    

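    The Java code is written locally, packaged into a jar, and then uploaded to the server to run. The jar must therefore declare its main entry point; otherwise running it fails with an error.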

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <version>3.0.2</version>
      <configuration>
        <archive>
          <manifest>
            <addClasspath>true</addClasspath>
            <mainClass>org.example.wordCount</mainClass> <!-- main entry point -->
          </manifest>
        </archive>
      </configuration>
    </plugin>
    <plugin>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    

    Adjust org.example.wordCount to match your own project:
    org.example is the package name
    wordCount is the class name
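
    To make the effect of the mainClass setting concrete, here is a minimal sketch that uses only the JDK's java.util.jar API. It builds a manifest in memory with a Main-Class entry, which is the attribute maven-jar-plugin writes into META-INF/MANIFEST.MF, and reads it back. ManifestSketch and its method names are made up for illustration and are not part of the original project.

```java
import java.util.jar.Attributes;
import java.util.jar.Manifest;

// Illustrative only: ManifestSketch and its methods are hypothetical names.
final class ManifestSketch {

    // Build an in-memory manifest that declares the given entry class.
    static Manifest withMainClass(String mainClass) {
        Manifest manifest = new Manifest();
        Attributes attrs = manifest.getMainAttributes();
        // Manifest-Version must be present for the manifest to be written out.
        attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attrs.put(Attributes.Name.MAIN_CLASS, mainClass);
        return manifest;
    }

    // Return the declared entry class, or null if the manifest has none.
    static String mainClassOf(Manifest manifest) {
        return manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
    }

    public static void main(String[] args) {
        Manifest manifest = withMainClass("org.example.wordCount");
        System.out.println(mainClassOf(manifest));
    }
}
```

    For a jar on disk, new java.util.jar.JarFile(path).getManifest() returns the same kind of Manifest object, so mainClassOf can be used to check what a built jar actually declares.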

    2. Writing the Java code

    The code is as follows:

    package org.example;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    /**
     * @author  只是甲
     * @date    2021-08-24
     * @remark  The first Flink wordCount program
     */
    public class wordCount {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the input data from a file
            String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
            DataSet<String> inputDataSet = env.readTextFile(inputPath);

            // Split each line on spaces, emit (word, 1) tuples, then aggregate
            DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                    .groupBy(0) // group by the word in position 0
                    .sum(1);    // sum the counts in position 1

            // In the DataSet API, print() triggers execution of the job itself,
            // so no separate env.execute() call is needed here.
            resultSet.print();
        }

        // Custom class implementing the FlatMapFunction interface
        public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Split the line on spaces
                String[] words = value.split(" ");
                // Emit every word as a (word, 1) tuple
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
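
    As a cross-check of what the job should print, the same logic (split each line on a space, emit (word, 1), group by word, sum the counts) can be sketched in plain Java without any Flink dependency. This is only a local equivalent for verifying the expected counts, not how Flink executes the job. WordCountSketch and count are illustrative names, and the input is the hello.txt content shown earlier.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: a local, Flink-free equivalent of flatMap + groupBy(0) + sum(1).
final class WordCountSketch {

    // Count how often each space-separated word occurs across all lines.
    static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys for stable output
        for (String line : lines) {
            for (String word : line.split(" ")) {      // same tokenization as MyFlatMapper
                counts.merge(word, 1, Integer::sum);   // add 1 to the running count of this word
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> lines = List.of(
                "hello word", "hello hdfs", "hello mapreduce", "hello yarn",
                "hello hive", "hello spark", "hello flink");
        count(lines).forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

    On the seven-line hello.txt this gives a count of 7 for hello and 1 for every other word. The Flink job should report the same counts, although the order of its output lines is not guaranteed.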
    

    3. Packaging with Maven and uploading

    Build the jar with the mvn package command:

    C:\Users\Administrator\IdeaProjects\FlinkStudy>mvn package
    [INFO] Scanning for projects...
    [WARNING]
    [WARNING] Some problems were encountered while building the effective model for org.example:FlinkStudy:jar:1.0-SNAPSHOT
    [WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-compiler-plugin @ line 98, column 17
    [WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-surefire-plugin @ line 107, column 17
    [WARNING] 'build.pluginManagement.plugins.plugin.(groupId:artifactId)' must be unique but found duplicate declaration of plugin org.apache.maven.plugins:maven-jar-plugin @ line 116, column 17
    [WARNING]
    [WARNING] It is highly recommended to fix these problems because they threaten the stability of your build.
    [WARNING]
    [WARNING] For this reason, future Maven versions might no longer support building such malformed projects.
    [WARNING]
    [INFO]
    [INFO] -----------------------< org.example:FlinkStudy >-----------------------
    [INFO] Building FlinkStudy 1.0-SNAPSHOT
    [INFO] --------------------------------[ jar ]---------------------------------
    [INFO]
    [INFO] --- maven-resources-plugin:3.0.2:resources (default-resources) @ FlinkStudy ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] Copying 0 resource
    [INFO]
    [INFO] --- maven-compiler-plugin:3.6.0:compile (default-compile) @ FlinkStudy ---
    [INFO] Changes detected - recompiling the module!
    [INFO] Compiling 2 source files to C:\Users\Administrator\IdeaProjects\FlinkStudy\target\classes
    [INFO]
    [INFO] --- maven-resources-plugin:3.0.2:testResources (default-testResources) @ FlinkStudy ---
    [INFO] Using 'UTF-8' encoding to copy filtered resources.
    [INFO] Copying 0 resource
    [INFO]
    [INFO] --- maven-compiler-plugin:3.6.0:testCompile (default-testCompile) @ FlinkStudy ---
    [INFO] Nothing to compile - all classes are up to date
    [INFO]
    [INFO] --- maven-surefire-plugin:2.19:test (default-test) @ FlinkStudy ---
    [INFO] Tests are skipped.
    [INFO]
    [INFO] --- maven-jar-plugin:3.0.2:jar (default-jar) @ FlinkStudy ---
    [INFO] Building jar: C:\Users\Administrator\IdeaProjects\FlinkStudy\target\FlinkStudy-1.0-SNAPSHOT.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  1.848 s
    [INFO] Finished at: 2021-08-25T09:41:03+08:00
    [INFO] ------------------------------------------------------------------------
    
    C:\Users\Administrator\IdeaProjects\FlinkStudy>
    

    Then upload the generated FlinkStudy-1.0-SNAPSHOT.jar to the server.

    4. Running the jar

    Command:

    flink run  -m yarn-cluster -yn 2 -yjm 1024 -ytm 1024 /home/flink/FlinkStudy-1.0-SNAPSHOT.jar 
    

    Result:


    [Screenshot: image.png]

    Result as shown in the web UI:


    [Screenshot: image.png]

    5. Running a different class

    The pom specifies a main class, and that class runs whenever no class is given on the command line. To run another class in the same project, name it explicitly with the -c or --class option.

    Code:

    package org.example;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    /**
     * @author  只是甲
     * @date    2021-08-24
     * @remark  The second Flink wordCount program
     */
    public class wordCount2 {
        public static void main(String[] args) throws Exception {
            // Create the execution environment
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Read the input data from a file
            String inputPath = "hdfs://hp1:8020/user/hive/warehouse/hello.txt";
            DataSet<String> inputDataSet = env.readTextFile(inputPath);

            // Split each line on spaces, emit (word, 1) tuples, then aggregate
            DataSet<Tuple2<String, Integer>> resultSet = inputDataSet.flatMap(new MyFlatMapper())
                    .groupBy(0) // group by the word in position 0
                    .sum(1);    // sum the counts in position 1

            // In the DataSet API, print() triggers execution of the job itself,
            // so no separate env.execute() call is needed here.
            resultSet.print();

            // Marker line showing that this class, not wordCount, was executed
            System.out.println("这是第二个测试的wordcount");
        }

        // Custom class implementing the FlatMapFunction interface
        public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Split the line on spaces
                String[] words = value.split(" ");
                // Emit every word as a (word, 1) tuple
                for (String word : words) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
    
    

    Commands:

    -- correct
    flink run -m yarn-cluster -c org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
    -- correct
    flink run -m yarn-cluster --class org.example.wordCount2 FlinkStudy-1.0-SNAPSHOT.jar
    -- wrong: everything after the jar path is passed to the program as arguments, so the main class from the pom still runs
    flink run -m yarn-cluster FlinkStudy-1.0-SNAPSHOT.jar -c org.example.wordCount2
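
    Why the position of -c matters can be shown with a small sketch. The usage of the run action is flink run [OPTIONS] <jar-file> <arguments>, so tokens after the jar path are handed to the program's main method rather than interpreted as options. The parser below is a simplified toy model and not Flink's actual CLI code. RunArgsSketch and its fields are hypothetical, and it assumes every option takes exactly one value, which holds for the -m, -c and --class options used here.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative only: a toy model of "flink run [OPTIONS] <jar-file> <arguments>".
final class RunArgsSketch {
    final String entryClass;        // from -c/--class, or null to fall back to the manifest
    final String jar;               // first token that is not an option
    final List<String> programArgs; // everything after the jar, passed to main(String[])

    RunArgsSketch(String entryClass, String jar, List<String> programArgs) {
        this.entryClass = entryClass;
        this.jar = jar;
        this.programArgs = programArgs;
    }

    static RunArgsSketch parse(String... tokens) {
        String entryClass = null;
        int i = 0;
        // Options are only recognized before the jar path.
        while (i < tokens.length && tokens[i].startsWith("-")) {
            if (i + 1 >= tokens.length) {
                throw new IllegalArgumentException("option without value: " + tokens[i]);
            }
            if (tokens[i].equals("-c") || tokens[i].equals("--class")) {
                entryClass = tokens[i + 1];
            }
            i += 2; // assumption: each option is followed by exactly one value
        }
        if (i >= tokens.length) {
            throw new IllegalArgumentException("missing jar file");
        }
        String jar = tokens[i];
        List<String> rest = Arrays.asList(tokens).subList(i + 1, tokens.length);
        return new RunArgsSketch(entryClass, jar, rest);
    }

    public static void main(String[] args) {
        RunArgsSketch ok = parse("-m", "yarn-cluster", "-c", "org.example.wordCount2",
                "FlinkStudy-1.0-SNAPSHOT.jar");
        RunArgsSketch wrong = parse("-m", "yarn-cluster", "FlinkStudy-1.0-SNAPSHOT.jar",
                "-c", "org.example.wordCount2");
        System.out.println(ok.entryClass);     // org.example.wordCount2
        System.out.println(wrong.entryClass);  // null, so the manifest's main class is used
        System.out.println(wrong.programArgs); // [-c, org.example.wordCount2]
    }
}
```

    In the third command the -c token ends up in programArgs, which matches the observed behaviour: the class named in the pom runs, and wordCount2 is never selected.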
    

    Screenshot of the run:


    [Screenshot: image.png]

    References

    1. https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/projectsetup/dependencies.html
