美文网首页
Spark实验:SparkWordCount第一个例子

Spark实验:SparkWordCount第一个例子

作者: 邵奈一 | 来源:发表于2018-10-31 17:55 被阅读21次

    一 实验目的

    通过本实验,了解Scala语言的特点、理解Scala与Java联系、熟悉Scala与Java的语法区别,能够编写SparkWordCount程序,并学会用Spark-shell与编辑器两种方式执行程序。

    二 实验内容

    本实验概述了Scala语言的特点并多角度比较Java与Scala的语法特点,包括HelloWorld代码、构造函数(属性设置)、WordCount代码例子,此外,还加入了Java8版本新特性Lamda表达式实现WordCount,以供参考比较学习,最后用Spark-shell提交与编辑器两种方式实现代码的提交与执行。

    三 实验要求

    以小组为单元进行实验,每小组5人,小组自主协商选一位组长,由组长安排和分配实验任务,做实验前应确保Spark集群部署正确。

    四 准备知识

    4.1 Scala简介

    Scala是一门多范式的编程语言,运行在Java虚拟机(JVM)上,Scala 源代码被编译成Java字节码,可以轻松实现和丰富的 Java 类库互联互通,具有面向对象(OO)以及函数式编程(FP)的特性。它具备动态语言的灵活简洁,同时又保留了静态类型带来的安全保障和执行效率。强大的抽象能力,使Scala不仅能处理脚本化的临时任务,还能处理高并发场景下的分布式互联网大数据应用。

    4.2 Scala与Java的语法比较

    4.2.1 HelloWorld的比较

    【1】Java版本HelloWorld:

    public class HelloWorld {
        public static void main(String[] args) {
          System.out.println("Hello World!");
     }
    }   
    

    【2】Scala版本HelloWorld:

    object HelloWorld {
        def main(args: Array[String]): Unit = {
          println("Hello World!")
     }
    }  
    

    4.2.2 构造函数的比较

    【1】Java版本构造函数:

    class Person {
        private int age;
        private String name;
        public Person(int age, String name) {
          this.age = age;
          this.name = name;
     }
    }
    

    【2】Scala版本构造函数:

    class Person(age: Int, name: String)
    

    4.2.3 WordCount例子比较Java与Scala语法

    【1】Java版本WordCount实现:

    public class WordCount {
    
     public static void main(String[] args) {
    
     SparkConf conf = new SparkConf().setAppName("WordCount")
    
     JavaSparkContext sc = new JavaSparkContext(conf);
    
     JavaRDD<String> lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
    
     JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    
     private static final long serialVersionUID = 1L;
    
     public Iterable<String> call(String line) throws Exception {
    
     return Arrays.asList(line.split(" "));
    
     }
    
     });
    
     JavaPairRDD<String, Integer> pairs = words.mapToPair(
    
     new PairFunction<String, String, Integer>() {
    
     private static final long serialVersionUID = 1L;
    
     public Tuple2<String, Integer> call(String word) throws Exception {
    
     return new Tuple2<String, Integer>(word, 1);
    
     }
    
     });
    
     JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
    
     new Function2<Integer, Integer, Integer>() {
    
     private static final long serialVersionUID = 1L;
    
     public Integer call(Integer v1, Integer v2) throws Exception {
    
     return v1 + v2;
    
     }
    
     });
    
     wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
    
     private static final long serialVersionUID = 1L;
    
     public void call(Tuple2<String, Integer> wordCount) throws Exception {
    
     System.out.println(wordCount._1 + "出现了" + wordCount._2 + "次");   
    
     }
    
     });
    
     sc.close();
    
     }
    
    }
    

    【2】Scala版本WordCount实现:

    object WordCount {
    
     def main(args: Array[String]) {
    
     val conf = new SparkConf().setAppName("WordCount");
    
     val sc = new SparkContext(conf)
    
     val lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
    
     val words = lines.flatMap { line => line.split(" ") }
    
     val pairs = words.map { word => (word, 1) }
    
     val wordCounts = pairs.reduceByKey { _ + _ }
    
     wordCounts.foreach(wordCount => println(wordCount._1 + "出现了" + wordCount._2 + "次")) 
    
     }
    
    }
    

    主要语法解释:

    word => (word, 1)等同于java版本的匿名内部类中,传入一个String参数,返回一个为<String,Integer>类型的对象

    _ + _此处等同于java版本中,传入两个Integer参数,合成一个Integer类型的对象

    Java8新特性的Lamda表达式的实现比较:

    public class WordCount {
    
     public static void main(String[] args) {
    
     SparkConf conf = new SparkConf().setAppName("WordCount");
    
     JavaSparkContext sc = new JavaSparkContext(conf);
    
     JavaRDD<String> lines= sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
    
     JavaRDD<String> words = lines.flatMap(
    
     line -> Arrays.asList(line.split(" ")).iterator());
    
     JavaPairRDD<String, Integer> counts = words
    
     .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
    
     .reduceByKey((x, y)-> x+y);
    
     counts.foreach(wordCount -> System.out.println(wordCount._1() + ":" + wordCount._2()));
    
     spark.stop();
    
     }
    
    }
    

    五 实验步骤

    5.1 Spark-shell方式

    5.1.1 准备测试文件wordcount.txt

    前提:确保HDFS已启动,用下面指令查看数据是否存在,如不存在则参考实验19 自行建立并上传至HDFS。

    hadoop fs -cat /wordcount.txt
    [图片上传失败...(image-c59fc0-1540979695732)]

    5.1.2 启动Spark-shell(需配置好环境变量)

    spark-shell --master spark://master-30405-30406-30407-h81vl:7077

    5.1.3 执行程序(可三步一起执行,也可以分开执行)

    val file=sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt")
    
    val count=file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
    
    count.collect()               
    

    说明:sc为Spark-shell默认的SparkContext对象。Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建SparkContext,您首先需要构建一个包含有关应用程序信息的SparkConf对象。每个JVM只有一个SparkContext可能是活动的。 在创建新的SparkContext之前,必须先停止活动状态的SparkContext。

    [图片上传失败...(image-10070f-1540979695732)]

    5.1.4 执行结果

    [图片上传失败...(image-6e0b0d-1540979695732)]

    5.2 编辑器方式(IDEA编辑器)

    5.2.1 将4.2.3相应的代码拷贝到IDEA编辑器(Eclipse、IDEA等)

    5.2.2 打包用xftp上传到master执行即可(参考附加实验:项目打包)

    六 总结

    本实验介绍了Spark的第一个例子,应仔细分析比较实验提供的Java与Scala例子,做到融会贯通,提供的Java8新特性Lamda表达式例子可供以后学习参考,注意自己所安装的Java版本。还应学会去比较Spark-shell与Spark-submit两种方式执行代码的不通。本实验承上启下,非常重要,必须多实操几遍。

    相关文章

      网友评论

          本文标题:Spark实验:SparkWordCount第一个例子

          本文链接:https://www.haomeiwen.com/subject/sozatqtx.html