Scala
因为Scala是基于Java虚拟机的,所有Scala代码都需要编译为字节码,然后在JVM上运行,所以Scala和Java是可以无缝操作的,Scala可以调用任意Java代码。
Map
val map = Map(("a", 1), ("b", 2))
for ((key, value) <- map) {
println("key:" + key + ", value:" + value)
}
Tuple
val t = ("a", 1, 2)
println(t._3)
函数式编程
将函数赋值给变量
def main(a: Array[String]) = {
val addFun = add _
println(addFun(1, 2))
}
def add(a: Int, b: Int): Int = {
a + b
}
匿名函数
val addFun = (a:Int, b: Int) => {a + b}
println(addFun(1, 2))
高阶函数
接收其它函数作为函数的参数
Spark
Spark基本工作原理
- 分布式
客户端(在本地编写Spark程序的机器)将Spark程序提交到Spark集群,Spark集群一般从HDFS上读取数据,然后分布式存放在集群的各个节点上。 - 主要基于内存
- 迭代式计算
节点1、2并行计算,计算完成后可能将结果数据交给3、4节点再接着计算,最后处理的结果可以写入到MySQL中或直接返回客户端。
Spark和Hadoop最大的不同在于迭代式计算,MapReduce分为两个阶段,map和reduce,两个阶段完了就结束了,所以一个job只能在map和reduce中做。而Spark的计算可以分为n个阶段,因为它是内存迭代式的,处理完一个阶段后可以继续往下处理很多阶段,而不仅仅是2个阶段,所以Spark可以提供更强大的功能。
注:所有计算操作都是针对多个节点上的数据进行并行计算的。
RDD(弹性分布式数据集)
1、是一个数据集合,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD可以被并行操作;
2、RDD最重要的特性就是提供了容错性,可以自动从节点失败中恢复。即假如某个节点上的RDD partition因为节点故障导致数据丢失,那么RDD会自动通过自己的数据来源重新计算该partition,这一切对使用者是透明的;
3、RDD的数据默认是放在内存中的,但在内存资源不足时,Spark会自动将RDD数据写入磁盘(弹性),这一切对于用户是透明的;
Spark的核心编程时什么?
1、定义RDD,即RDD是从哪里读取数据来创建RDD,比如hdfs、mysql、hive等;
2、定义对RDD的计算操作,这个叫做算子,比如map、reduce、flatMap;
3、循环往复的过程;
什么是Spark开发?
1、核心开发:离线批处理/延迟性的交互式数据处理;
2、SQL查询:底层都是RDD和计算操作;
3、实时计算:底层都是RDD和计算操作;
Scala开发WordCount程序
val lines: RDD[String] = sc.textFile("E:/a.txt")
val wordsRDD: RDD[String] = lines.flatMap(line => line.split(" "))
val tupleWordsRDD: RDD[(String, Int)] = wordsRDD.map(word => (word, 1))
val countRDD: RDD[(String, Int)] = tupleWordsRDD.reduceByKey((x, y) => x + y)
countRDD.foreach(count => {
print(count._1)
print(count._2)
println()
})
WordCount程序原理
创建RDD
- 使用集合创建
主要用于测试;
/* 配置Spark */
val conf: SparkConf = new SparkConf()
.setAppName("WordCountMain")
.setMaster("local") //local代表本地单线程模式运行
val sc: SparkContext = new SparkContext(conf)
val list: List[Int] = List(1, 2, 3)
val rdd: RDD[Int] = sc.makeRDD(list)
rdd.foreach(x => println(x))
注:makeRDD方法的第二个参数可以只能将RDD切分成多少个partition,Spark会为每一个partition运行一个task来进行处理。官方建议是,为集群中每隔CPU创建2~4个partition。Spark默认会根据集群的情况来设置partition的数量。
-
使用本地文件创建
主要用于临时性地处理一些存储了大量数据的文件; -
使用hdfs创建
应该是最常用的生产环境处理方式;
map算子
val nums: List[Int] = List(1, 2, 3)
val numsRDD: RDD[Int] = sc.makeRDD(nums)
/* 使用map算子将集合中的每个元素都乘2 */
val newNumsRDD: RDD[Int] = numsRDD.map(num => num * 2)
newNumsRDD.foreach(num => println(num))
val lines: List[String] = List("hello spark", "hello scala")
val linesRDD: RDD[String] = sc.makeRDD(lines)
val words: RDD[Array[String]] = linesRDD.map(line => line.split(" "))
words.foreach(arr => {
arr.foreach(word => println(word))
})
filter算子
val nums: List[Int] = List(1, 2, 3, 4)
val numsRDD: RDD[Int] = sc.makeRDD(nums)
val evenNumsRDD: RDD[Int] = numsRDD.filter(num => num % 2== 0)
evenNumsRDD.foreach(num => println(num)) // 2 4
val lines: List[String] = List("hello", "world", "hi")
val linesRDD: RDD[String] = sc.makeRDD(lines)
val hasHWords: RDD[String] = linesRDD.filter(word => word.contains("h"))
hasHWords.foreach(word => println(word)) //hello hi
flatMap算子
val lines: List[String] = List("hello spark", "hello scala")
val linesRDD: RDD[String] = sc.makeRDD(lines)
val wordsRDD: RDD[String] = linesRDD.flatMap(line => line.split(" "))
wordsRDD.foreach(word => println(word))
注意区别:flatMap是把数组里面的数据拆分到最小的粒度后返回
groupByKey算子
val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
val scoreGrouped: RDD[(String, Iterable[Int])] = scoresRDD.groupByKey()
scoreGrouped.foreach(score => {
print(score._1 + "=")
score._2.foreach(score => print(score + " "))
println()
})
/*
cai=80 70
bing=90
*/
reduceByKey算子
val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
val scoreReduecd: RDD[(String, Int)] = scoresRDD.reduceByKey((x, y) => x + y)
scoreReduecd.foreach(score => {
print(score._1 + "=")
println(score._2)
})
/*
cai=150
bing=90
*/
注:reduceByKey的作用对像是(key, value)形式的rdd,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。
保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这样后续可以对value做进一步的操作,比如排序。
reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。
sortByKey算子
val scores: List[(String, Int)] = List(("cai", 80), ("cai", 70), ("bing", 90))
val scoresRDD: RDD[(String, Int)] = sc.makeRDD(scores)
val scoreSortedRDD: RDD[(String, Int)] = scoresRDD.sortByKey()
scoreSortedRDD.foreach(score => {
print(score._1)
print(score._2)
println()
})
/*
bing90
cai80
cai70
*/
join算子
val stuList: List[(Int, String)] = List((1, "cai"), (2, "zhong"), (3, "bing"))
val scoreList: List[(Int, Int)] = List((1, 70), (2, 80), (3, 90))
val stuRDD: RDD[(Int, String)] = sc.makeRDD(stuList)
val scoreRDD: RDD[(Int, Int)] = sc.makeRDD(scoreList)
val joinedRDD: RDD[(Int, (String, Int))] = stuRDD.join(scoreRDD)
joinedRDD.foreach(stu => {
println("学号:" + stu._1)
println("姓名:" + stu._2._1)
println("成绩:" + stu._2._2)
})
/*
学号:1
姓名:cai
成绩:70
学号:3
姓名:bing
成绩:90
学号:2
姓名:zhong
成绩:80
*/
action操作案例
reduce
val nums: Array[Int] = Array(1, 2, 3)
val numsRDD: RDD[Int] = sc.makeRDD(nums)
val sum: Int = numsRDD.reduce((x, y) => x + y)
println(sum) //6
网友评论