美文网首页
大数据HelloWorld-Flink实现WordCount

大数据HelloWorld-Flink实现WordCount

作者: 指尖数虫 | 来源:发表于2019-08-12 09:04 被阅读0次

    所有的语言开篇都是Hello Word,数据处理引擎也有Hello Word。那就是Word Count。MR,Spark,Flink以来开篇第一个程序都是Word Count。那么今天Flink开始目标就是在本地调试出Word Count。

    [图片上传失败...(image-b74b22-1565571853767)]

    单机安装Flink

    开始Flink之前先在本机尝试安装一下Flink,当然FLink正常情况下是部署的集群方式。作者比较穷,机器配置太低开不了几个虚拟机。所以只能先演示个单机的安装。
    Apache Flink需要在Java1.8+以上的环境中运行
    所以,先确保自己的JDK版本是1.8包含以上的。

    image Flink单机部署非常简单,只需安装下载安装即可。 如果需要与Hadoop版本结合,那么下载相应的Hadoop关联版本即可。如果不与Hadoop结合就直接下载Scala版即可。我这里就直接下载了Scala2.11的相关版本。 image

    点击进入Apache页面进行下载,大小约有283MB。

    把下载下来的压缩包进行解压即可。


    image

    打开命令行直接执行 /bin/start-cluster.bat 进行启动。

    image

    浏览器打开 http://localhost:8081

    image

    至此在Windows10环境下即完成Flink的启动。

    编写WordCount

    因为Flink是由Scala进行开发的,而Scala是基于JVM的一种语言。所以最终也会转换为JAVA字节码文件,所以Flink程序可以由Java、Scala两种语言都可以进行开发。也可以同时开发。比如Java写一部分代码,Scala写另一部分代码。可以参考<Apache Flink利用Maven对Scala与Java进行混编>。

    Flink官方提供快速生成工程的两种工具:SBT与Maven。由于作者比较熟悉Maven,(或者说没用过SBT)。所以直接使用Maven快速创建一个工程。

    Java版本

    mvn archetype:generate                               \
          -DarchetypeGroupId=org.apache.flink              \
          -DarchetypeArtifactId=flink-quickstart-java      \
          -DarchetypeVersion=1.8.0
    
    

    Java版本

    mvn archetype:generate                               \
          -DarchetypeGroupId=org.apache.flink              \
          -DarchetypeArtifactId=flink-quickstart-scala     \
          -DarchetypeVersion=1.8.0
    
    

    按照提示输入相关信息,即可生成最终的项目。

    ├── pom.xml
    └── src
        └── main
            ├── resources
            │   └── log4j.properties
            └── scala/java
                └── org
                    └── myorg
                        └── quickstart
                            ├── BatchJob.scala
                            └── StreamingJob.scala
    
    

    把工程导入到IDEA中
    如果使用Scala的话,那么需要安装Scala的插件。搜索安装同时需要把Scala语言包进行安装。不知道如何操作可以联系我 微信公号<指尖数虫>。

    package jar;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.DataSource;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    public class BatchJob {
    
        public static void main(String[] args) throws Exception {
            // set up the batch execution environment
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            //读取目录下的文件
            DataSource<String> data = env.readTextFile("/opt/Server_Packets/log/ServerLog_1_runtime.log");
            //把文件中的内容按照空格进行拆分为 word,1    1 是为了能够在下面进行计算.
            data.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    for (String word : s.split(" ")){
                        collector.collect(new Tuple2<>(word,1));
                    }
                }
            })
            // 按照元组中的第1位进行分组
            .groupBy(0)
            // 分组的元组的计算方式为  value +value  也就是刚才的 同样的词 把 1+1
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t2, Tuple2<String, Integer> t1) throws Exception {
                    return new Tuple2<>(t1.f0,t1.f1+ t2.f1);
                }
            })
            //输出结果
            .print();
        }
    }
    

    相关文章

      网友评论

          本文标题:大数据HelloWorld-Flink实现WordCount

          本文链接:https://www.haomeiwen.com/subject/nfhyjctx.html