1. Define a collection val list = List("张三:20","李四:21","王五:20") with three partitions. Use the mapPartitions operator to build a new collection whose elements have the format (String, Int), representing name and age respectively.
2. Define a collection val list = List(1,2,3,4,5,6,7,8,9) with three partitions. Use the mapPartitionsWithIndex operator to build a new collection that records each partition's elements together with the sum of squares of each partition's elements.
3. Define a collection val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8)).
(1) Use the aggregateByKey operator to build a new collection containing the product of the elements for each key.
(2) Use the aggregateByKey operator to build a new collection containing the sum of the elements for each key.
import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

object Work01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Work01")
    val sc = new SparkContext(sparkConf)
    val list = List("张三:20", "李四:21", "王五:20")
    val listRDD: RDD[String] = sc.parallelize(list, 3)
    // Per-partition function: parse each "name:age" string into a (String, Int) pair
    val f: Iterator[String] => Iterator[(String, Int)] = it => {
      val result: ListBuffer[(String, Int)] = ListBuffer()
      while (it.hasNext) {
        val value = it.next()
        val fields = value.split(":")
        result.append((fields(0), fields(1).toInt))
      }
      result.iterator
    }
    println(listRDD.mapPartitions(f).partitions.size)  // 3
    // Repartition by passing a partitioner (with the target partition count)
    println(listRDD.mapPartitions(f).partitionBy(new HashPartitioner(1)).partitions.size)  // 1
  }
}
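For comparison, the same parsing can be written by transforming the partition iterator lazily instead of buffering it into a ListBuffer; a minimal sketch (f2 is a hypothetical name, same semantics as f above):

// Hypothetical alternative to f: map over the partition iterator directly
val f2: Iterator[String] => Iterator[(String, Int)] = it =>
  it.map { s =>
    val fields = s.split(":")
    (fields(0), fields(1).toInt)
  }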
object Work02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Work02")
    val sc = new SparkContext(sparkConf)
    val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
    val listRDD: RDD[Int] = sc.parallelize(list, 3)
    // Per-partition function: emit (partitionIndex, element) for every element,
    // then append (partitionIndex, sumOfSquares) for the partition
    val f: (Int, Iterator[Int]) => Iterator[(Int, Int)] = (index, it) => {
      var sum = 0
      val result: ListBuffer[(Int, Int)] = ListBuffer()
      while (it.hasNext) {
        val value = it.next()
        sum += value * value
        result.append((index, value))
      }
      result.append((index, sum))
      result.iterator
    }
    // parallelize splits 1..9 positionally into partitions [1,2,3], [4,5,6], [7,8,9]
    listRDD.mapPartitionsWithIndex(f).groupByKey().foreach(println)
  }
}
object Work03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Work03")
    val sc = new SparkContext(sparkConf)
    val list = List((1,1), (1,3), (1,5), (1,7), (2,2), (2,4), (2,6), (2,8))
    val pairRDD: RDD[(Int, Int)] = sc.parallelize(list)
    pairRDD.groupByKey().foreach(println)
    // (1) Product per key: the zero value 1 is the identity for multiplication
    val seq1: (Int, Int) => Int = (acc, value) => acc * value
    val com1: (Int, Int) => Int = (par, otherPar) => par * otherPar
    pairRDD.aggregateByKey(1)(seq1, com1).foreach(println)
    // (2) Sum per key: the zero value 0 is the identity for addition
    val seq2: (Int, Int) => Int = (acc, value) => acc + value
    val com2: (Int, Int) => Int = (par, otherPar) => par + otherPar
    pairRDD.aggregateByKey(0)(seq2, com2).foreach(println)
  }
}
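Since the zero values above are the identity elements of their operations and seqOp equals combOp in both cases, reduceByKey would produce the same results; a sketch assuming the same pairRDD:

// Equivalent one-liners: product and sum per key
pairRDD.reduceByKey(_ * _).foreach(println)  // expected: (1,105), (2,384), print order may vary
pairRDD.reduceByKey(_ + _).foreach(println)  // expected: (1,16), (2,20), print order may vary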
-
sortByKey([ascending], [numTasks])
Sorts an RDD of (K, V) pairs by key, in ascending order by default.
scala> var data = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(7,9),(2,4)))
scala> data.sortByKey(true).collect
res10: Array[(Int, Int)] = Array((1,3), (1,2), (1,4), (2,3), (2,4), (7,9))
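Passing false sorts in descending order; a sketch continuing the same session:

data.sortByKey(false).collect  // keys descending: (7,9) first, then the (2,*) pairs, then the (1,*) pairs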
-
join(otherDataset, [numTasks])
For RDDs of types (K, V) and (K, W), join returns an RDD of type (K, (V, W)).
scala> val rdd1=sc.parallelize(Array((1,2),(1,3)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:21
scala> val rdd2=sc.parallelize(Array((1,3)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:21
scala> rdd1.join(rdd2).collect
res13: Array[(Int, (Int, Int))] = Array((1,(3,3)), (1,(2,3)))
-
cogroup(otherDataset, [numTasks])
If the input RDDs have types (K, V) and (K, W), the returned RDD has type (K, (Iterable[V], Iterable[W])). This operation is equivalent to groupWith.
scala> val rdd1=sc.parallelize(Array((1,2),(1,3)))
rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:21
scala> val rdd2=sc.parallelize(Array((1,3)))
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:21
scala> rdd1.cogroup(rdd2).collect
res17: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(3, 2),CompactBuffer(3))))
scala> rdd1.groupWith(rdd2).collect
res18: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2, 3),CompactBuffer(3))))
-
pipe(command):
Returns a new RDD created by piping each partition's elements through a shell command.
val list = List(1,2,3,4,5,6,7,8)
val listRDD = sc.parallelize(list,4)
val resultRDD = listRDD.pipe("head -n 1")
resultRDD.foreach(println)
// Result: 1 3 5 7
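Each partition's elements are written to the command's stdin one per line, and every line the command writes to stdout becomes an element of the result. A sketch assuming a Unix wc is available on each worker:

// Each of the 4 partitions holds 2 elements, so wc -l emits one count per partition
val countRDD = listRDD.pipe("wc -l")
countRDD.foreach(println)  // prints 2 four times (possibly padded with spaces, depending on wc)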
-
coalesce(numPartitions):
Returns a new RDD with the number of partitions reduced to the given count; by default no shuffle is performed.
val list = List(1,2,3,4,5,6,7,8)
val listRDD = sc.parallelize(list,4)
listRDD.foreach(println)
// Result: 1 2 -> 3 4 -> 5 6 -> 7 8
val resultRDD = listRDD.coalesce(2)
resultRDD.foreach(println)
// Result: 1 2 3 4 -> 5 6 7 8
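coalesce also accepts an optional shuffle flag; with shuffle = true it performs a full shuffle and can rebalance or even increase the partition count (repartition is implemented as exactly this call). A sketch:

// With shuffle = true, data is redistributed rather than just merged
val shuffledRDD = listRDD.coalesce(2, shuffle = true)
println(shuffledRDD.partitions.size)  // 2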
-
repartition(numPartitions):
Returns a new RDD with the data redistributed into the given number of partitions, keeping them as balanced as possible; this always performs a shuffle.
val list = List(1,2,3,4,5,6,7,8)
val listRDD = sc.parallelize(list,4)
val resultRDD = listRDD.repartition(2)
resultRDD.foreach(println)
// Result: 1 3 5 7 -> 2 4 6 8
Actions
- reduce(func): Aggregates the elements of the RDD using the given binary function.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val result = listRDD.reduce(_ + _)
// Result: 21
- collect
Returns all elements of the RDD as an array.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val result = listRDD.collect
// Result: Array(1, 2, 3, 4, 5, 6)
- count: Returns the number of elements in the RDD, as a Long.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val result = listRDD.count
// Result: 6
- first: Returns the first element of the RDD.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val result = listRDD.first
// Result: 1
- takeSample(withReplacement, num, [seed]):
Randomly samples an RDD; withReplacement controls whether sampling is done with replacement and num is the number of elements to return. The result is an array.
val list = List(1,2,3,4,5,6,7,8,9,10)
val listRDD = sc.parallelize(list)
val result = listRDD.takeSample(false,4)
// Example result (random): Array(5, 8, 10, 6)
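With replacement, num may exceed the RDD's size because elements can repeat; a sketch with a fixed seed for reproducibility (the seed value 42 is arbitrary):

// 12 samples from 10 elements is only possible with replacement
val sampled = listRDD.takeSample(true, 12, seed = 42)
// sampled is an Array[Int] of length 12, possibly containing duplicates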
-
take(num):
Returns the first n elements of the RDD as an array.
val list = List(1,2,3,4,5,6,7,8,9,10)
val listRDD = sc.parallelize(list)
val result = listRDD.take(4)
// Result: Array(1, 2, 3, 4)
-
takeOrdered(num):
Returns the first n elements of the RDD, sorted in ascending order by default; a custom ordering can also be supplied, as shown in the sketch after this example. The result is an array.
val list = List(6,5,4,3,2,1)
val listRDD = sc.parallelize(list)
val result = listRDD.takeOrdered(4)
// Result: Array(1, 2, 3, 4)
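A sketch of the custom-ordering variant mentioned above, passing a reversed Ordering as the implicit second parameter:

// Descending order via Ordering[Int].reverse
val top4 = listRDD.takeOrdered(4)(Ordering[Int].reverse)
// Result: Array(6, 5, 4, 3)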