6.Spark RDD编程
在代码实例中,如果前缀为"scala>",则表示在Scala解释器中执行,那么对应的在实际代码中写的话,只需要将前缀去掉.
(1).RDD的创建
我们进行的map、flatmap、group等操作,都是针对RDD进行操作的,然后不断的转换操作。
-
Spark采用textFile()方法从文件系统中加载数据创建RDD
- 本地文件系统的地址
// sc是解释器中已经初始化好的SparkContext对象 scala>val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") // 使用textFile函数读取文件,解释器返回的数据类型为RDD lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:27
-
或者是分布式文件系统HDFS的地址
// 以下三行语句等价的,在配置好了Hadoop环境的集群中,我推荐使用第二种,因为目录可以在设置,并不一定要在Hadoop的当前用户目录下,所以第三种方式有一定的局限性 scala>val lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") scala>val lines = sc.textFile("/user/hadoop/word.txt") // 提取的是当前用户所在的目录下的word.txt scala>val lines = sc.textFile("word.txt")
-
或者是来自云端的地址,如Amazon S3(亚马逊简易存储服务,是一种在线的存储服务)的地址等等.
-
通过并行集合(数组)创建RDD
我们在进行矩阵运算的时候,从文本文档中读入进来的数据一般是数组
-
从数组进行变换后形成RDD
scala> val array = Array(1,2,3,4,5) // 解释器输出的类型为Array array: Array[Int] = Array(1, 2, 3, 4, 5) scala>val rdd = sc.parallelize(array) // 解释器输出的类型为RDD rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[13] at parallelize at <console>:29
-
从列表进行变换后形成RDD
scala>val list = List(1,2,3,4,5) // 解释器输出的类型为 List list: List[Int] = List(1, 2, 3, 4, 5) scala>val rdd = sc.parallelize(list) // 解释器输出的类型为RDD rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[14] at parallelize at <console>:29
-
(2).RDD算子操作简介
- 1、==transformation(转换)==
- 根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载,它不会立即执行
- 例如
- map / flatMap / reduceByKey 等
- 2、==action (动作)==
- 它会真正触发任务的运行
- 将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中
- 例如
- collect / saveAsTextFile 等
1),转换操作(TransformAction)
2),行动操作(Action)
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
动作 | 含义 |
---|---|
reduce(func) | reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止。 |
collect() | 以数组的形式返回数据集中的所有元素,把散布在各个WorkNode上的数据收集到Driver节点上。 |
count() | 返回RDD的元素个数 |
first() | 返回RDD的第一个元素(类似于take(1)) |
take(n) | 返回一个由数据集的前n个元素组成的数组 |
takeOrdered(n, [ordering]) | 返回自然顺序或者自定义顺序的前 n 个元素 |
saveAsTextFile(path) | 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本 |
saveAsSequenceFile(path) | 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。 |
saveAsObjectFile(path) | 将数据集的元素,以 Java 序列化的方式保存到指定的目录下 |
countByKey() | 针对(K,V)类型的pairRDD(键值对RDD),返回一个(K,Int)的map,表示每一个key对应的元素个数。 |
foreach(func) | 在数据集的每一个元素上,运行函数func |
foreachPartition(func) | 在数据集的每一个分区上,运行函数func |
3),Shuffle类算子说明
- 去重
def distinct()
def distinct(numPartitions: Int)
- 聚合
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]
def groupBy[K](f: T => K, p: Partitioner):RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner):RDD[(K, Iterable[V])]
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner): RDD[(K, U)]
def aggregateByKey[U: ClassTag](zeroValue: U, numPartitions: Int): RDD[(K, U)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
- 排序
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]
def sortBy[K](f: (T) => K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
- 重分区
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null)
- 集合或者表操作
def intersection(other: RDD[T]): RDD[T]
def intersection(other: RDD[T], partitioner: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def intersection(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], numPartitions: Int): RDD[T]
def subtract(other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T]
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)]
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)]
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))]
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
4).惰性机制代码实例
所谓的“惰性机制”是指:整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。
scala> val lines = sc.textFile("/user/local/spark/mycode/data.txt")
scala> val lineLengths = lines.map(s => s.length)
scala> val totalLength = lineLengths.reduce((a, b) => a + b)
/*
1.第三行代码的reduce()方法是一个"行动(Action)"类型的操作,这时候就会触发真正的计算
2.这时,Spark会把计算分解为多个任务在不容的机器中运行,每台机器运行属于他自己的map和reduce,最后把结果返回给Driver
*/
(3).RDD操作实例
1),filter()操作
scala>val lines =sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)
// 使用filter()函数进行元素的过滤,在函数中添加lambda表达式(执行contains()判断操作),把所有包含"Spark"的哪一行全部都找出来;然后使用count()函数统计出来有多少行
scala>val linesWithSpark=lines.filter(line => line.contains("Spark")).count()
// 上述执行过程中是filter()是转换操作,count()行动操作,当执行count()操作的时候才会触发所有的计算。
2),map()操作
实例要求:找出文本中单行文本所包含的单词数量的最大值。
// 加载文件
scala>val lines =sc.textFile(file:///usr/local/spark/mycode/rdd/word.txt)
/*
1.map操作: 将整个文件的每一行根据空格进行拆分,得到一个数组对象构成的RDD,然后求出拆分之后的大小,即拆分之后的元素的数量,得到一个Int类型的RDD。
scala> lines.map(line=> line.split(" ").size).foreach(println)
3
3
3
2.reduce操作:传入lambda表达式,依次判断当前RDD中的每一个值,如果前一个值大于当前保存的最大值,则保留当前值为最大值。
此时第一次传入的a的值是RDD中第一个元素,b的值是RDD的第二个元素,然后判断a与b的大小,保留大的那个作为下一次的a,紧接着RDD的下一个元素,作为b,再依次往下运行。注意此时是使用的reduce行动操作,对传入的两个参数做聚合操作,可以做数学运算,也可以做判断,类似于三目运算符。
3.注意在转换过程中,RDD内的单个元素的类型是不断的发生变化的
*/
scala>lines.map(line=> line.split(" ").size).reduce((a,b) => if(a>b) a else b)
3).flatmap()操作
// 加载文件到Spark中,然后类型是RDD
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
// 使用flatMap()函数,通过传入lambda表达式来指定分割条件;line可以是代表每一行元素,然后分割出每一个元素
scala> val words=lines.flatMap(line => line.split(" "))
// 也可以使用占位符的方式,代表对所有元素进行操作。
scala> val words=lines.flatMap(_.split(" "))
flatmap()操作示例执行过程.png
(4).持久化(Cache-缓存)
背景:
在Spark中,由于RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
持久化特点:
- 可以通过持久化(缓存)机制避免这种重复计算的开销。
- 可以使用persist()方法对一个RDD标记为持久化。
- 之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化。
- 持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。
- persist()圆括号中包含的是持久化级别的参数:
- persist(MEMORY_ONLY):表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。
- persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。
- 一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。
- 可以使用unpersist()方法手动地把持久化的RDD从缓存中移除。
- 可以用unpersist()方法来取消RDD缓存;
1).未采用持久化案例
// 初始化集合对象
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
// 使用 parallelize()方法,传入集合对象,将列表类型的数据转换成RDD类型
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
// count()是行动操作,触发一次真正从头到尾的计算
scala> println(rdd.count())
3
// collect()行动操作,触发一次真正从头到尾的计算;
// 使用mkString()函数将元素转成字符串类型,可以通过传入参数实现在转换过程中实现在字符串的拼接过程中加入分隔符。
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive
// 此时需要多次从头到尾的计算,浪费资源,所以需要采用持久化
2).采用持久化案例
// 初始化集合对象
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
// 使用parallelize()方法,传入集合对象,将列表类型的数据转换成RDD类型
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[22] at parallelize at <console>:29
// 会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成,只是标记为持久化。
scala> rdd.cache()
// 第一次count()行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中。
scala> println(rdd.count())
3
// 第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd。
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive
(5).RDD的分区
RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上,可以增加并行度、减小通信开销。
1).分区的作用
A.增加并行度
分区-增加并行度.png也就是下图中红色图标的是可以并行操作的,可以在不同节点上去执行,这就是可以增加并行度。
B.减小通信开销
分区-减小通信开销.png连接操作的时候可以大大降低开销。
group、join等都可以起效益。
2),分区的原则
- RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目,这样可以最大程度的发挥并行度、最大程度的发挥CPU的性能;
- 默认的分区数目:
对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值,来配置默认的分区数目,一般而言:- 本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N(在不知道的时候可以写"*")
- ApacheMesos:默认的分区数为8
- Standalone或YARN:在“集群中所有CPU核心数目总和”和“2”二者中取较大值作为默认值
3),手动设置分区
-
在调用textFile()和parallelize()方法的时候手动指定分区个数即可,
语法格式如下:
sc.textFile(path, partitionNum);
sc.parallelize(集合,partitionNum)
其中,path参数用于指定要加载的文件的地址,partitionNum参数用于指定分区个数。 集合可以是数组、List集合等。
示例:scala>val array = Array(1,2,3,4,5) scala>val rdd = sc.parallelize(array,2) // 设置两个分区 scala>val rdd = sc.textFile("/user/spark/word.txt",2) // 设置两个分区
-
使用reparititon():方法重新设置分区个数。
随着计算过程的变换,数据量可能会减少,然后通过转换操作得到新 RDD 时,直接调用 repartition() 方法即可。示例:
scala> val data = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt",2) data: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/rdd/word.txt MapPartitionsRDD[12] at textFile at <console>:24 scala> data.partitions.size //显示data这个RDD的分区数量 res2: Int=2 scala> val rdd = data.repartition(1) //对data这个RDD进行重新分区 rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at repartition at :26 // 查看当前分区大小 scala> rdd.partitions.size res4: Int = 1
4).分区实例(使用partitionBy(func)对数据进行重新分区)
repartition 和 partitionBy 都是对数据进行重新分区,默认都是使用 HashPartitioner,区别在于partitionBy 只能用于 PairRDD,但是当它们同时都用于 PairRDD时,结果却不一样:其中partitionBy 才是我们所期望的结果,即使是RairRDD也不会使用自己的key,repartition 其实使用了一个随机生成的数来当做 Key,而不是使用原来的 Key!!
题目要求:根据key值的最后一位数字,写到不同的文件。
例如:
10 写入到 part-00000
11 写入到 part-00001
.
.
.
19 写入到 part-00009
// 代码实现
// 导入相应的代码包,因为要引入多个包,所以可以使用大括号将其添加进来
import org.apache.spark.{Partitioner, SparkContext, SparkConf}
// 自定义分区类,需要继承 org.apache.spark.Partitioner 类
class MyPartitioner(numParts:Int) extends Partitioner{
// 覆盖 "分区数" 函数
override def numPartitions: Int = numParts
// 覆盖 "分区号获取" 函数(此时函数的返回值是Int类型的,在函数体中不需要加return关键字)
override def getPartition(key: Any): Int = {
// 根据尾数进行分区,所以需要将传入的数据转换为String类型,然后转成Int类型,再对10取余。
key.toString.toInt%10
}
}
object TestPartitioner {
def main(args: Array[String]) {
val conf=new SparkConf()
val sc=new SparkContext(conf)
// 先生成1-10的数据,使用parallelize(集合,分区数)函数转成RDD,模拟5个分区的数据。
val data=sc.parallelize(1 to 10, 5)
// 根据尾号转变为10个分区,分别写到10个文件
/*
1.(_,1):把所有的元素重新处理,经过map变换,重新生成一个新的映射,如:(XXX,1)的形式,
其中"_"表示集合中的所有元素;
data.map(_, 1) 等价于 data.map(x => (x,1))
第一步的作用是构造一个pairRDD,因为partitionBy(func)方法.
2.调用partitionBy(func)分区方法,此时是传入的参数是我们自定义的分区方法.
3.分区之后的结果对于我们有用的就只是pairRDD的Key,所以使用"_1"的方式访pairRDD的第一个值,也就是key的值
4.保存结果文件到指定路径中(要保存在Linux文件系统中必须要加前缀(file:///),不然默认的路径是hdfs)
*/
data.map((_, 1))
.partitionBy(new MyPartitioner(10))
.map(_._1)
.saveAsTextFile("file:///root/wordCount/partitioner")
}
}
(6).Pair RDD(键值对RDD)
1),创建Pair RDD
A.从文件中加载RDD对象,然后使用map()变换操作
// 加载文件
scala> val lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/mycode/pairrdd/word.txt MapPartitionsRDD[1] at textFile at <console>:27
// 使用map()变换操作,依次遍历所有的元素,将每一个元素重新做一个映射<K,V>,转换成pairRDD
scala> val pairRDD = lines.flatMap(line => line.split(" ")).map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:29
// 遍历打印输出pairRDD中的所有元素,其是根据<K,V>的形式去存储的
scala> pairRDD.foreach(println)
(i,1)
(love,1)
(hadoop,1)
……
B.通过并行集合(数组)创建RDD,然后使用map()变换操作
// 初始化一个集合对象,
scala> val list = List("Hadoop","Spark","Hive","Spark")
list: List[String] = List(Hadoop, Spark, Hive, Spark)
// 将集合转成RDD类型
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:29
// 使用map()变化,将RDD转为 Pair RDD(键值对RDD)
scala> val pairRDD = rdd.map(word => (word,1))
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at map at <console>:31
// 依次对ParirRDD中的元素进行遍历输出
scala> pairRDD.foreach(println)
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
2),常用的键值对RDD转换操作
以下操作的原始数据集(pairRDD中的元素):
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
A.reduceByKey(func,[numTasks])
使用传入的func()函数合并具有相同键的值。一般func()函数我们是使用的lambda表达式。
reduceByKey用于对每个key对应的多个value进行merge(合并)操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。
// 代码实现:
/*
1.这里的含义是reduceByKey()函数会把所有的相同的key进行去重,只保留一个,然后将相同key的value值进行排列,依次去前两个值赋给a和b,然后计算出来和之后,将和再赋值给a,同时下一个值赋值给b,再依次做队列中剩下的运算。
*/
scala> pairRDD.reduceByKey((a,b)=>a+b).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
// 第二种方式:使用占位符的方式进行转换,和上述方式是等价的
scala> pairRDD.reduceByKey(_+_).foreach(println)
(Spark,2)
(Hive,1)
(Hadoop,1)
reduceByKey.png
B.groupByKey()
groupByKey([numTasks])的功能是,对具有相同键的值进行分组。
groupByKey也是对每个key进行操作,但只生成一个sequence,groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。
在pairRDD中按Key分组,将相同Key的元素聚集到同一个分区内,此函数不能接收函数作为参数,只接收一个可选参数任务数,所以不能在RDD分区本地进行聚合计算,如需按Key对Value聚合计算,只能对groupByKey返回的新RDD继续使用其他函数运算.
// 代码实现
/*
1.对四个键值对("spark",1)、("spark",2)、("hadoop",3)和("hadoop",5),
采用groupByKey()后得到的结果是:("spark",(1,2))和("hadoop",(3,5)),
2.其中Iterable[Int]是表示迭代器对象,可以使用 iterator()方法进行遍历。
*/
scala> pairRDD.groupByKey()
res15: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[15] at groupByKey at <console>:34
C.(A,B)两者的区别
- 操作代码示例
// 初始化数组对象:words
scala> val words = Array("one", "two", "two", "three", "three", "three")
// 使用parallelize(words)将数组对象转成RDD类型的数据,然后使用map()变换操作生成Pair RDD
scala> val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
// 使用reduceByKey进行相同的值的合并
scala> val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
// 使用groupByKey先生成一个序列化对象(其数据的类型是<K,V>的形式),再使用map()函数做下一变换操作生成一个Pair RDD;
/*
其中t是序列化对象中的每一个键值对(<K,V>)元素,那么"t._1"是取t元素的K(也就是第一个值);
同理,使用"t._2"是取t元素的V(也就是第二个值,此时第二个值是一个迭代器对象,可以使用求和函数对迭代器中所有的元素进行求和);
注意此时使用"_"取值的时候是从1开始取值的。
*/
scala> val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
D.Keys()
keys只会把pairRDD中的key返回形成一个新的RDD
// 调用keys方式,取出所有key
scala> pairRDD.keys
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at keys at <console>:34
// 取出key值,并打印输出
scala> pairRDD.keys.foreach(println)
Hadoop
Spark
Hive
Spark
E.Values()
values只会把pairRDD中的value返回形成一个新的RDD。
// 调用values方式,取出所有value
scala> pairRDD.values
res0: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at values at <console>:34
// 取出value值,并打印输出
scala> pairRDD.values.foreach(println)
1
1
1
1
F.sortByKey(boolean)
sortByKey()的功能是返回一个根据键排序的RDD;RDD对象调用此方法的前提必须是一个键值对(pairRDD)。
默认是按key升序排序,如果需要降序排序,则需要传递参数"false"
// 不传参数,默认的按照key升序排序(字母的时候)
scala> pairRDD.sortByKey()
res0: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at sortByKey at <console>:34
// 调用排序方法之后,再打印输出所有的元素
scala> pairRDD.sortByKey().foreach(println)
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
G.sortBy(value,boolean)
可以传入传入一个指定的字段(value),按照当前指定的字段进行排序。
/*
1.测试sortByKey(boolean)的排序
2.使用parallelize()方法将Array对象转成RDD类型
*/
scala> val d1 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
// 使用sortByKey()来按照key降序排序
scala> d1.reduceByKey(_+_).sortByKey(false).collect
res2: Array[(String, Int)] = Array((g,21),(f,29),(e,17),(d,9),(c,27),(b,38),(a,42))
/*
1.测试 reduceBy(value,boolean)的排序
2.使用parallelize()方法将Array对象转成RDD类型
*/
scala> val d2 = sc.parallelize(Array(("c",8),("b",25),("c",17),("a",42),("b",4),("d",9),("e",17),("c",2),("f",29),("g",21),("b",9)))
// 使用sortBy(value,boolean)函数进行按照值进行降序排序
// "_._2":这里第一个"_"表是占位符,是将所有的元素取出来赋值给"_" ;第二个"._2"表取键值对的第二个元素
scala> d2.reduceByKey(_+_).sortBy(_._2,false).collect
res4: Array[(String, Int)] = Array((a,42),(b,38),(f,29),(c,27),(g,21),(e,17),(d,9))
H.mapValues(func)
对pairRDD中的每个value都应用一个函数,但是,key不会发生变化.
// 依次对pairRDD的value值进行+1操作
scala> pairRDD.mapValues(x => x+1)
res2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at mapValues at <console>:34
// 执行+1操作之后,执行遍历操作进行打印输出
scala> pairRDD.mapValues(x => x+1).foreach(println)
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
I.join
join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),对每个 key 下的元素进行笛卡尔积操作,返回的结果再展平,只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集合。
// 使用parallelize()方法,传入一个Array数组,构造pairRDD1
scala> val pairRDD1 = sc.parallelize(Array(("spark",1),("spark",2),("hadoop",3),("hadoop",5)))
pairRDD1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:27
// 使用parallelize()方法,传入一个Array数组,构造pairRDD2
scala> val pairRDD2 = sc.parallelize(Array(("spark","fast")))
pairRDD2: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[25] at parallelize at <console>:27
// 执行连接操作,将相同的key进行连接,得到一个新的<K,V>对
scala> pairRDD1.join(pairRDD2)
res9: org.apache.spark.rdd.RDD[(String, (Int, String))] = MapPartitionsRDD[28] at join at <console>:32
// 将连接操作之后的结果打印输出
scala> pairRDD1.join(pairRDD2).foreach(println)
(spark,(1,fast))
(spark,(2,fast))
3).pairRDD综合实例
题目:给定一组键值对("spark",2),("hadoop",6),("hadoop",4),("spark",6),键值对的key表示图书名称,value表示某天图书销量,请计算每个键对应的平均值,也就是计算每种图书的每天平均销量。
// 初始化RDD对象
scala> val rdd = sc.parallelize(Array(("spark",2),("hadoop",6),("hadoop",4),("spark",6)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at parallelize at <console>:27
/*
1.题目中要求是每天的平均销量,但是没有给出天数,只是给出的几个键值对,说明每个键值对是代表一天的,所以使用mapValues()操作,将 value 值重新做一个映射(传入的参数是lambda表达式,将键值对赋值给原来的value值);
2.执行reduceByKey()操作将value中的两个值分别取出来进行分别相加,第一个值是销量,第二个值是天数;得到总销量和总天数
3.再调用mapValues()函数对value值进行计算,这里第一个值是总销量,第二个值是总天数,然后使用总销量除以总天数.
4.调用collect()方法将其他节点的数据取出到driver节点中
*/
scala> rdd.mapValues(x => (x,1))
.reduceByKey((x,y) => (x._1+y._1,x._2 + y._2))
.mapValues(x => (x._1 / x._2)).collect()
res22: Array[(String, Int)] = Array((spark,4), (hadoop,5))
pairRDD综合案例.jpg
(7). 共享变量
1).广播变量
①,简介
-
对于某些时候,我们需要大表join小表,或者小表join大表的时候,我们就可以考虑使用广播变量,将小表的数据一次性全部读取加载,然后进行join,避免shuffle的过程产生,提高数据处理速度。
-
我们可以通过使用广播变量,将小表数据存放到每个Executor的内存中,只驻留一份变量副本, 而不是对每个 task 都传输一次大变量,省了很多的网络传输, 对性能提升具有很大帮助, 而且会通过高效的广播算法来减少传输代价。
因为如果task的数目十分多的情况下,Driver的带宽就会成为系统的瓶颈,而且会大量的消耗task服务器上的资源
-
使用广播变量的场景很多, 我们都知道spark 一种常见的优化方式就是小表广播, 使用 map join 来代替 reduce join, 我们通过把小的数据集广播到各个节点上,节省了一次特别 expensive 的 shuffle 操作。
比如driver 上有一张数据量很小的表, 其他节点上的task 都需要 lookup 这张表, 那么 driver 可以先把这张表 copy 到这些节点,这样 task 就可以在本地查表了。
-
Spark的"行动"操作会跨越很多个阶段(Stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播。
// 可以通过调用SparkContext.broadcast(V)来从一 个普通变量V中创建一个广播变量。这个广播变量就是对普通变量V的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下: scala> val broadcastVar = sc.broadcast(Array(1, 2, 3)) broadcastVar: org. apache spark,broadcast.Broadcast[Array[Int] = Broadcast(O) scala> broadcastVar.value res0: Array[Int] = Array(1, 2, 3) /* 1.这个广播变量被创建以后,那么在集群中的任何函数中,都应该使用广播变量broadcastVar的值,而不是使用V的值,这样就不会把V重复分发到这些节点上. 2.此外,一旦广播变量创建后,普通变量V的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值. */
import org.apache.spark.SparkConf import org.apache spark.SparkContext object BroadCastValue { def main(args:Array[String]):Unit={ val conf=new SparkConf().setAppName("BroadCastValue1").setMaster("local[1]") //获取SparkContext val sc=new SparkContext(conf) //创建广播变量 val broads=sc.broadcast(3) // 变量可以是任意类型 //创建一个测试的List val lists=List(1,2,3,4,5) //转换为rdd (并行化) val listRDD=sc.parallelize(lists) //map操作数据 val results=listRDD.map(x=>x*broads.value) //遍历结果 rsultsfreach(x => printin("The result is: "+x)) sc.stop }
②,注意事项
-
能不能将一个RDD使用广播变量广播出去?
答:不能,因为RDD本身只是数据的抽象,实际并不存储数据。但是我们可以通过Spark的collect算子处理之后,将数据收集到Driver端,然后将结果广播出去
-
广播变量只能在Driver端定义,在Executor端只读使用。
-
在Driver端可以修改广播变量的值,当广播变量广播到Executor端的时候就无法更改了
-
如果Executor端是否使用到了Driver的变量,分以下两种情况:
- 如果不使用广播变量在Executor有多少task就有多少Driver端的变量副本。
- 如果使用广播变量在每个Executor中只有一份Driver端的变量副本。
2).累加器
- 累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器( counter )和求和( sum)。Spark原生地支持数值型( numeric )的累加器,程序开发人员可以编写对新类型的支持。
- 一个数值型的累加器 ,可以通过调用SparkContext.longAccumulator或者SparkContext.doubleAccumulator0)来创建运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是,这些任务只能做累加操作,不能读取累加器的值只有任务控制节点( Driver Program )可以使用value方法来读取累加器的值。
演示了使用累加器来对一个数组中的元素进行求和
scala> val accum = sc.longAccumulator("My Accumulator")
accum: org. apache spark. util.L ongAccumulato = LongAccumulator(id: 0, name:
Some(My Accumulator), value: 0)
// 在每一个worker节点上执行add操作
scala> sc parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
// 实际上是在driver节点调用value值
scala> accum.value
res1: Long= 10
网友评论