传送门
Spark实战系列之一--Spark是什么
Spark实战系列之二--什么是RDD以及RDD的常用API
Spark实战系列之三--RDD编程基础上
Spark实战系列之四--RDD编程基础下
Spark实战系列之五--键值对RDD
Spark实战系列之六--数据读写
Spark实战系列之七--综合案例
Spark基础系列之八--Spark SQL是什么
Spark基础系列之九--使用Spark SQL读写数据库
传送门
前言
在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
三、持久化
- 可以通过持久化(缓存)机制避免这种重复计算的开销
- 可以使用persist()方法对一个RDD标记为持久化
- 之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正的计算后,才会把计算结果进行持久化
- 持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用
1)针对上面的示例,增加持久化语句以后的执行过程如下:
四、分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上
1)分区的作用
- 增加并行度
- 减少通信开销
2)RDD分区原则
- RDD分区的一个原则是使得分区的个数尽量对于集群中的CPU核心(core)数目
3)设置分区的个数
- 3.1)在调用textFile()和parallelize()方法的时候手动指定分区个数即可,语法格式如下:
sc.textFile(path,partitionNum)
val array =Array(1,2,3,4,5)
val rdd = sc.parallelize(array,2) //设置两个分区
- 3.2)使用reparition方法重新设置分区个数
通过转换操作得到新RDD时,直接调用repartition方法即可。例如:
val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2)
data.partitions.size //显示data这个RDD的分区数量,结果为2
val rdd = data.repartition(1)
rdd.partitions.size //显示data这个RDD的分区数量,结果为1
4)自定义分区方法
五、练习
给定文件如下:包含序号,性别和身高
1 F 170
2 M 178
3 M 174
4 F 165
统计
1)男生总数、女生总数
2)男生最高和最低的身高、
3)女生最高和最低的身高。
import org.apache.spark.{SparkConf, SparkContext}
//1.统计男生总数,女生总数 2.男生最高和最低的身高 3.女生最高和最低的身高
object Count{
def main(args:Array[String]){
val input = "file:///usr/local/spark/mycode/wordcount/Count.txt"
val conf = new SparkConf().setAppName("Count").setMaster("local")
val sc = new SparkContext(conf)
val textFile = sc.textFile(input)
//1.统计男生总数,女生总数
val all_boy_count = textFile.flatMap(line => line.split(" ")).filter(line => line.contains("M")).map(word => (word,1)).reduceByKey(_+_)
all_boy_count.foreach{case(k,v) => println("男生的总数: " + v)}
val all_girl_count = textFile.flatMap(line => line.split(" ")).filter(line => line.contains("F")).map(word => (word,1)).reduceByKey((x,y) => x+y)
all_girl_count.foreach{case(k,v) => println("女生的总数: "+ v)}
//2.男生最高和最低的身高
val boy_highest = textFile.map(x => x.split(" ")).map(line => {
val gender = line(1)
val height = line(2)
(gender,height)
}).filter(x => x._1=="M")
println("男生最高的身高: " + boy_highest.map(x => x._2).max)
println("男生最低的身高: " + boy_highest.map(x => x._2).min)
//3.女生最高和最低的身高
val girl_highest = textFile.map(x => x.split(" ")).map(line => {
val gender = line(1)
val height = line(2)
(gender,height)
}).filter(x => x._1=="F")
println("女生最高的身高: " + girl_highest.map(x => x._2).max)
println("女生最低的身高: " + girl_highest.map(x => x._2).min)
}
}
网友评论