一 实验目的
通过本实验,了解Scala语言的特点、理解Scala与Java联系、熟悉Scala与Java的语法区别,能够编写SparkWordCount程序,并学会用Spark-shell与编辑器两种方式执行程序。
二 实验内容
本实验概述了Scala语言的特点并多角度比较Java与Scala的语法特点,包括HelloWorld代码、构造函数(属性设置)、WordCount代码例子,此外,还加入了Java8版本新特性Lamda表达式实现WordCount,以供参考比较学习,最后用Spark-shell提交与编辑器两种方式实现代码的提交与执行。
三 实验要求
以小组为单元进行实验,每小组5人,小组自主协商选一位组长,由组长安排和分配实验任务,做实验前应确保Spark集群部署正确。
四 准备知识
4.1 Scala简介
Scala是一门多范式的编程语言,运行在Java虚拟机(JVM)上,Scala 源代码被编译成Java字节码,可以轻松实现和丰富的 Java 类库互联互通,具有面向对象(OO)以及函数式编程(FP)的特性。它具备动态语言的灵活简洁,同时又保留了静态类型带来的安全保障和执行效率。强大的抽象能力,使Scala不仅能处理脚本化的临时任务,还能处理高并发场景下的分布式互联网大数据应用。
4.2 Scala与Java的语法比较
4.2.1 HelloWorld的比较
【1】Java版本HelloWorld:
public class HelloWorld {
public static void main(String[] args) {
System.out.println("Hello World!");
}
}
【2】Scala版本HelloWorld:
object HelloWorld {
def main(args: Array[String]): Unit = {
println("Hello World!")
}
}
4.2.2 构造函数的比较
【1】Java版本构造函数:
class Person {
private int age;
private String name;
public Person(int age, String name) {
this.age = age;
this.name = name;
}
}
【2】Scala版本构造函数:
class Person(age: Int, name: String)
4.2.3 WordCount例子比较Java与Scala语法
【1】Java版本WordCount实现:
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCount")
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});
JavaPairRDD<String, Integer> pairs = words.mapToPair(
new PairFunction<String, String, Integer>() {
private static final long serialVersionUID = 1L;
public Tuple2<String, Integer> call(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
});
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
wordCounts.foreach(new VoidFunction<Tuple2<String,Integer>>() {
private static final long serialVersionUID = 1L;
public void call(Tuple2<String, Integer> wordCount) throws Exception {
System.out.println(wordCount._1 + "出现了" + wordCount._2 + "次");
}
});
sc.close();
}
}
【2】Scala版本WordCount实现:
object WordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("WordCount");
val sc = new SparkContext(conf)
val lines = sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
val words = lines.flatMap { line => line.split(" ") }
val pairs = words.map { word => (word, 1) }
val wordCounts = pairs.reduceByKey { _ + _ }
wordCounts.foreach(wordCount => println(wordCount._1 + "出现了" + wordCount._2 + "次"))
}
}
主要语法解释:
word => (word, 1)等同于java版本的匿名内部类中,传入一个String参数,返回一个为<String,Integer>类型的对象
_ + _此处等同于java版本中,传入两个Integer参数,合成一个Integer类型的对象
Java8新特性的Lamda表达式的实现比较:
public class WordCount {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines= sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt");
JavaRDD<String> words = lines.flatMap(
line -> Arrays.asList(line.split(" ")).iterator());
JavaPairRDD<String, Integer> counts = words
.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
.reduceByKey((x, y)-> x+y);
counts.foreach(wordCount -> System.out.println(wordCount._1() + ":" + wordCount._2()));
spark.stop();
}
}
五 实验步骤
5.1 Spark-shell方式
5.1.1 准备测试文件wordcount.txt
前提:确保HDFS已启动,用下面指令查看数据是否存在,如不存在则参考实验19 自行建立并上传至HDFS。
hadoop fs -cat /wordcount.txt
[图片上传失败...(image-c59fc0-1540979695732)]
5.1.2 启动Spark-shell(需配置好环境变量)
spark-shell --master spark://master-30405-30406-30407-h81vl:7077
5.1.3 执行程序(可三步一起执行,也可以分开执行)
val file=sc.textFile("hdfs://master-30405-30406-30407-h81vl:8020/wordcount.txt")
val count=file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)
count.collect()
说明:sc为Spark-shell默认的SparkContext对象。Spark程序必须做的第一件事是创建一个SparkContext对象,该对象告诉Spark如何访问集群。要创建SparkContext,您首先需要构建一个包含有关应用程序信息的SparkConf对象。每个JVM只有一个SparkContext可能是活动的。 在创建新的SparkContext之前,必须先停止活动状态的SparkContext。
[图片上传失败...(image-10070f-1540979695732)]
5.1.4 执行结果
[图片上传失败...(image-6e0b0d-1540979695732)]
5.2 编辑器方式(IDEA编辑器)
5.2.1 将4.2.3相应的代码拷贝到IDEA编辑器(Eclipse、IDEA等)
5.2.2 打包用xftp上传到master执行即可(参考附加实验:项目打包)
六 总结
本实验介绍了Spark的第一个例子,应仔细分析比较实验提供的Java与Scala例子,做到融会贯通,提供的Java8新特性Lamda表达式例子可供以后学习参考,注意自己所安装的Java版本。还应学会去比较Spark-shell与Spark-submit两种方式执行代码的不通。本实验承上启下,非常重要,必须多实操几遍。
网友评论