1. 自定义一个集合 val list = List("张三:20","李四:21","王五:20")，分三个区，使用 mapPartitions 算子构建一个新的集合，要求内部元素的格式为 (String, Int)，分别代表姓名和年龄。
2. 自定义一个集合 val list = List(1,2,3,4,5,6,7,8,9)，分三个区，使用 mapPartitionsWithIndex 算子构建一个新的集合，要求记录每个分区的元素，同时记录每个分区元素的平方和。
3. 自定义一个集合 val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))：
(1) 使用 aggregateByKey 算子构建一个新的集合，要求输出每个 key 对应的元素乘积；
(2) 使用 aggregateByKey 算子构建一个新的集合，要求输出每个 key 对应的元素加和。
/**
 * Exercise 1: split three "name:age" strings into three partitions and use
 * `mapPartitions` to turn them into `(name, age)` pairs of type `(String, Int)`.
 *
 * Prints the resulting pairs, the number of partitions of the mapped RDD, and
 * the number of partitions after re-partitioning with a single-partition
 * `HashPartitioner`.
 */
object Work01 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    try {
      val list = List("张三:20","李四:21","王五:20")
      val listRDD: RDD[String] = sc.parallelize(list, 3)

      // Transforms a whole partition at once. Mapping the iterator keeps the
      // transformation lazy instead of buffering every record of the partition.
      val f: Iterator[String] => Iterator[(String, Int)] = it =>
        it.map { value =>
          value.split(":") match {
            case Array(name, age) => (name, age.trim.toInt)
            // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
            case _ => throw new IllegalArgumentException(s"expected \"name:age\", got: $value")
          }
        }

      val people: RDD[(String, Int)] = listRDD.mapPartitions(f)

      // `partitions` and `partitionBy` are lazy, so without an action `f` would never
      // run. collect() materializes the new collection and prints it on the driver.
      people.collect().foreach(println)

      println(people.partitions.size)
      // Repartition by passing a partitioner that specifies the target partition count.
      println(people.partitionBy(new HashPartitioner(1)).partitions.size)
    } finally {
      // Always release the Spark context, even if the job fails.
      sc.stop()
    }
  }
}
/**
 * Exercise 2: split 1..9 into three partitions and use `mapPartitionsWithIndex`
 * to record, for every partition, its elements together with the sum of their
 * squares.
 *
 * Output is one line per partition: `(partitionIndex,(elements,sumOfSquares))`.
 */
object Work02 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    try {
      val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
      val listRDD: RDD[Int] = sc.parallelize(list, 3)

      // Emits exactly one record per partition. Keeping the sum in its own slot means
      // it cannot be mistaken for an element. No groupByKey shuffle is needed because
      // each record already describes a single partition.
      val f: (Int, Iterator[Int]) => Iterator[(Int, (List[Int], Long))] = (index, it) => {
        // The iterator can be traversed only once, so materialize the partition first.
        val elements = it.toList
        // Widen to Long before squaring so larger inputs do not overflow Int.
        val sumOfSquares = elements.map(x => x.toLong * x).sum
        Iterator.single((index, (elements, sumOfSquares)))
      }

      // collect() so the output is printed on the driver, not in executor logs.
      listRDD.mapPartitionsWithIndex(f).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}

/**
 * Exercise 3: for a pair RDD, use `aggregateByKey` to compute
 * (1) the product of the values of each key and (2) the sum of the values of each key.
 */
object Work03 {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
    val sc = new SparkContext(sparkConf)
    try {
      val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))
      val pairRDD: RDD[(Int, Int)] = sc.parallelize(list)

      // Show the raw values grouped per key, for reference.
      pairRDD.groupByKey().collect().foreach(println)

      // (1) Product per key. The zero value is applied once per key in every
      // partition, so it must be the identity of the operation: 1 for multiplication.
      val seq1: (Int, Int) => Int = (acc, value) => acc * value
      val com1: (Int, Int) => Int = (par, otherPar) => par * otherPar
      pairRDD.aggregateByKey(1)(seq1, com1).collect().foreach(println)

      // (2) Sum per key; 0 is the identity for addition.
      val seq2: (Int, Int) => Int = (acc, value) => acc + value
      val com2: (Int, Int) => Int = (par, otherPar) => par + otherPar
      pairRDD.aggregateByKey(0)(seq2, com2).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
网友评论